From 5e45d794cbce6044994e2e0aff356d146e0aa708 Mon Sep 17 00:00:00 2001 From: Mshehu5 Date: Fri, 1 May 2026 22:31:47 +0100 Subject: [PATCH 1/2] Document AS-aware v2 config --- payjoin-cli/README.md | 28 ++++++++++++++++++++++++++-- payjoin-cli/example.config.toml | 12 +++++++++++- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/payjoin-cli/README.md b/payjoin-cli/README.md index da2e86cc1..88dbeca0a 100644 --- a/payjoin-cli/README.md +++ b/payjoin-cli/README.md @@ -75,7 +75,7 @@ rpchost = "http://localhost:18443/wallet/sender" # For v2, our config also requires a payjoin directory server and OHTTP relay [v2] -pj_directory = "https://payjo.in" +pj_directories = ["https://payjo.in"] ohttp_relays = ["https://pj.benalleng.com", "https://pj.bobspacebkk.com", "https://ohttp.achow101.com"] ``` @@ -90,7 +90,7 @@ rpchost = "http://localhost:18443/wallet/receiver" # For v2, our config also requires a payjoin directory server and OHTTP relay [v2] -pj_directory = "https://payjo.in" +pj_directories = ["https://payjo.in"] ohttp_relays = ["https://pj.benalleng.com", "https://pj.bobspacebkk.com", "https://ohttp.achow101.com"] ``` @@ -138,6 +138,30 @@ See the [example.config.toml](https://github.com/payjoin/rust-payjoin/blob/fde867b93ede767c9a50913432a73782a94ef40b/payjoin-cli/example.config.toml) for inspiration. +`payjoin-cli` also supports optional AS-aware filtering for BIP77 relay +selection: + +```toml +[v2] +pj_directories = ["https://payjo.in", "https://backup.example"] +ohttp_relays = ["https://relay-1.example", "https://relay-2.example"] + +[v2.asmap] +asmap_file = "./ip_asn.dat" +user_public_ips = ["198.51.100.10"] +user_asns = [64512] +``` + +Build `payjoin-cli` with `--features asmap` to enable the `[v2.asmap]` +configuration block. + +When enabled, directories and relays that resolve into the same ASN as the +configured user identity are excluded, mixed-ASN hostnames are rejected, and +relay ordering becomes deterministic from the receiver key embedded in the +BIP77 URI. This mitigates some AS-level correlation risks, but it does not +eliminate traffic analysis when sender and receiver already share the same +network. + ### Asynchronous Operation Sender and receiver state is saved to a database in the directory from which `payjoin-cli` is run, called `payjoin.sqlite`. Once a send or receive session is started, it may resume using the `resume` argument if prior payjoin sessions have not yet complete. diff --git a/payjoin-cli/example.config.toml b/payjoin-cli/example.config.toml index ad1864ced..31e265b5e 100644 --- a/payjoin-cli/example.config.toml +++ b/payjoin-cli/example.config.toml @@ -48,9 +48,19 @@ rpcpassword = "password" # Version 2 Configuration # [v2] -# pj_directory = "https://payjo.in" +# pj_directories = ["https://payjo.in"] # ohttp_relays = ["https://pj.benalleng.com", "https://pj.bobspacebkk.com", "https://ohttp.achow101.com", "https://example.com"] # # Optional: The HPKE keys which need to be fetched ahead of time from the pj_endpoint # # for the payjoin packets to be encrypted. # # These can now be fetched and no longer need to be configured. # ohttp_keys = "./path/to/ohttp_keys" +# +# # Optional AS-aware relay and directory filtering. +# # When enabled, payjoin-cli will reject directories/relays that share an +# # ASN with the configured user identity, and will deterministically order +# # relay selection from the remaining candidates. +# # Requires building payjoin-cli with `--features asmap`. +# [v2.asmap] +# asmap_file = "./ip_asn.dat" +# user_public_ips = ["198.51.100.10", "2001:db8::10"] +# user_asns = [64512] From 4547e8e56bc8aea91330aee1d7481b7d50497b24 Mon Sep 17 00:00:00 2001 From: Mshehu5 Date: Sun, 17 May 2026 23:14:41 +0100 Subject: [PATCH 2/2] Add trusted v2 directory selection, relay planning ASMap-based filtering to payjoin-cli. This introduces pinned relay and directory resolution, deterministic relay ordering, and the config plumbing needed to keep sender and receiver relay choice aligned. --- Cargo-minimal.lock | 7 + Cargo-recent.lock | 19 +- payjoin-cli/Cargo.toml | 6 +- payjoin-cli/src/app/config.rs | 196 ++++- payjoin-cli/src/app/mod.rs | 34 +- payjoin-cli/src/app/v2/mod.rs | 299 +++++--- payjoin-cli/src/app/v2/ohttp.rs | 209 +++--- payjoin-cli/src/app/v2/relay_selection.rs | 871 ++++++++++++++++++++++ payjoin-cli/tests/e2e.rs | 12 + 9 files changed, 1422 insertions(+), 231 deletions(-) create mode 100644 payjoin-cli/src/app/v2/relay_selection.rs diff --git a/Cargo-minimal.lock b/Cargo-minimal.lock index fba3b1dd1..93ad78c0d 100644 --- a/Cargo-minimal.lock +++ b/Cargo-minimal.lock @@ -215,6 +215,12 @@ dependencies = [ "winnow 0.7.15", ] +[[package]] +name = "asmap" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "996e3818c450a9497e2f1aff7306d1c56b4198a07880222ee0aa85b6c42ac81f" + [[package]] name = "asn1-rs" version = "0.7.1" @@ -2784,6 +2790,7 @@ name = "payjoin-cli" version = "0.2.0" dependencies = [ "anyhow", + "asmap", "async-trait", "bitcoind-async-client", "clap 4.6.0", diff --git a/Cargo-recent.lock b/Cargo-recent.lock index ceaae50e2..b2edb278e 100644 --- a/Cargo-recent.lock +++ b/Cargo-recent.lock @@ -215,6 +215,12 @@ dependencies = [ "winnow", ] +[[package]] +name = "asmap" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "996e3818c450a9497e2f1aff7306d1c56b4198a07880222ee0aa85b6c42ac81f" + [[package]] name = "asn1-rs" version = "0.7.1" @@ -1950,7 +1956,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.2", "tower-service", - "webpki-roots 1.0.2", + "webpki-roots 1.0.7", ] [[package]] @@ -2752,6 +2758,7 @@ name = "payjoin-cli" version = "0.2.0" dependencies = [ "anyhow", + "asmap", "async-trait", "bitcoind-async-client", "clap 4.5.46", @@ -3404,7 +3411,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots 1.0.2", + "webpki-roots 1.0.7", ] [[package]] @@ -4332,7 +4339,7 @@ dependencies = [ "time", "tokio", "tokio-rustls 0.26.2", - "webpki-roots 1.0.2", + "webpki-roots 1.0.7", "x509-parser 0.18.0", ] @@ -5094,14 +5101,14 @@ version = "0.26.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" dependencies = [ - "webpki-roots 1.0.2", + "webpki-roots 1.0.7", ] [[package]] name = "webpki-roots" -version = "1.0.2" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e8983c3ab33d6fb807cfcdad2491c4ea8cbc8ed839181c7dfd9c67c83e261b2" +checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d" dependencies = [ "rustls-pki-types", ] diff --git a/payjoin-cli/Cargo.toml b/payjoin-cli/Cargo.toml index 35f8391ea..38cd74bc9 100644 --- a/payjoin-cli/Cargo.toml +++ b/payjoin-cli/Cargo.toml @@ -20,13 +20,15 @@ path = "src/main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = ["v2"] +asmap = ["dep:asmap"] native-certs = ["reqwest/rustls-tls-native-roots"] -_manual-tls = ["reqwest/rustls-tls", "payjoin/_manual-tls", "tokio-rustls"] +_manual-tls = ["reqwest/rustls-tls", "payjoin/_manual-tls"] v1 = ["payjoin/v1", "hyper", "hyper-util", "http-body-util"] v2 = ["payjoin/v2", "payjoin/io"] [dependencies] anyhow = "1.0.99" +asmap = { version = "0.1.0", optional = true } async-trait = "0.1.89" bitcoind-async-client = "0.10.2" clap = { version = "4.5.45", features = ["derive"] } @@ -48,7 +50,7 @@ serde_json = "1.0.149" tokio = { version = "1.47.1", features = ["full"] } tokio-rustls = { version = "0.26.2", features = [ "ring", -], default-features = false, optional = true } +], default-features = false } tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } diff --git a/payjoin-cli/src/app/config.rs b/payjoin-cli/src/app/config.rs index f866380e0..50e63a766 100644 --- a/payjoin-cli/src/app/config.rs +++ b/payjoin-cli/src/app/config.rs @@ -1,4 +1,10 @@ +#[cfg(all(feature = "v2", feature = "asmap"))] +use std::fmt; +#[cfg(all(feature = "v2", feature = "asmap"))] +use std::net::IpAddr; use std::path::PathBuf; +#[cfg(all(feature = "v2", feature = "asmap"))] +use std::sync::Arc; use anyhow::Result; use config::builder::DefaultState; @@ -29,24 +35,112 @@ pub struct V1Config { pub pj_endpoint: Url, } +#[cfg(all(feature = "v2", feature = "asmap"))] +#[derive(Clone)] +pub struct LoadedAsmap { + map: Arc<::asmap::Asmap>, +} + +#[cfg(all(feature = "v2", feature = "asmap"))] +impl LoadedAsmap { + pub fn lookup(&self, ip: IpAddr) -> u32 { self.map.lookup(ip) } + + pub fn as_bytes(&self) -> &[u8] { self.map.as_bytes() } +} + +#[cfg(all(feature = "v2", feature = "asmap"))] +impl<'de> Deserialize<'de> for LoadedAsmap { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let path = PathBuf::deserialize(deserializer)?; + let map = ::asmap::Asmap::from_file(&path).map_err(|e| { + serde::de::Error::custom(format!( + "Failed to load v2.asmap.asmap_file {}: {e}", + path.display() + )) + })?; + Ok(LoadedAsmap { map: Arc::new(map) }) + } +} + +#[cfg(all(feature = "v2", feature = "asmap"))] +impl fmt::Debug for LoadedAsmap { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LoadedAsmap").field("bytes", &self.as_bytes().len()).finish() + } +} + +#[cfg(all(feature = "v2", feature = "asmap"))] +#[derive(Debug, Clone, Deserialize)] +pub struct AsmapConfig { + #[serde(rename = "asmap_file")] + pub asmap: LoadedAsmap, + #[serde(default)] + pub user_public_ips: Vec, + #[serde(default)] + pub user_asns: Vec, +} + +#[cfg(all(feature = "v2", feature = "asmap"))] +impl AsmapConfig { + fn validate(&self) -> Result<(), ConfigError> { + if self.user_public_ips.is_empty() && self.user_asns.is_empty() { + return Err(ConfigError::Message( + "v2.asmap requires at least one of user_public_ips or user_asns".into(), + )); + } + Ok(()) + } +} + #[cfg(feature = "v2")] #[derive(Debug, Clone, Deserialize)] pub struct V2Config { - #[serde(deserialize_with = "deserialize_ohttp_keys_from_path")] + #[serde(default, deserialize_with = "deserialize_ohttp_keys_from_path")] pub ohttp_keys: Option, pub ohttp_relays: Vec, - pub pj_directory: Url, + #[serde(rename = "pj_directories")] + pub trusted_directories: Vec, + #[cfg(feature = "asmap")] + #[serde(default)] + pub asmap: Option, +} + +#[cfg(feature = "v2")] +impl V2Config { + pub fn trusted_directories(&self) -> &[Url] { &self.trusted_directories } + + fn validate(&self) -> Result<(), ConfigError> { + if self.trusted_directories.is_empty() { + return Err(ConfigError::Message( + "At least one v2 trusted directory is required".to_owned(), + )); + } + + if self.ohttp_keys.is_some() && self.trusted_directories.len() != 1 { + return Err(ConfigError::Message( + "v2.ohttp_keys is only valid when exactly one v2.pj_directories entry is configured" + .to_owned(), + )); + } + + #[cfg(feature = "asmap")] + if let Some(asmap) = &self.asmap { + asmap.validate()?; + } + + Ok(()) + } } #[allow(clippy::large_enum_variant)] -#[derive(Debug, Clone, Deserialize)] -#[serde(tag = "version")] +#[derive(Debug, Clone)] pub enum VersionConfig { #[cfg(feature = "v1")] - #[serde(rename = "v1")] V1(V1Config), #[cfg(feature = "v2")] - #[serde(rename = "v2")] V2(V2Config), } @@ -178,7 +272,7 @@ impl Config { Version::Two => { #[cfg(feature = "v2")] { - match built_config.get::("v2") { + match load_v2_config(&built_config) { Ok(v2) => config.version = Some(VersionConfig::V2(v2)), Err(e) => return Err(ConfigError::Message(format!( @@ -269,19 +363,20 @@ fn add_v1_defaults(config: Builder, cli: &Cli) -> Result { fn add_v2_defaults(config: Builder, cli: &Cli) -> Result { // Set default values let config = config - .set_default("v2.pj_directory", "https://payjo.in")? + .set_default("v2.pj_directories", vec!["https://payjo.in"])? .set_default("v2.ohttp_keys", None::)?; // Override config values with command line arguments if applicable let pj_directory = cli.pj_directory.as_ref().map(|s| s.as_str()); let ohttp_keys = cli.ohttp_keys.as_ref().map(|p| p.to_string_lossy().into_owned()); + let pj_directories = pj_directory.map(|dir| vec![dir]); let ohttp_relays = cli .ohttp_relays .as_ref() .map(|urls| urls.iter().map(|url| url.as_str()).collect::>()); config - .set_override_option("v2.pj_directory", pj_directory)? + .set_override_option("v2.pj_directories", pj_directories)? .set_override_option("v2.ohttp_keys", ohttp_keys)? .set_override_option("v2.ohttp_relays", ohttp_relays) } @@ -323,8 +418,8 @@ fn handle_subcommands(config: Builder, cli: &Cli) -> Result Result Result { + #[cfg(not(feature = "asmap"))] + if built_config.get_table("v2.asmap").is_ok() { + return Err(ConfigError::Message( + "This build does not include ASMap support. Recompile with --features asmap".to_owned(), + )); + } + + let v2 = built_config.get::("v2")?; + v2.validate()?; + Ok(v2) +} + #[cfg(feature = "v2")] fn deserialize_ohttp_keys_from_path<'de, D>( deserializer: D, @@ -348,17 +457,60 @@ fn deserialize_ohttp_keys_from_path<'de, D>( where D: serde::Deserializer<'de>, { - let path_str: Option = Option::deserialize(deserializer)?; - - match path_str { + let path: Option = Option::deserialize(deserializer)?; + match path { None => Ok(None), - Some(path) => std::fs::read(path) - .map_err(|e| serde::de::Error::custom(format!("Failed to read ohttp_keys file: {e}"))) - .and_then(|bytes| { - payjoin::OhttpKeys::decode(&bytes).map_err(|e| { - serde::de::Error::custom(format!("Failed to decode ohttp keys: {e}")) - }) - }) - .map(Some), + Some(path) => { + let bytes = std::fs::read(&path).map_err(|e| { + serde::de::Error::custom(format!( + "Failed to read ohttp_keys file {}: {e}", + path.display() + )) + })?; + let keys = payjoin::OhttpKeys::decode(&bytes).map_err(|e| { + serde::de::Error::custom(format!( + "Failed to decode ohttp keys from {}: {e}", + path.display() + )) + })?; + Ok(Some(keys)) + } + } +} + +#[cfg(all(test, feature = "v2"))] +mod tests { + use super::*; + + fn test_ohttp_keys() -> payjoin::OhttpKeys { + payjoin::OhttpKeys::try_from( + &[ + 1, 2, 121, 190, 102, 126, 249, 220, 187, 172, 85, 160, 98, 149, 206, 135, 11, 7, 2, + 155, 252, 219, 45, 206, 40, 217, 89, 242, 129, 91, 22, 248, 23, 152, + ][..], + ) + .expect("valid OHTTP keys") + } + + #[test] + fn rejects_singular_ohttp_keys_with_multiple_directories() { + let config = V2Config { + ohttp_keys: Some(test_ohttp_keys()), + ohttp_relays: vec![], + trusted_directories: vec![ + Url::parse("https://payjo.in").expect("valid url"), + Url::parse("https://backup.example").expect("valid url"), + ], + #[cfg(feature = "asmap")] + asmap: None, + }; + + let error = config.validate().expect_err("ambiguous OHTTP keys should fail validation"); + assert!( + error + .to_string() + .contains("v2.ohttp_keys is only valid when exactly one v2.pj_directories entry"), + "unexpected error: {error}" + ); } } diff --git a/payjoin-cli/src/app/mod.rs b/payjoin-cli/src/app/mod.rs index 48499bebe..c58ad164f 100644 --- a/payjoin-cli/src/app/mod.rs +++ b/payjoin-cli/src/app/mod.rs @@ -66,28 +66,28 @@ pub trait App: Send + Sync { } } -#[cfg(feature = "_manual-tls")] +#[cfg(feature = "v1")] fn http_agent(config: &Config) -> Result { - Ok(http_agent_builder(config.root_certificate.as_ref())?.build()?) -} - -#[cfg(not(feature = "_manual-tls"))] -fn http_agent(_config: &Config) -> Result { - Ok(reqwest::Client::builder().http1_only().build()?) + Ok(http_client_builder(config)?.build()?) } -#[cfg(feature = "_manual-tls")] -fn http_agent_builder( - root_cert_path: Option<&std::path::PathBuf>, -) -> Result { - let mut builder = reqwest::ClientBuilder::new().use_rustls_tls().http1_only(); +pub(crate) fn http_client_builder(config: &Config) -> Result { + #[cfg(feature = "_manual-tls")] + { + let mut builder = reqwest::ClientBuilder::new().use_rustls_tls().http1_only(); + if let Some(root_cert_path) = config.root_certificate.as_ref() { + let cert_der = std::fs::read(root_cert_path)?; + builder = builder + .add_root_certificate(reqwest::tls::Certificate::from_der(cert_der.as_slice())?); + } + Ok(builder) + } - if let Some(root_cert_path) = root_cert_path { - let cert_der = std::fs::read(root_cert_path)?; - builder = - builder.add_root_certificate(reqwest::tls::Certificate::from_der(cert_der.as_slice())?) + #[cfg(not(feature = "_manual-tls"))] + { + let _ = config; + Ok(reqwest::Client::builder().http1_only()) } - Ok(builder) } async fn handle_interrupt(tx: watch::Sender<()>) { diff --git a/payjoin-cli/src/app/v2/mod.rs b/payjoin-cli/src/app/v2/mod.rs index 82fb87dcc..627232381 100644 --- a/payjoin-cli/src/app/v2/mod.rs +++ b/payjoin-cli/src/app/v2/mod.rs @@ -1,5 +1,5 @@ use std::fmt; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use anyhow::{anyhow, Context, Result}; use payjoin::bitcoin::consensus::encode::serialize_hex; @@ -22,12 +22,18 @@ use tokio::sync::watch; use super::config::Config; use super::wallet::BitcoindWallet; use super::App as AppTrait; -use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager}; -use crate::app::{handle_interrupt, http_agent}; +#[cfg(feature = "v1")] +use crate::app::http_agent; +use crate::app::v2::ohttp::{ + classify_reqwest_error, unwrap_ohttp_keys_or_else_fetch, RelayAttemptError, +}; +use crate::app::v2::relay_selection::{MessageKind, RelaySelector, RequestKind}; +use crate::app::{handle_interrupt, http_client_builder}; use crate::db::v2::{ReceiverPersister, SenderPersister, SessionId}; use crate::db::Database; mod ohttp; +pub(crate) mod relay_selection; const W_ID: usize = 12; const W_ROLE: usize = 25; @@ -40,7 +46,6 @@ pub(crate) struct App { db: Arc, wallet: BitcoindWallet, interrupt: watch::Receiver<()>, - relay_manager: Arc>, } trait StatusText { @@ -140,11 +145,10 @@ impl fmt::Display for SessionHistoryRow { impl AppTrait for App { async fn new(config: Config) -> Result { let db = Arc::new(Database::create(&config.db_path)?); - let relay_manager = Arc::new(Mutex::new(RelayManager::new())); let (interrupt_tx, interrupt_rx) = watch::channel(()); tokio::spawn(handle_interrupt(interrupt_tx)); let wallet = BitcoindWallet::new(&config.bitcoind).await?; - let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager }; + let app = Self { config, db, wallet, interrupt: interrupt_rx }; app.wallet() .network() .context("Failed to connect to bitcoind. Check config RPC connection.")?; @@ -161,6 +165,7 @@ impl AppTrait for App { .assume_checked() .check_pj_supported() .map_err(|_| anyhow!("URI does not support Payjoin"))?; + let pj_endpoint = uri.extras.endpoint(); let address = uri.address; let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?; match uri.extras.pj_param() { @@ -215,6 +220,8 @@ impl AppTrait for App { Ok(()) } PjParam::V2(pj_param) => { + let directory = relay_selection::directory_from_endpoint(&pj_endpoint)?; + relay_selection::ensure_directory_trusted(self.config.v2()?, &directory)?; let receiver_pubkey = pj_param.receiver_pubkey(); let sender_state = self.db.get_send_session_ids()?.into_iter().find_map(|session_id| { @@ -276,25 +283,35 @@ impl AppTrait for App { async fn receive_payjoin(&self, amount: Amount) -> Result<()> { let address = self.wallet().get_new_address()?; - let ohttp_keys = - unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone()) - .await? - .ohttp_keys; + let v2_config = self.config.v2()?; + let network = relay_selection::SystemNetwork::new(v2_config); + let network_selection = + relay_selection::choose_receiver_network_selection(v2_config, &network)?; + let ohttp_keys = unwrap_ohttp_keys_or_else_fetch(&self.config, &network_selection).await?; let persister = ReceiverPersister::new(self.db.clone())?; - let session = - ReceiverBuilder::new(address, self.config.v2()?.pj_directory.as_str(), ohttp_keys)? - .with_amount(amount) - .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN)) - .build() - .save(&persister)?; + let directory = network_selection.directory.url.clone(); + let session = ReceiverBuilder::new(address, directory.as_str(), ohttp_keys)? + .with_amount(amount) + .with_max_fee_rate(self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN)) + .build() + .save(&persister)?; println!("Receive session established"); let pj_uri = session.pj_uri(); + let receiver_endpoint = pj_uri.extras.endpoint(); + let relay_selector = relay_selection::relay_selector_from_network_selection( + &receiver_endpoint, + network_selection, + )?; println!("Request Payjoin by sharing this Payjoin Uri:"); println!("{pj_uri}"); - self.process_receiver_session(ReceiveSession::Initialized(session.clone()), &persister) - .await?; + self.process_receiver_session( + ReceiveSession::Initialized(session.clone()), + &persister, + &relay_selector, + ) + .await?; Ok(()) } @@ -315,9 +332,33 @@ impl AppTrait for App { let self_clone = self.clone(); let recv_persister = ReceiverPersister::from_id(self.db.clone(), session_id.clone()); match replay_receiver_event_log(&recv_persister) { - Ok((receiver_state, _)) => { + Ok((receiver_state, history)) => { + let receiver_endpoint = history.pj_uri().extras.endpoint(); + let v2_config = self_clone.config.v2()?; + let network = relay_selection::SystemNetwork::new(v2_config); + let relay_selector = match relay_selection::relay_selector_from_endpoint( + v2_config, + &receiver_endpoint, + &network, + ) { + Ok(plan) => plan, + Err(error) => { + tracing::error!( + "Failed to derive relay selector for receiver session {}: {:?}", + session_id, + error + ); + continue; + } + }; tasks.push(tokio::spawn(async move { - self_clone.process_receiver_session(receiver_state, &recv_persister).await + self_clone + .process_receiver_session( + receiver_state, + &recv_persister, + &relay_selector, + ) + .await })); } Err(e) => { @@ -529,10 +570,26 @@ impl App { persister: &SenderPersister, ) -> Result<()> { match session { - SendSession::WithReplyKey(context) => - self.post_original_proposal(context, persister).await?, - SendSession::PollingForProposal(context) => - self.get_proposed_payjoin_psbt(context, persister).await?, + SendSession::WithReplyKey(context) => { + let v2_config = self.config.v2()?; + let network = relay_selection::SystemNetwork::new(v2_config); + let relay_selector = relay_selection::relay_selector_from_endpoint( + v2_config, + &context.endpoint(), + &network, + )?; + self.post_original_proposal(context, persister, &relay_selector).await? + } + SendSession::PollingForProposal(context) => { + let v2_config = self.config.v2()?; + let network = relay_selection::SystemNetwork::new(v2_config); + let relay_selector = relay_selection::relay_selector_from_endpoint( + v2_config, + &context.endpoint(), + &network, + )?; + self.get_proposed_payjoin_psbt(context, persister, &relay_selector).await? + } SendSession::Closed(SenderSessionOutcome::Success(proposal)) => { self.process_pj_response(proposal)?; return Ok(()); @@ -553,27 +610,36 @@ impl App { &self, sender: Sender, persister: &SenderPersister, + relay_selector: &RelaySelector, ) -> Result<()> { - let (req, ctx) = sender.create_v2_post_request( - self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?.as_str(), - )?; - let response = self.post_request(req).await?; + let (response, ctx) = self + .send_ohttp_request_with_relay_selector( + relay_selector, + RequestKind::Post(MessageKind::Original), + |relay| sender.create_v2_post_request(relay.as_str()).map_err(Into::into), + ) + .await?; let sender = sender.process_response(&response.bytes().await?, ctx).save(persister)?; println!("Posted Original PSBT..."); - self.get_proposed_payjoin_psbt(sender, persister).await + self.get_proposed_payjoin_psbt(sender, persister, relay_selector).await } async fn get_proposed_payjoin_psbt( &self, sender: Sender, persister: &SenderPersister, + relay_selector: &RelaySelector, ) -> Result<()> { - let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(&sender.endpoint())).await?; let mut session = sender.clone(); // Long poll until we get a response loop { - let (req, ctx) = session.create_poll_request(ohttp_relay.as_str())?; - let response = self.post_request(req).await?; + let (response, ctx) = self + .send_ohttp_request_with_relay_selector( + relay_selector, + RequestKind::Poll(MessageKind::Proposal), + |relay| session.create_poll_request(relay.as_str()).map_err(Into::into), + ) + .await?; let res = session.process_response(&response.bytes().await?, ctx).save(persister); match res { Ok(OptionalTransitionOutcome::Progress(psbt)) => { @@ -599,15 +665,18 @@ impl App { &self, session: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result> { - let ohttp_relay = - self.unwrap_relay_or_else_fetch(Some(&session.pj_uri().extras.endpoint())).await?; - let mut session = session; loop { - let (req, context) = session.create_poll_request(ohttp_relay.as_str())?; + let (ohttp_response, context) = self + .send_ohttp_request_with_relay_selector( + relay_selector, + RequestKind::Poll(MessageKind::Original), + |relay| session.create_poll_request(relay.as_str()).map_err(Into::into), + ) + .await?; println!("Polling receive request..."); - let ohttp_response = self.post_request(req).await?; let state_transition = session .process_response(ohttp_response.bytes().await?.to_vec().as_slice(), context) .save(persister); @@ -629,31 +698,32 @@ impl App { &self, session: ReceiveSession, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let res = { match session { ReceiveSession::Initialized(proposal) => - self.read_from_directory(proposal, persister).await, + self.read_from_directory(proposal, persister, relay_selector).await, ReceiveSession::UncheckedOriginalPayload(proposal) => - self.check_proposal(proposal, persister).await, + self.check_proposal(proposal, persister, relay_selector).await, ReceiveSession::MaybeInputsOwned(proposal) => - self.check_inputs_not_owned(proposal, persister).await, + self.check_inputs_not_owned(proposal, persister, relay_selector).await, ReceiveSession::MaybeInputsSeen(proposal) => - self.check_no_inputs_seen_before(proposal, persister).await, + self.check_no_inputs_seen_before(proposal, persister, relay_selector).await, ReceiveSession::OutputsUnknown(proposal) => - self.identify_receiver_outputs(proposal, persister).await, + self.identify_receiver_outputs(proposal, persister, relay_selector).await, ReceiveSession::WantsOutputs(proposal) => - self.commit_outputs(proposal, persister).await, + self.commit_outputs(proposal, persister, relay_selector).await, ReceiveSession::WantsInputs(proposal) => - self.contribute_inputs(proposal, persister).await, + self.contribute_inputs(proposal, persister, relay_selector).await, ReceiveSession::WantsFeeRange(proposal) => - self.apply_fee_range(proposal, persister).await, + self.apply_fee_range(proposal, persister, relay_selector).await, ReceiveSession::ProvisionalProposal(proposal) => - self.finalize_proposal(proposal, persister).await, + self.finalize_proposal(proposal, persister, relay_selector).await, ReceiveSession::PayjoinProposal(proposal) => - self.send_payjoin_proposal(proposal, persister).await, + self.send_payjoin_proposal(proposal, persister, relay_selector).await, ReceiveSession::HasReplyableError(error) => - self.handle_error(error, persister).await, + self.handle_error(error, persister, relay_selector).await, ReceiveSession::Monitor(proposal) => self.monitor_payjoin_proposal(proposal, persister).await, ReceiveSession::Closed(_) => return Err(anyhow!("Session closed")), @@ -667,22 +737,24 @@ impl App { &self, session: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let mut interrupt = self.interrupt.clone(); let receiver = tokio::select! { - res = self.long_poll_fallback(session, persister) => res, + res = self.long_poll_fallback(session, persister, relay_selector) => res, _ = interrupt.changed() => { println!("Interrupted. Call the `resume` command to resume all sessions."); return Err(anyhow!("Interrupted")); } }?; - self.check_proposal(receiver, persister).await + self.check_proposal(receiver, persister, relay_selector).await } async fn check_proposal( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let wallet = self.wallet(); let proposal = proposal @@ -695,13 +767,14 @@ impl App { println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:"); println!("{}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast())); - self.check_inputs_not_owned(proposal, persister).await + self.check_inputs_not_owned(proposal, persister, relay_selector).await } async fn check_inputs_not_owned( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let wallet = self.wallet(); let proposal = proposal @@ -711,26 +784,28 @@ impl App { .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error())) }) .save(persister)?; - self.check_no_inputs_seen_before(proposal, persister).await + self.check_no_inputs_seen_before(proposal, persister, relay_selector).await } async fn check_no_inputs_seen_before( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let proposal = proposal .check_no_inputs_seen_before(&mut |input| { Ok(self.db.insert_input_seen_before(*input)?) }) .save(persister)?; - self.identify_receiver_outputs(proposal, persister).await + self.identify_receiver_outputs(proposal, persister, relay_selector).await } async fn identify_receiver_outputs( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let wallet = self.wallet(); let proposal = proposal @@ -740,22 +815,24 @@ impl App { .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error())) }) .save(persister)?; - self.commit_outputs(proposal, persister).await + self.commit_outputs(proposal, persister, relay_selector).await } async fn commit_outputs( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let proposal = proposal.commit_outputs().save(persister)?; - self.contribute_inputs(proposal, persister).await + self.contribute_inputs(proposal, persister, relay_selector).await } async fn contribute_inputs( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let wallet = self.wallet(); let candidate_inputs = wallet.list_unspent()?; @@ -769,22 +846,24 @@ impl App { let selected_input = proposal.try_preserving_privacy(candidate_inputs)?; let proposal = proposal.contribute_inputs(vec![selected_input])?.commit_inputs().save(persister)?; - self.apply_fee_range(proposal, persister).await + self.apply_fee_range(proposal, persister, relay_selector).await } async fn apply_fee_range( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let proposal = proposal.apply_fee_range(None, self.config.max_fee_rate).save(persister)?; - self.finalize_proposal(proposal, persister).await + self.finalize_proposal(proposal, persister, relay_selector).await } async fn finalize_proposal( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { let wallet = self.wallet(); let proposal = proposal @@ -794,18 +873,26 @@ impl App { .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error())) }) .save(persister)?; - self.send_payjoin_proposal(proposal, persister).await + self.send_payjoin_proposal(proposal, persister, relay_selector).await } async fn send_payjoin_proposal( &self, proposal: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { - let (req, ohttp_ctx) = proposal - .create_post_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str()) - .map_err(|e| anyhow!("v2 req extraction failed {}", e))?; - let res = self.post_request(req).await?; + let (res, ohttp_ctx) = self + .send_ohttp_request_with_relay_selector( + relay_selector, + RequestKind::Post(MessageKind::Proposal), + |relay| { + proposal + .create_post_request(relay.as_str()) + .map_err(|e| anyhow!("v2 req extraction failed {}", e)) + }, + ) + .await?; let payjoin_psbt = proposal.psbt().clone(); let session = proposal.process_response(&res.bytes().await?, ohttp_ctx).save(persister)?; println!( @@ -865,21 +952,41 @@ impl App { } } - async fn unwrap_relay_or_else_fetch( + async fn send_ohttp_request_with_relay_selector( &self, - directory: Option, - ) -> Result { - let directory = directory.map(|url| url.into_url()).transpose()?; - let selected_relay = - self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay(); - let ohttp_relay = match selected_relay { - Some(relay) => relay, - None => - unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone()) - .await? - .relay_url, + relay_selector: &RelaySelector, + request_kind: RequestKind, + mut build_request: F, + ) -> Result<(reqwest::Response, Ctx)> + where + F: FnMut(&payjoin::Url) -> Result<(payjoin::Request, Ctx)>, + { + let request_label = match request_kind { + RequestKind::Post(_) => "POST", + RequestKind::Poll(_) => "POLL", }; - Ok(ohttp_relay) + let mut relays = + relay_selector.select_relays_for_request(request_kind)?.into_iter().peekable(); + + while let Some(relay) = relays.next() { + let is_last_relay = relays.peek().is_none(); + let (req, ctx) = build_request(&relay.url)?; + match self.post_request(req, &relay).await { + Ok(response) => return Ok((response, ctx)), + Err(RelayAttemptError::Retryable(error)) => { + tracing::debug!( + "Retryable OHTTP {request_label} failure via relay {}: {error:?}", + relay.url + ); + if is_last_relay { + return Err(error); + } + } + Err(RelayAttemptError::Terminal(error)) => return Err(error), + } + } + + Err(anyhow!("No valid relays available")) } /// Handle error by attempting to send an error response over the directory @@ -887,14 +994,16 @@ impl App { &self, session: Receiver, persister: &ReceiverPersister, + relay_selector: &RelaySelector, ) -> Result<()> { - let (err_req, err_ctx) = session - .create_error_request(self.unwrap_relay_or_else_fetch(None::<&str>).await?.as_str())?; - - let err_response = match self.post_request(err_req).await { - Ok(response) => response, - Err(e) => return Err(anyhow!("Failed to post error request: {}", e)), - }; + let (err_response, err_ctx) = self + .send_ohttp_request_with_relay_selector( + relay_selector, + RequestKind::Post(MessageKind::Proposal), + |relay| session.create_error_request(relay.as_str()).map_err(Into::into), + ) + .await + .map_err(|e| anyhow!("Failed to post error request: {e}"))?; let err_bytes = match err_response.bytes().await { Ok(bytes) => bytes, @@ -908,14 +1017,30 @@ impl App { Ok(()) } - async fn post_request(&self, req: payjoin::Request) -> Result { - let http = http_agent(&self.config)?; - http.post(req.url) + async fn post_request( + &self, + req: payjoin::Request, + relay: &relay_selection::ResolvedUrl, + ) -> std::result::Result { + let mut builder = http_client_builder(&self.config).map_err(|err| { + RelayAttemptError::Terminal(anyhow!("Failed to build HTTP client: {err}")) + })?; + if let Some(domain) = relay.domain() { + builder = builder.resolve_to_addrs(domain, &relay.socket_addrs); + } + let http = builder.build().map_err(|err| { + RelayAttemptError::Terminal(anyhow!("Failed to build HTTP client: {err}")) + })?; + let response = http + .post(req.url) .header("Content-Type", req.content_type) .body(req.body) .send() .await - .and_then(|r| r.error_for_status()) - .context("HTTP request failed") + .map_err(|err| classify_reqwest_error(err, "HTTP request failed"))?; + + response + .error_for_status() + .map_err(|err| classify_reqwest_error(err, "HTTP request failed")) } } diff --git a/payjoin-cli/src/app/v2/ohttp.rs b/payjoin-cli/src/app/v2/ohttp.rs index e034d99fd..4397fa370 100644 --- a/payjoin-cli/src/app/v2/ohttp.rs +++ b/payjoin-cli/src/app/v2/ohttp.rs @@ -1,126 +1,141 @@ //! OHTTP relay selection and key bootstrapping for the payjoin-cli. //! -//! [`RelayManager`] tracks the currently selected relay and any relays that -//! have failed, excluding them from future selections for the lifetime of -//! the [`RelayManager`]. -//! -//! `fetch_ohttp_keys` selects a relay at random from the configured list, -//! excluding relays that [`RelayManager`] has marked as failed, -//! to avoid a fixed contact pattern at the network layer. -use std::sync::{Arc, Mutex}; +//! Bootstrap key fetching uses temporary relay failover. Protocol requests use +//! stateless relay selection from the receiver network selection. +use std::time::Duration; use anyhow::{anyhow, Result}; -use payjoin::Url; +use reqwest::header::ACCEPT; +use reqwest::Proxy; +use super::relay_selection::{ReceiverNetworkSelection, ResolvedUrl}; use super::Config; - -#[derive(Debug, Clone)] -pub struct RelayManager { - selected_relay: Option, - failed_relays: Vec, -} - -impl RelayManager { - pub fn new() -> Self { RelayManager { selected_relay: None, failed_relays: Vec::new() } } - - pub fn set_selected_relay(&mut self, relay: Url) { self.selected_relay = Some(relay); } - - pub fn get_selected_relay(&self) -> Option { self.selected_relay.clone() } - - pub fn add_failed_relay(&mut self, relay: Url) { self.failed_relays.push(relay); } - - pub fn get_failed_relays(&self) -> Vec { self.failed_relays.clone() } +use crate::app::http_client_builder; + +#[derive(Debug)] +pub(crate) enum RelayAttemptError { + /// Network-shaped failures can try the next relay candidate. + Retryable(anyhow::Error), + /// Protocol/configuration-shaped failures should stop immediately. + Terminal(anyhow::Error), } -pub(crate) struct ValidatedOhttpKeys { - pub(crate) ohttp_keys: payjoin::OhttpKeys, - pub(crate) relay_url: Url, +/// Decide whether a reqwest failure should fail over to another relay. +pub(crate) fn classify_reqwest_error( + err: reqwest::Error, + context: &'static str, +) -> RelayAttemptError { + let error = anyhow!("{context}: {err}"); + if err.is_timeout() || err.is_connect() || err.is_request() { + RelayAttemptError::Retryable(error) + } else { + RelayAttemptError::Terminal(error) + } } pub(crate) async fn unwrap_ohttp_keys_or_else_fetch( config: &Config, - directory: Option, - relay_manager: Arc>, -) -> Result { + network_selection: &ReceiverNetworkSelection, +) -> Result { if let Some(ohttp_keys) = config.v2()?.ohttp_keys.clone() { println!("Using OHTTP Keys from config"); - let validated = fetch_ohttp_keys(config, directory, relay_manager).await?; - Ok(ValidatedOhttpKeys { ohttp_keys, relay_url: validated.relay_url }) + Ok(ohttp_keys) } else { println!("Bootstrapping private network transport over Oblivious HTTP"); - let fetched_keys = fetch_ohttp_keys(config, directory, relay_manager).await?; - - Ok(fetched_keys) + fetch_ohttp_keys(config, network_selection).await } } +// Fetch directory OHTTP keys through the already chosen receiver network selection. +// This happens before the receiver key exists, so it cannot use RelaySelector. async fn fetch_ohttp_keys( config: &Config, - directory: Option, - relay_manager: Arc>, -) -> Result { - use payjoin::bitcoin::secp256k1::rand::prelude::SliceRandom; - let payjoin_directory = directory.unwrap_or(config.v2()?.pj_directory.clone()); - let relays = config.v2()?.ohttp_relays.clone(); - - loop { - let failed_relays = - relay_manager.lock().expect("Lock should not be poisoned").get_failed_relays(); - - let remaining_relays: Vec<_> = - relays.iter().filter(|r| !failed_relays.contains(r)).cloned().collect(); + network_selection: &ReceiverNetworkSelection, +) -> Result { + if network_selection.relays.is_empty() { + return Err(anyhow!( + "No valid relays available for {}", + network_selection.directory.url.as_str() + )); + } - if remaining_relays.is_empty() { - return Err(anyhow!("No valid relays available")); + let last_relay_index = network_selection.relays.len() - 1; + for (index, relay) in network_selection.relays.iter().enumerate() { + match fetch_directory_ohttp_keys_via_resolved_relay_url( + config, + &relay.resolved, + &network_selection.directory, + ) + .await + { + Ok(keys) => return Ok(keys), + Err(RelayAttemptError::Retryable(error)) => { + tracing::debug!( + "Failed to fetch OHTTP keys via relay {}: {error:?}", + relay.resolved.url + ); + if index == last_relay_index { + return Err(error); + } + } + Err(RelayAttemptError::Terminal(error)) => return Err(error), } + } - let selected_relay = - match remaining_relays.choose(&mut payjoin::bitcoin::key::rand::thread_rng()) { - Some(relay) => relay.clone(), - None => return Err(anyhow!("Failed to select from remaining relays")), - }; + unreachable!( + "empty relay selections return before the loop and successful key fetches return inside it" + ) +} - relay_manager - .lock() - .expect("Lock should not be poisoned") - .set_selected_relay(selected_relay.clone()); +// This mirrors payjoin::io::fetch_ohttp_keys, but keeps the CLI-specific +// pieces: resolved socket addresses, configured TLS roots, and relay failover +// error classification. +// Build a proxied request through one resolved relay. The relay address is +// resolved to the DNS result checked by relay_selection. +async fn fetch_directory_ohttp_keys_via_resolved_relay_url( + config: &Config, + relay: &ResolvedUrl, + directory: &ResolvedUrl, +) -> std::result::Result { + let proxy = Proxy::all(relay.url.as_str()).map_err(|err| { + RelayAttemptError::Terminal(anyhow!("Failed to configure OHTTP relay proxy: {err}")) + })?; + let mut builder = http_client_builder(config) + .map_err(|err| RelayAttemptError::Terminal(anyhow!("Failed to build HTTP client: {err}")))? + .proxy(proxy); + + if let Some(domain) = relay.domain() { + builder = builder.resolve_to_addrs(domain, &relay.socket_addrs); + } - let ohttp_keys = { - #[cfg(feature = "_manual-tls")] - { - if let Some(cert_path) = config.root_certificate.as_ref() { - let cert_der = std::fs::read(cert_path)?; - payjoin::io::fetch_ohttp_keys_with_cert( - selected_relay.as_str(), - payjoin_directory.as_str(), - &cert_der, - ) - .await - } else { - payjoin::io::fetch_ohttp_keys( - selected_relay.as_str(), - payjoin_directory.as_str(), - ) - .await - } - } - #[cfg(not(feature = "_manual-tls"))] - payjoin::io::fetch_ohttp_keys(selected_relay.as_str(), payjoin_directory.as_str()).await - }; + if let Some(directory_domain) = directory.domain() { + builder = builder.resolve_to_addrs(directory_domain, &directory.socket_addrs); + } - match ohttp_keys { - Ok(keys) => - return Ok(ValidatedOhttpKeys { ohttp_keys: keys, relay_url: selected_relay }), - Err(payjoin::io::Error::UnexpectedStatusCode(e)) => { - return Err(payjoin::io::Error::UnexpectedStatusCode(e).into()); - } - Err(e) => { - tracing::debug!("Failed to connect to relay: {selected_relay}, {e:?}"); - relay_manager - .lock() - .expect("Lock should not be poisoned") - .add_failed_relay(selected_relay); - } - } + let client = builder.build().map_err(|err| { + RelayAttemptError::Terminal(anyhow!("Failed to build HTTP client: {err}")) + })?; + let ohttp_keys_url = directory.url.join("/.well-known/ohttp-gateway").map_err(|err| { + RelayAttemptError::Terminal(anyhow!("Failed to construct OHTTP key URL: {err}")) + })?; + let response = client + .get(ohttp_keys_url.as_str()) + .timeout(Duration::from_secs(10)) + .header(ACCEPT, "application/ohttp-keys") + .send() + .await + .map_err(|err| classify_reqwest_error(err, "Failed to fetch OHTTP keys"))?; + + if !response.status().is_success() { + return Err(RelayAttemptError::Terminal(anyhow!( + "Unexpected OHTTP key status code {}", + response.status() + ))); } + + let body = response.bytes().await.map_err(|err| { + RelayAttemptError::Terminal(anyhow!("Failed to read OHTTP key response body: {err}")) + })?; + payjoin::OhttpKeys::decode(&body) + .map_err(|err| RelayAttemptError::Terminal(anyhow!("Failed to decode OHTTP keys: {err}"))) } diff --git a/payjoin-cli/src/app/v2/relay_selection.rs b/payjoin-cli/src/app/v2/relay_selection.rs new file mode 100644 index 000000000..b8f2a3a09 --- /dev/null +++ b/payjoin-cli/src/app/v2/relay_selection.rs @@ -0,0 +1,871 @@ +//! Stateless OHTTP relay selection for BIP77 sessions. +//! +//! This module has two jobs: +//! - choose the receiver's directory and usable relay set before the receiver key exists +//! - use the receiver key, request kind, and time window to choose relays without +//! storing a current relay index or failed-relay state +//! +use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use anyhow::{anyhow, bail, Context, Result}; +use payjoin::bitcoin::hashes::{sha256, Hash, HashEngine}; +use payjoin::bitcoin::key::rand::seq::SliceRandom; +use payjoin::bitcoin::key::rand::{thread_rng, Rng}; +use payjoin::{HpkePublicKey, PjParam, Url}; + +use crate::app::config::V2Config; +#[cfg(feature = "asmap")] +use crate::app::config::{AsmapConfig, LoadedAsmap}; +const RELAY_SELECTION_TAG: &[u8] = b"payjoin-cli-stateless-relay-selection-v1"; + +#[cfg(any(feature = "asmap", test))] +type Asn = u32; + +pub(crate) const WINDOW_SECS: u64 = 30; +const CLOCK_SKEW_WINDOWS: i64 = 2; +const POST_RESERVED_COUNT: usize = 3; + +// CLI/app primitive: this includes resolved network addresses, so it is tied to +// how payjoin-cli performs DNS and HTTP requests. +/// Directory and relay set chosen before the receiver key is available. +/// +/// The receiver needs this during session creation: it must choose a directory +/// and fetch OHTTP keys before the receiver pubkey can be read from the endpoint. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct ReceiverNetworkSelection { + pub(crate) directory: ResolvedUrl, + pub(crate) relays: Vec, +} + +/// Chooses which OHTTP relays to try for a request. +/// +/// It stores the usable relay candidates and the receiver public key. For each +/// POST or POLL request, it combines those values with the current time window +/// to compute a fresh relay order. It does not remember the last relay used or +/// keep a cursor into the previous ordering. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct RelaySelector { + relays: Vec, + receiver_pubkey: HpkePublicKey, +} + +// Candidate for the main crate: every wallet should use the same protocol +// request labels so relay selection is derived consistently. +/// Whether the current OHTTP request is posting data or polling for data. +/// +/// POST and POLL intentionally derive different relay orderings so that a +/// sender POST is less likely to interrupt a receiver POLL on the same AS. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum RequestKind { + Post(MessageKind), + Poll(MessageKind), +} + +impl RequestKind { + fn method_tag(self) -> &'static [u8] { + match self { + RequestKind::Post(_) => b"post", + RequestKind::Poll(_) => b"poll", + } + } + + fn message(self) -> MessageKind { + match self { + RequestKind::Post(message) | RequestKind::Poll(message) => message, + } + } +} + +// Candidate for the main crate: this is the issue's "message 0 or 1" domain +// separator, mapped onto BIP77's two mailbox/message directions. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum MessageKind { + /// Sender's original PSBT message. + Original, + /// Receiver's payjoin proposal or replyable error message. + Proposal, +} + +impl MessageKind { + fn tag(self) -> &'static [u8] { + match self { + MessageKind::Original => b"original", + MessageKind::Proposal => b"proposal", + } + } +} + +// Candidate for the main crate: the main crate should expose construction from +// a caller-provided unix timestamp, while the CLI can call SystemTime::now(). +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct TimeWindow(u64); + +impl TimeWindow { + /// Current 30 second selection window with a receiver-key-derived offset. + /// + /// The offset keeps all sessions from switching relay preferences at the + /// exact same wall-clock boundary. + pub(crate) fn current(receiver_pubkey: &HpkePublicKey) -> Self { + let unix_seconds = + SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(); + Self::from_unix_seconds(unix_seconds, receiver_pubkey) + } + + fn from_unix_seconds(unix_seconds: u64, receiver_pubkey: &HpkePublicKey) -> Self { + let offset = receiver_key_offset(receiver_pubkey); + Self((unix_seconds + offset) / WINDOW_SECS) + } + + fn saturating_offset(self, offset: i64) -> Self { + if offset.is_negative() { + Self(self.0.saturating_sub(offset.unsigned_abs())) + } else { + Self(self.0.saturating_add(offset as u64)) + } + } +} + +// CLI/app primitive: this carries the concrete socket addresses that reqwest +// should use for a URL after DNS and optional ASMap checks. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct ResolvedUrl { + pub(crate) url: Url, + /// Socket addresses that were resolved and, when ASMap is enabled, ASN-checked. + pub(crate) socket_addrs: Vec, +} + +impl ResolvedUrl { + pub(crate) fn new(url: Url, socket_addrs: Vec) -> Self { + Self { url, socket_addrs } + } + + pub(crate) fn domain(&self) -> Option<&str> { self.url.domain() } +} + +// Possible main-crate shape: `RelayCandidate { uri, bucket }`. The CLI version +// carries `ResolvedUrl` because it also owns concrete network routing. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct RelayCandidate { + pub(crate) resolved: ResolvedUrl, + /// Selection bucket. With ASMap this is the ASN; otherwise it is the URL. + bucket: RelayBucket, +} + +impl RelayCandidate { + fn new(resolved: ResolvedUrl, bucket: RelayBucket) -> Self { Self { resolved, bucket } } +} + +// Candidate for the main crate: the selector only needs stable bucket IDs. ASNs +// are one bucket source, but a future library type could also allow opaque IDs. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +enum RelayBucket { + #[cfg(feature = "asmap")] + Asn(Asn), + /// Fallback bucket when ASMap is unavailable or not configured. + Url(String), +} + +impl RelayBucket { + fn score_payload(&self) -> Vec { + match self { + #[cfg(feature = "asmap")] + RelayBucket::Asn(asn) => asn.to_be_bytes().to_vec(), + RelayBucket::Url(url) => url.as_bytes().to_vec(), + } + } +} + +impl RelaySelector { + /// Return the relay order to try for one POST or POLL request. + /// + /// The ordering is recomputed from the receiver key and current time, so no + /// relay-selection progress needs to be stored between requests. + pub(crate) fn select_relays_for_request( + &self, + request_kind: RequestKind, + ) -> Result> { + select_relay_candidates( + &self.relays, + request_kind, + &self.receiver_pubkey, + TimeWindow::current(&self.receiver_pubkey), + ) + .map(|candidates| candidates.into_iter().map(|candidate| candidate.resolved).collect()) + } +} + +/// Convert the receiver network selection into request-time relay selection after +/// the receiver endpoint exists and contains the receiver pubkey. +pub(crate) fn relay_selector_from_network_selection( + endpoint: &str, + network_selection: ReceiverNetworkSelection, +) -> Result { + let endpoint_directory = directory_from_endpoint(endpoint)?; + if normalized_url(&endpoint_directory) != normalized_url(&network_selection.directory.url) { + bail!( + "Receiver endpoint directory {} does not match selected directory {}", + endpoint_directory.as_str(), + network_selection.directory.url.as_str() + ); + } + Ok(RelaySelector { + relays: network_selection.relays, + receiver_pubkey: receiver_pubkey_from_endpoint(endpoint)?, + }) +} + +fn select_relay_candidates( + candidates: &[RelayCandidate], + request_kind: RequestKind, + receiver_pubkey: &HpkePublicKey, + window: TimeWindow, +) -> Result> { + if candidates.is_empty() { + bail!("No valid relays available"); + } + + match request_kind { + RequestKind::Post(_) => + Ok(select_post_candidates(candidates, request_kind, receiver_pubkey, window)), + RequestKind::Poll(_) => + select_poll_candidates(candidates, request_kind, receiver_pubkey, window), + } +} + +// Candidate for the main crate: pure selection policy. It only depends on +// candidates, receiver key, request label, and time window. +// POSTs are rare, so reserve up to POST_RESERVED_COUNT preferred AS buckets and +// try one relay from each bucket. +fn select_post_candidates( + candidates: &[RelayCandidate], + request_kind: RequestKind, + receiver_pubkey: &HpkePublicKey, + window: TimeWindow, +) -> Vec { + let reserved = ranked_buckets(candidates, request_kind, receiver_pubkey, window) + .into_iter() + .take(POST_RESERVED_COUNT) + .collect::>(); + let mut selected_buckets = BTreeSet::new(); + + ordered_candidates(candidates, request_kind, receiver_pubkey, window) + .into_iter() + .filter(|candidate| { + reserved.contains(&candidate.bucket) + && selected_buckets.insert(candidate.bucket.clone()) + }) + .collect() +} + +// Candidate for the main crate: pure selection policy. This is the central +// stateless POST/POLL separation described in the issue. +// POLLs avoid the AS buckets that a matching POST would use around the current +// time window. This is the main traffic-analysis mitigation. +fn select_poll_candidates( + candidates: &[RelayCandidate], + request_kind: RequestKind, + receiver_pubkey: &HpkePublicKey, + window: TimeWindow, +) -> Result> { + let reserved = + post_reserved_buckets(candidates, request_kind.message(), receiver_pubkey, window); + let poll_candidates = candidates + .iter() + .filter(|candidate| !reserved.contains(&candidate.bucket)) + .cloned() + .collect::>(); + + let ordered_poll_candidates = + ordered_candidates(&poll_candidates, request_kind, receiver_pubkey, window); + if !ordered_poll_candidates.is_empty() { + return Ok(ordered_poll_candidates); + } + + tracing::warn!( + "Not enough relay buckets to keep POLL separate from POST, POLL may reuse POST-reserved buckets" + ); + Ok(ordered_candidates(candidates, request_kind, receiver_pubkey, window)) +} + +// Candidate for the main crate: clock-skew tolerant POST-reservation logic. +// Compute POST-reserved buckets for nearby windows so small sender/receiver +// clock drift does not make POST and POLL choose the same AS. +fn post_reserved_buckets( + candidates: &[RelayCandidate], + message: MessageKind, + receiver_pubkey: &HpkePublicKey, + window: TimeWindow, +) -> BTreeSet { + let mut reserved = BTreeSet::new(); + for offset in -CLOCK_SKEW_WINDOWS..=CLOCK_SKEW_WINDOWS { + let adjacent_window = window.saturating_offset(offset); + reserved.extend( + ranked_buckets( + candidates, + RequestKind::Post(message), + receiver_pubkey, + adjacent_window, + ) + .into_iter() + .take(POST_RESERVED_COUNT), + ); + } + reserved +} + +// Candidate for the main crate: pure deterministic ordering. It should operate +// on generic bucket IDs, not on DNS or ASMap directly. +// Order buckets by hash, order relays within each bucket by hash, then +// round-robin across buckets so one AS with many relays is not over-weighted. +fn ordered_candidates( + candidates: &[RelayCandidate], + request_kind: RequestKind, + receiver_pubkey: &HpkePublicKey, + window: TimeWindow, +) -> Vec { + // Group relays by privacy bucket: ASN with ASMap, URL fallback otherwise. + let mut buckets = BTreeMap::>::new(); + for candidate in candidates { + buckets.entry(candidate.bucket.clone()).or_default().push(candidate.clone()); + } + + // Sort relays within each bucket, then sort the buckets themselves. + let mut bucket_entries = buckets + .into_iter() + .map(|(bucket, mut relays)| { + relays.sort_by_key(|candidate| { + relay_score(request_kind, receiver_pubkey, window, candidate) + }); + let bucket_score = bucket_score(request_kind, receiver_pubkey, window, &bucket); + (bucket_score, VecDeque::from(relays)) + }) + .collect::>(); + bucket_entries.sort_by_key(|(score, _)| *score); + + // Round-robin across buckets so one AS with many relays cannot dominate. + let mut ordered = vec![]; + loop { + let mut emitted = false; + for (_, bucket) in bucket_entries.iter_mut() { + if let Some(candidate) = bucket.pop_front() { + ordered.push(candidate); + emitted = true; + } + } + if !emitted { + break; + } + } + ordered +} + +fn ranked_buckets( + candidates: &[RelayCandidate], + request_kind: RequestKind, + receiver_pubkey: &HpkePublicKey, + window: TimeWindow, +) -> Vec { + let mut buckets = candidates + .iter() + .map(|candidate| candidate.bucket.clone()) + .collect::>() + .into_iter() + .collect::>(); + buckets.sort_by_key(|bucket| bucket_score(request_kind, receiver_pubkey, window, bucket)); + buckets +} + +fn receiver_key_offset(receiver_pubkey: &HpkePublicKey) -> u64 { + let hash = sha256::Hash::hash(&receiver_pubkey.to_compressed_bytes()); + let mut bytes = [0u8; 8]; + bytes.copy_from_slice(&Hash::as_byte_array(&hash)[..8]); + u64::from_be_bytes(bytes) % WINDOW_SECS +} + +// Candidate for the main crate: pure hash scoring. +// Score an AS bucket or URL bucket for deterministic pseudo-random ordering. +fn bucket_score( + request_kind: RequestKind, + receiver_pubkey: &HpkePublicKey, + window: TimeWindow, + bucket: &RelayBucket, +) -> [u8; 32] { + let payload = bucket.score_payload(); + selection_hash(request_kind, receiver_pubkey, window, b"bucket", &payload) +} + +// Candidate for the main crate: pure hash scoring. +// Score one relay within its bucket. +fn relay_score( + request_kind: RequestKind, + receiver_pubkey: &HpkePublicKey, + window: TimeWindow, + candidate: &RelayCandidate, +) -> [u8; 32] { + selection_hash( + request_kind, + receiver_pubkey, + window, + b"relay", + candidate.resolved.url.as_str().as_bytes(), + ) +} + +// Candidate for the main crate: this is the core deterministic derivation. The +// tag may need a final protocol/library name before being stabilized. +// Domain-separated hash used for deterministic relay selection. +fn selection_hash( + request_kind: RequestKind, + receiver_pubkey: &HpkePublicKey, + window: TimeWindow, + label: &[u8], + payload: &[u8], +) -> [u8; 32] { + let mut engine = sha256::Hash::engine(); + engine.input(RELAY_SELECTION_TAG); + engine.input(request_kind.method_tag()); + engine.input(request_kind.message().tag()); + engine.input(&receiver_pubkey.to_compressed_bytes()); + engine.input(&window.0.to_be_bytes()); + engine.input(label); + engine.input(payload); + *sha256::Hash::from_engine(engine).as_byte_array() +} + +// Relay utils +// +// These helpers prepare clean relay/directory inputs for the selector. They are +// intentionally kept below the pure ordering code because they are CLI/app +// concerns: endpoint parsing, trusted-directory checks, DNS resolution, ASMap +// lookup, and reqwest address resolution. + +// CLI/app primitive: this joins DNS results with optional ASMap classification. +// It should not move wholesale to the main crate because DNS and ASMap loading +// are wallet/application responsibilities. +#[derive(Debug, Clone, PartialEq, Eq)] +struct ResolvedServer { + /// URL plus resolved socket addresses. + resolved: ResolvedUrl, + /// ASN is present only when the server was resolved through ASMap. + #[cfg(feature = "asmap")] + asn: Option, +} + +// CLI/app abstraction: keeps DNS and ASMap lookup testable without putting IO +// into the selection algorithm itself. +pub(crate) trait NetworkView { + /// Resolve hostnames behind a trait so tests can use deterministic DNS data. + fn resolve_host(&self, host: &str, port: u16) -> Result>; + #[cfg(feature = "asmap")] + /// Look up the ASN for an IP. Returns None when ASMap has no mapping. + fn lookup_asn(&self, ip: IpAddr) -> Result>; +} + +#[derive(Debug, Clone)] +pub(crate) struct SystemNetwork { + #[cfg(feature = "asmap")] + asmap: Option, +} + +impl SystemNetwork { + #[cfg(feature = "asmap")] + pub(crate) fn new(v2: &V2Config) -> Self { + Self { asmap: v2.asmap.as_ref().map(|cfg| cfg.asmap.clone()) } + } + + #[cfg(not(feature = "asmap"))] + pub(crate) fn new(_v2: &V2Config) -> Self { Self {} } +} + +impl NetworkView for SystemNetwork { + fn resolve_host(&self, host: &str, port: u16) -> Result> { + if let Ok(ip) = host.parse::() { + return Ok(vec![ip]); + } + + let resolved = (host, port) + .to_socket_addrs() + .with_context(|| format!("Failed to resolve host {host}:{port}"))? + .map(|addr| addr.ip()) + .collect::>(); + Ok(resolved) + } + + #[cfg(feature = "asmap")] + fn lookup_asn(&self, ip: IpAddr) -> Result> { + let Some(asmap) = &self.asmap else { + return Ok(None); + }; + let asn = asmap.lookup(ip); + Ok((asn != 0).then_some(asn)) + } +} + +// CLI/app code: chooses and resolves the receiver's directory before the +// receiver key exists. A main-crate selector cannot do this because it should +// not know about config, DNS, ASMap files, or OHTTP key fetching. +// This network selection can filter by user and directory ASNs, but request-time +// ordering waits until RelaySelector has the receiver pubkey. +pub(crate) fn choose_receiver_network_selection( + v2: &V2Config, + network: &impl NetworkView, +) -> Result { + let chosen_directory = choose_directory(v2, network)?; + #[cfg(feature = "asmap")] + let directory_asn = chosen_directory.asn; + let directory = chosen_directory.resolved; + let mut relays = relay_candidates( + v2, + network, + #[cfg(feature = "asmap")] + directory_asn, + )?; + + if relays.is_empty() { + bail!("No valid relays available for the selected directory {}", directory.url.as_str()); + } + + relays.shuffle(&mut thread_rng()); + Ok(ReceiverNetworkSelection { directory, relays }) +} + +pub(crate) fn relay_selector_from_endpoint( + v2: &V2Config, + endpoint: &str, + network: &impl NetworkView, +) -> Result { + let pj_param = parse_v2_pj_param(endpoint)?; + let directory_url = directory_from_endpoint(endpoint)?; + ensure_directory_trusted(v2, &directory_url)?; + #[cfg(feature = "asmap")] + let (directory, directory_asn) = if let Some(asmap) = &v2.asmap { + let user_asns = user_asns(asmap, network)?; + let directory = resolve_asn_server(network, &directory_url)?; + let directory_asn = directory.asn.expect("ASMap directory candidates carry a resolved ASN"); + if user_asns.contains(&directory_asn) { + bail!( + "Endpoint directory {} shares ASN {} with the user", + directory.resolved.url.as_str(), + directory_asn + ); + } + (directory.resolved, Some(directory_asn)) + } else { + (resolve_server(network, &directory_url)?.resolved, None) + }; + #[cfg(not(feature = "asmap"))] + let directory = resolve_server(network, &directory_url)?.resolved; + + let receiver_pubkey = receiver_pubkey_from_pj_param(&pj_param); + let relays = relay_candidates( + v2, + network, + #[cfg(feature = "asmap")] + directory_asn, + )?; + if relays.is_empty() { + bail!( + "No valid relays available after filtering user and directory ASNs for {}", + directory.url.as_str() + ); + } + + Ok(RelaySelector { relays, receiver_pubkey: receiver_pubkey.clone() }) +} + +// CLI/app code: directory trust and user-AS filtering are wallet policy. The +// main crate can provide selection primitives, but should not own the trusted +// directory list or public-IP/user-AS discovery. +// Choose a directory from the trusted set. With ASMap, reject directories that +// share an ASN with the user. Without ASMap, still resolve and pin the chosen +// directory so later network code uses the checked addresses. +fn choose_directory(v2: &V2Config, network: &impl NetworkView) -> Result { + #[cfg(feature = "asmap")] + if let Some(asmap) = &v2.asmap { + let user_asns = user_asns(asmap, network)?; + let mut directories = resolve_asn_servers(network, v2.trusted_directories())? + .into_iter() + .filter(|candidate| candidate.asn.map(|asn| !user_asns.contains(&asn)).unwrap_or(false)) + .collect::>(); + if directories.is_empty() { + bail!("No trusted directories remain after excluding the user's ASNs"); + } + let index = thread_rng().gen_range(0..directories.len()); + return Ok(directories.swap_remove(index)); + } + + let mut directories = v2.trusted_directories().to_vec(); + if directories.is_empty() { + bail!("At least one trusted directory must be configured"); + } + let index = thread_rng().gen_range(0..directories.len()); + let directory = directories.swap_remove(index); + resolve_server(network, &directory) +} + +// Bridge between CLI policy and protocol-like selection: the CLI resolves, +// filters, and buckets relays; the pure selector only consumes RelayCandidate. +// Build relay candidates for request selection. With ASMap this filters out +// relay ASNs that match the user or chosen directory. Without ASMap, it keeps +// all configured relays and buckets them by URL. +fn relay_candidates( + v2: &V2Config, + network: &impl NetworkView, + #[cfg(feature = "asmap")] directory_asn: Option, +) -> Result> { + #[cfg(feature = "asmap")] + if let Some(asmap) = &v2.asmap { + let user_asns = user_asns(asmap, network)?; + let directory_asn = directory_asn.expect("ASMap directory candidates carry a resolved ASN"); + return asn_relay_candidates( + resolve_asn_servers(network, &v2.ohttp_relays)?, + user_asns, + directory_asn, + ); + } + let mut relays = resolve_servers(network, &v2.ohttp_relays)? + .into_iter() + .map(|target| url_bucket_candidate(target.resolved)) + .collect::>(); + relays.sort_by(|left, right| left.resolved.url.as_str().cmp(right.resolved.url.as_str())); + Ok(relays) +} + +#[cfg(feature = "asmap")] +// CLI/app code: ASMap filtering policy. The main crate should not know how ASNs +// were obtained; it only needs bucket IDs after filtering. +// Convert ASN-resolved servers into relay candidates, dropping relays in ASNs +// that would overlap with the user or selected directory. +fn asn_relay_candidates( + candidates: Vec, + user_asns: BTreeSet, + directory_asn: Asn, +) -> Result> { + let mut filtered = vec![]; + for candidate in candidates { + let asn = + candidate.asn.ok_or_else(|| anyhow!("ASMap relay candidate lacks a resolved ASN"))?; + if asn != directory_asn && !user_asns.contains(&asn) { + filtered.push((candidate.resolved, asn)); + } + } + + if filtered.is_empty() { + return Ok(vec![]); + } + + filtered.sort_by(|(left, _), (right, _)| left.url.as_str().cmp(right.url.as_str())); + Ok(filtered + .into_iter() + .map(|(resolved, asn)| RelayCandidate::new(resolved, RelayBucket::Asn(asn))) + .collect()) +} + +pub(crate) fn ensure_directory_trusted(v2: &V2Config, directory: &Url) -> Result<()> { + if v2 + .trusted_directories() + .iter() + .any(|candidate| normalized_url(candidate) == normalized_url(directory)) + { + return Ok(()); + } + + bail!( + "The directory embedded in the BIP21 URI is not in the configured trusted directory set: {}", + directory.as_str() + ); +} + +fn normalized_url(url: &Url) -> String { + let scheme = url.scheme().to_ascii_lowercase(); + let host = url.host_str().to_ascii_lowercase(); + let default_port = known_default_port(url); + + let mut normalized = format!("{scheme}://{host}"); + if let Some(port) = url.port() { + if Some(port) != default_port { + normalized.push(':'); + normalized.push_str(&port.to_string()); + } + } + + let path = url.path().trim_end_matches('/'); + if !path.is_empty() && path != "/" { + normalized.push_str(path); + } + + normalized +} + +#[cfg(feature = "asmap")] +// CLI/app code: user ASNs come from local config or local public-IP discovery. +// This is intentionally outside the deterministic selector. +// Collect configured user ASNs and ASNs derived from configured public IPs. +fn user_asns(asmap: &AsmapConfig, network: &impl NetworkView) -> Result> { + let mut user_asns = asmap.user_asns.iter().copied().collect::>(); + for ip in &asmap.user_public_ips { + let asn = network.lookup_asn(*ip)?.ok_or_else(|| { + anyhow!("Failed to map user public IP {ip} to an ASN using the ASMap") + })?; + user_asns.insert(asn); + } + Ok(user_asns) +} + +#[cfg(feature = "asmap")] +// CLI/app code: DNS plus ASMap lookup. A future main-crate API could receive +// `RelayBucket::Asn(asn)` from the caller instead of doing this lookup. +// Resolve each server and require all returned IPs to map to exactly one ASN. +fn resolve_asn_servers(network: &impl NetworkView, urls: &[Url]) -> Result> { + urls.iter().map(|url| resolve_asn_server(network, url)).collect() +} + +// CLI/app code: plain DNS resolution for non-ASMap mode. +// Resolve each server without ASMap classification. +fn resolve_servers(network: &impl NetworkView, urls: &[Url]) -> Result> { + urls.iter().map(|url| resolve_server(network, url)).collect() +} + +// CLI/app code: concrete socket addresses are reqwest/network specific. +// DNS resolution plus socket address capture. ASMap is not consulted here. +fn resolve_server(network: &impl NetworkView, url: &Url) -> Result { + Ok(ResolvedServer { + resolved: resolved_url(url, &resolved_ips(network, url)?)?, + #[cfg(feature = "asmap")] + asn: None, + }) +} + +#[cfg(feature = "asmap")] +// CLI/app code: conservative ASMap resolver. Mixed-ASN rejection is policy that +// prepares clean ASN buckets for the pure selector. +// DNS resolution plus ASMap lookup. Mixed-ASN hostnames are rejected because the +// selector reasons about one AS bucket per server. +fn resolve_asn_server(network: &impl NetworkView, url: &Url) -> Result { + let ips = resolved_ips(network, url)?; + + let mut asns = BTreeSet::new(); + for ip in &ips { + let asn = network.lookup_asn(*ip)?.ok_or_else(|| { + anyhow!("{} resolved to {ip}, which could not be mapped to an ASN", url.as_str()) + })?; + asns.insert(asn); + } + + match asns.len() { + 1 => Ok(ResolvedServer { + resolved: resolved_url(url, &ips)?, + asn: Some(*asns.first().expect("checked len")), + }), + 0 => bail!("{} resolved to no ASN-mapped addresses", url.as_str()), + _ => bail!( + "{} resolves to multiple ASNs {:?}; mixed-ASN hostnames are rejected", + url.as_str(), + asns + ), + } +} + +// CLI/app code: concrete DNS result processing. +// Resolve to unique IPs before storing socket addresses or doing ASMap lookup. +fn resolved_ips(network: &impl NetworkView, url: &Url) -> Result> { + let port = relay_port(url)?; + let host = url.host_str(); + let mut ips = if let Some(ip) = parse_ip_literal(&host) { + vec![ip] + } else { + network.resolve_host(&host, port)? + }; + if ips.is_empty() { + bail!("{} resolved to no IP addresses", url.as_str()); + } + ips.sort(); + ips.dedup(); + Ok(ips) +} + +// CLI/app code: used by reqwest `resolve_to_addrs`, not a protocol primitive. +// Preserve the exact addresses that were resolved and checked. +fn resolved_url(url: &Url, ips: &[IpAddr]) -> Result { + let port = relay_port(url)?; + let socket_addrs = ips.iter().copied().map(|ip| SocketAddr::new(ip, port)).collect(); + Ok(ResolvedUrl::new(url.clone(), socket_addrs)) +} + +fn url_bucket_candidate(resolved: ResolvedUrl) -> RelayCandidate { + let bucket = RelayBucket::Url(resolved.url.as_str().to_owned()); + RelayCandidate::new(resolved, bucket) +} + +fn parse_v2_pj_param(endpoint: &str) -> Result { + match PjParam::parse(endpoint)? { + pj_param @ PjParam::V2(_) => Ok(pj_param), + #[cfg(feature = "v1")] + PjParam::V1(_) => bail!("Expected a BIP77 endpoint, got a BIP78 endpoint"), + _ => bail!("Expected a BIP77 endpoint"), + } +} + +fn receiver_pubkey_from_endpoint(endpoint: &str) -> Result { + let pj_param = parse_v2_pj_param(endpoint)?; + Ok(receiver_pubkey_from_pj_param(&pj_param).clone()) +} + +fn receiver_pubkey_from_pj_param(pj_param: &PjParam) -> &HpkePublicKey { + match pj_param { + PjParam::V2(pj_param) => pj_param.receiver_pubkey(), + #[cfg(feature = "v1")] + PjParam::V1(_) => unreachable!("parse_v2_pj_param only returns BIP77 endpoints"), + _ => unreachable!("parse_v2_pj_param only returns BIP77 endpoints"), + } +} + +pub(crate) fn directory_from_endpoint(endpoint: &str) -> Result { + let endpoint = Url::parse(endpoint)?; + let mut raw = format!("{}://{}", endpoint.scheme(), endpoint.host_str()); + if let Some(port) = endpoint.port() { + raw.push(':'); + raw.push_str(&port.to_string()); + } + + let mut segments = endpoint + .path_segments() + .expect("payjoin::Url path_segments() is always available") + .collect::>(); + if segments.is_empty() { + bail!("The BIP77 endpoint has no session path segment"); + } + segments.pop(); + + if segments.is_empty() { + raw.push('/'); + } else { + raw.push('/'); + raw.push_str(&segments.join("/")); + } + + Url::parse(&raw) + .with_context(|| format!("Failed to derive the directory from endpoint {endpoint}")) +} + +fn relay_port(url: &Url) -> Result { + url.port().or_else(|| known_default_port(url)).ok_or_else(|| { + anyhow!("Unsupported scheme {} for relay/directory URL {}", url.scheme(), url.as_str()) + }) +} + +fn known_default_port(url: &Url) -> Option { + match url.scheme() { + "https" => Some(443), + "http" => Some(80), + _ => None, + } +} + +fn parse_ip_literal(host: &str) -> Option { + host.parse::() + .ok() + .or_else(|| host.strip_prefix('[')?.strip_suffix(']')?.parse::().ok()) +} diff --git a/payjoin-cli/tests/e2e.rs b/payjoin-cli/tests/e2e.rs index fe761e1fe..091abd208 100644 --- a/payjoin-cli/tests/e2e.rs +++ b/payjoin-cli/tests/e2e.rs @@ -285,6 +285,8 @@ mod e2e { .arg(&sender_db_path) .arg("--ohttp-relays") .arg(ohttp_relay) + .arg("--pj-directory") + .arg(directory) .arg("send") .arg(&bip21) .arg("--fee-rate") @@ -306,6 +308,8 @@ mod e2e { .arg(&receiver_db_path) .arg("--ohttp-relays") .arg(ohttp_relay) + .arg("--pj-directory") + .arg(directory) .arg("resume") .stdout(Stdio::piped()) .stderr(Stdio::inherit()) @@ -324,6 +328,8 @@ mod e2e { .arg(&sender_db_path) .arg("--ohttp-relays") .arg(ohttp_relay) + .arg("--pj-directory") + .arg(directory) .arg("send") .arg(&bip21) .arg("--fee-rate") @@ -355,6 +361,8 @@ mod e2e { .arg(&receiver_db_path) .arg("--ohttp-relays") .arg(ohttp_relay) + .arg("--pj-directory") + .arg(directory) .arg("resume") .stdout(Stdio::piped()) .stderr(Stdio::inherit()) @@ -374,6 +382,8 @@ mod e2e { .arg(&receiver_db_path) .arg("--ohttp-relays") .arg(ohttp_relay) + .arg("--pj-directory") + .arg(directory) .arg("resume") .stdout(Stdio::piped()) .stderr(Stdio::inherit()) @@ -696,6 +706,8 @@ mod e2e { .arg(&sender_db_path) .arg("--ohttp-relays") .arg(ohttp_relay) + .arg("--pj-directory") + .arg(directory) .arg("send") .arg(&bip21) .arg("--fee-rate")