From 615453d40a35e8a6030740cb242f6c361fd913b5 Mon Sep 17 00:00:00 2001 From: Threated Date: Wed, 13 May 2026 13:11:49 +0200 Subject: [PATCH 1/2] pin vault to 1.21 to fix CI --- dev/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index fa16c583..d0a6bed7 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.7" services: vault: - image: hashicorp/vault + image: hashicorp/vault:1.21 ports: - 127.0.0.1:8200:8200 environment: From 7d50b64dd06d344acf9cfc58a1353f59ae1c7961 Mon Sep 17 00:00:00 2001 From: Threated Date: Wed, 13 May 2026 11:39:00 +0200 Subject: [PATCH 2/2] drop socket task if one side gave up on the connection --- .gitignore | 1 + broker/src/serve_sockets.rs | 26 ++++++++++++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 78e17712..2483001e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ dev/pki/* !dev/pki/pki **/mitmproxy/ dev/logs/ +.zed/ diff --git a/broker/src/serve_sockets.rs b/broker/src/serve_sockets.rs index 381755e5..51caae93 100644 --- a/broker/src/serve_sockets.rs +++ b/broker/src/serve_sockets.rs @@ -100,6 +100,12 @@ async fn connect_socket( } } + let tm = state.task_manager.clone(); + // Drop the task if any side that connected to it loses interest (aka drops the connection) + let _guard = DropGuard::new(move || { + // We don't care if the task expired by now + _ = tm.remove(&task_id); + }); let Some(conn) = parts.extensions.remove::() else { warn!("Failed to upgrade connection: {:#?}", parts.headers); return Err(StatusCode::UPGRADE_REQUIRED); @@ -117,8 +123,6 @@ async fn connect_socket( debug!("Socket expired because nobody connected"); return Err(StatusCode::GONE); }; - // We don't care if the task expired by now - _ = state.task_manager.remove(&task_id); tokio::spawn(async move { let (socket1, socket2) = match tokio::try_join!(conn, other_con) { Ok(sockets) => sockets, @@ -139,3 +143,21 @@ async fn connect_socket( (header::CONNECTION, HeaderValue::from_static("upgrade")) ], StatusCode::SWITCHING_PROTOCOLS).into_response()) } + +struct DropGuard { + on_drop: Option, +} + +impl DropGuard { + fn new(on_drop: F) -> Self { + Self { on_drop: Some(on_drop) } + } +} + +impl Drop for DropGuard { + fn drop(&mut self) { + if let Some(f) = self.on_drop.take() { + f(); + } + } +}