Skip to content

Commit 4d98675

Browse files
committed
Fix HTTP/2 authority routing for CDN requests
1 parent 4835264 commit 4d98675

2 files changed

Lines changed: 130 additions & 12 deletions

File tree

src/proxy.rs

Lines changed: 129 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,12 @@ use tokio_rustls::rustls::{ClientConfig, DigitallySignedStruct, Error as RustlsE
3737

3838
use crate::certs::CertificateBundle;
3939
use crate::config::AppConfig;
40+
use crate::paths::AppPaths;
41+
use crate::runtime_log;
4042

4143
struct AppState {
4244
config: AppConfig,
45+
paths: AppPaths,
4346
doh_client: Client,
4447
upstream_tls_connector: TlsConnector,
4548
resolve_cache: RwLock<HashMap<ResolveCacheKey, CachedResolvedUpstream>>,
@@ -77,6 +80,11 @@ struct CachedDohAnswers {
7780
expires_at: Instant,
7881
}
7982

83+
struct UpstreamResponse {
84+
response: Response<Incoming>,
85+
negotiated_protocol: &'static str,
86+
}
87+
8088
#[derive(Debug, Default)]
8189
struct HttpsServiceBinding {
8290
priority: u16,
@@ -106,7 +114,7 @@ const DOH_CONNECT_TIMEOUT: Duration = Duration::from_secs(4);
106114
const DOH_REQUEST_TIMEOUT: Duration = Duration::from_secs(6);
107115
const UPSTREAM_CONNECT_TIMEOUT: Duration = Duration::from_secs(4);
108116

109-
pub async fn run_proxy(config: AppConfig, bundle: CertificateBundle) -> Result<()> {
117+
pub async fn run_proxy(config: AppConfig, paths: AppPaths, bundle: CertificateBundle) -> Result<()> {
110118
let doh_client = Client::builder()
111119
.connect_timeout(DOH_CONNECT_TIMEOUT)
112120
.timeout(DOH_REQUEST_TIMEOUT)
@@ -124,6 +132,7 @@ pub async fn run_proxy(config: AppConfig, bundle: CertificateBundle) -> Result<(
124132
upstream_tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
125133
let state = Arc::new(AppState {
126134
config,
135+
paths,
127136
doh_client,
128137
upstream_tls_connector: TlsConnector::from(Arc::new(upstream_tls_config)),
129138
resolve_cache: RwLock::new(HashMap::new()),
@@ -225,7 +234,7 @@ async fn redirect_handler(
225234
request: Request<Incoming>,
226235
state: Arc<AppState>,
227236
) -> Result<Response<Full<Bytes>>, Infallible> {
228-
let host = extract_host(request.headers(), &state.config)
237+
let host = extract_host(request.headers(), request.uri(), &state.config)
229238
.unwrap_or_else(|| state.config.server_common_name.clone());
230239
let path = request
231240
.uri()
@@ -251,7 +260,7 @@ async fn proxy_handler(
251260
request: Request<Incoming>,
252261
state: Arc<AppState>,
253262
) -> Result<Response<Full<Bytes>>, Infallible> {
254-
let host = match extract_host(request.headers(), &state.config) {
263+
let host = match extract_host(request.headers(), request.uri(), &state.config) {
255264
Some(host) => host,
256265
None => {
257266
return Ok(simple_response(
@@ -315,9 +324,10 @@ async fn forward_request(
315324
)
316325
.await?;
317326

318-
let status = upstream_response.status();
319-
let headers = upstream_response.headers().clone();
327+
let status = upstream_response.response.status();
328+
let headers = upstream_response.response.headers().clone();
320329
let body = upstream_response
330+
.response
321331
.into_body()
322332
.collect()
323333
.await
@@ -346,7 +356,7 @@ async fn dispatch_upstream_request(
346356
headers: HeaderMap,
347357
body: Bytes,
348358
path_and_query: &str,
349-
) -> Result<Response<Incoming>> {
359+
) -> Result<UpstreamResponse> {
350360
let upstream = resolve_upstream(state, request_host, upstream_port).await?;
351361
let ech_config = upstream
352362
.ech_config
@@ -355,6 +365,20 @@ async fn dispatch_upstream_request(
355365

356366
let mut last_error = None;
357367
for addr in upstream.addrs.iter().copied() {
368+
log_upstream_debug(
369+
state,
370+
request_host,
371+
path_and_query,
372+
&format!(
373+
"attempt addr={addr} scheme={upstream_scheme} ech={} edge_node={}",
374+
if upstream.ech_config.is_some() { "yes" } else { "no" },
375+
state
376+
.config
377+
.edge_node_override()
378+
.filter(|value| !value.trim().is_empty())
379+
.unwrap_or("-")
380+
),
381+
);
358382
match send_once(
359383
state,
360384
upstream_scheme,
@@ -371,9 +395,37 @@ async fn dispatch_upstream_request(
371395
{
372396
Ok(response) => {
373397
remember_successful_upstream(state, request_host, upstream_port, addr).await;
398+
let status = response.response.status();
399+
let cf_ray = response
400+
.response
401+
.headers()
402+
.get("cf-ray")
403+
.and_then(|value| value.to_str().ok())
404+
.unwrap_or("-");
405+
let content_type = response
406+
.response
407+
.headers()
408+
.get("content-type")
409+
.and_then(|value| value.to_str().ok())
410+
.unwrap_or("-");
411+
log_upstream_debug(
412+
state,
413+
request_host,
414+
path_and_query,
415+
&format!(
416+
"success addr={addr} protocol={} status={} cf-ray={} content-type={}",
417+
response.negotiated_protocol, status, cf_ray, content_type
418+
),
419+
);
374420
return Ok(response);
375421
}
376422
Err(error) => {
423+
log_upstream_debug(
424+
state,
425+
request_host,
426+
path_and_query,
427+
&format!("failure addr={addr} error={error:#}"),
428+
);
377429
eprintln!("ech upstream attempt failed for {request_host} via {addr}: {error:#}");
378430
last_error = Some(error);
379431
}
@@ -395,20 +447,29 @@ async fn send_once(
395447
headers: HeaderMap,
396448
body: Bytes,
397449
path_and_query: &str,
398-
) -> Result<Response<Incoming>> {
450+
) -> Result<UpstreamResponse> {
399451
let request = build_upstream_request(request_host, method, headers, body, path_and_query)?;
400452

401453
if upstream_scheme.eq_ignore_ascii_case("http") {
402454
let stream = connect_tcp(addr).await?;
403-
return send_over_io(TokioIo::new(stream), request).await;
455+
return Ok(UpstreamResponse {
456+
response: send_over_io(TokioIo::new(stream), request).await?,
457+
negotiated_protocol: "http/1.1",
458+
});
404459
}
405460

406461
let tls_stream = connect_tls(state, request_host, outer_sni, ech_config, addr).await?;
407462
let negotiated_h2 = tls_stream.get_ref().1.alpn_protocol() == Some(b"h2");
408463
if negotiated_h2 {
409-
return send_over_io_http2(TokioIo::new(tls_stream), request).await;
464+
return Ok(UpstreamResponse {
465+
response: send_over_io_http2(TokioIo::new(tls_stream), request).await?,
466+
negotiated_protocol: "h2",
467+
});
410468
}
411-
send_over_io(TokioIo::new(tls_stream), request).await
469+
Ok(UpstreamResponse {
470+
response: send_over_io(TokioIo::new(tls_stream), request).await?,
471+
negotiated_protocol: "http/1.1",
472+
})
412473
}
413474

414475
fn build_upstream_request(
@@ -538,6 +599,16 @@ async fn resolve_upstream(state: &AppState, host: &str, port: u16) -> Result<Res
538599
if let Some(cached) = read_cached_upstream(state, &cache_key).await {
539600
let mut cached = cached;
540601
prioritize_preferred_upstream(state, &cache_key, &mut cached.addrs).await;
602+
log_upstream_debug(
603+
state,
604+
host,
605+
"/",
606+
&format!(
607+
"resolve cache-hit addrs={} ech={}",
608+
format_socket_addrs(&cached.addrs),
609+
if cached.ech_config.is_some() { "yes" } else { "no" }
610+
),
611+
);
541612
return Ok(cached);
542613
}
543614

@@ -638,6 +709,21 @@ async fn resolve_upstream(state: &AppState, host: &str, port: u16) -> Result<Res
638709
.collect::<Vec<_>>();
639710
let mut upstream = ResolvedUpstream { addrs, ech_config };
640711
prioritize_preferred_upstream(state, &cache_key, &mut upstream.addrs).await;
712+
log_upstream_debug(
713+
state,
714+
host,
715+
"/",
716+
&format!(
717+
"resolve binding_host={binding_host} target_host={target_host} addrs={} ech={} edge_node={}",
718+
format_socket_addrs(&upstream.addrs),
719+
if upstream.ech_config.is_some() { "yes" } else { "no" },
720+
state
721+
.config
722+
.edge_node_override()
723+
.filter(|value| !value.trim().is_empty())
724+
.unwrap_or("-")
725+
),
726+
);
641727
let resolve_ttl = min_duration_options(binding_ttl, min_duration_options(addr_ttl, extra_ttl))
642728
.unwrap_or(FALLBACK_RESOLVE_CACHE_TTL);
643729
write_cached_upstream(state, cache_key, upstream.clone(), resolve_ttl).await;
@@ -919,6 +1005,34 @@ fn parse_dns_host_override(raw: &str) -> Result<DnsHostOverride> {
9191005
Ok(DnsHostOverride::Alias(value.to_string()))
9201006
}
9211007

1008+
fn should_trace_upstream(host: &str) -> bool {
1009+
matches!(
1010+
host.to_ascii_lowercase().as_str(),
1011+
"cdn3.linux.do" | "linux.do"
1012+
)
1013+
}
1014+
1015+
fn log_upstream_debug(state: &AppState, host: &str, path_and_query: &str, message: &str) {
1016+
if !should_trace_upstream(host) {
1017+
return;
1018+
}
1019+
1020+
let _ = runtime_log::append(
1021+
&state.paths,
1022+
"INFO",
1023+
"proxy-upstream",
1024+
&format!("host={host} path={path_and_query} {message}"),
1025+
);
1026+
}
1027+
1028+
fn format_socket_addrs(addrs: &[SocketAddr]) -> String {
1029+
addrs
1030+
.iter()
1031+
.map(ToString::to_string)
1032+
.collect::<Vec<_>>()
1033+
.join(",")
1034+
}
1035+
9221036
fn parse_https_answer(raw_rdata: &str) -> Result<HttpsServiceBinding> {
9231037
let bytes = parse_dns_json_hex_rdata(raw_rdata)?;
9241038
if bytes.len() < 3 {
@@ -1136,7 +1250,10 @@ impl ServerCertVerifier for NoCertificateVerification {
11361250
}
11371251
}
11381252

1139-
fn extract_host(headers: &HeaderMap, config: &AppConfig) -> Option<String> {
1253+
fn extract_host(headers: &HeaderMap, uri: &http::Uri, config: &AppConfig) -> Option<String> {
1254+
uri.authority()
1255+
.map(|authority| authority.host().to_ascii_lowercase())
1256+
.or_else(|| {
11401257
headers
11411258
.get(HOST)
11421259
.and_then(|value| value.to_str().ok())
@@ -1147,6 +1264,7 @@ fn extract_host(headers: &HeaderMap, config: &AppConfig) -> Option<String> {
11471264
.unwrap_or(value)
11481265
.to_ascii_lowercase()
11491266
})
1267+
})
11501268
.or_else(|| config.proxy_domains.first().cloned())
11511269
}
11521270

src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub async fn run_foreground(config_path: Option<PathBuf>, with_setup: bool) -> R
100100
let pid = std::process::id();
101101
state::write_pid(&paths, pid)?;
102102
state::mark_running(&paths, pid)?;
103-
let result = run_proxy(config, bundle).await;
103+
let result = run_proxy(config, paths.clone(), bundle).await;
104104
let _ = state::clear_pid_if_matches(&paths, pid);
105105
match &result {
106106
Ok(_) => {

0 commit comments

Comments
 (0)