diff --git a/clickhouse-admin/src/context.rs b/clickhouse-admin/src/context.rs
index c55bd25dea0..cfcf8d15e90 100644
--- a/clickhouse-admin/src/context.rs
+++ b/clickhouse-admin/src/context.rs
@@ -715,7 +715,15 @@ async fn get_retention_policy(
 //
 // This is a WAG, but the computations we do today to report table usage are
 // pretty inexpensive.
+//
+// NOTE: We explicitly use a smaller value during testing. This is to avoid
+// manipulating time with Tokio's test utils. That cannot work, because we run
+// every query to ClickHouse with a timeout, and that causes the tokio timers to
+// "auto-advance" to the end of that timeout when we pause in a test.
+#[cfg(not(test))]
 const USAGE_UPDATE_INTERVAL: Duration = Duration::from_mins(2);
+#[cfg(test)]
+const USAGE_UPDATE_INTERVAL: Duration = Duration::from_millis(250);
 
 async fn long_running_usage_task(
     tx: watch::Sender,
@@ -945,42 +953,57 @@ mod tests {
         let usage = context.database_usage();
         println!("{usage:#?}");
         assert!(usage.last_success.is_some());
-        assert!(
-            usage
-                .last_success
-                .expect("Should have successfully computed something")
-                .tables
-                .is_empty()
-        );
 
-        // Jump forward until we actually do compute the usage again.
-        tokio::time::pause();
-        let now = tokio::time::Instant::now();
-        while now.elapsed() < 2 * USAGE_UPDATE_INTERVAL {
-            tokio::time::advance(std::time::Duration::from_millis(10)).await;
-        }
-        tokio::time::resume();
-        let usage = context.database_usage();
+        // Wait until we actually do compute the usage again.
+        let usage = dev::poll::wait_for_condition(
+            || async {
+                let usage = context.database_usage();
+                match &usage.last_success {
+                    Some(success) => {
+                        if success.tables.is_empty() {
+                            Err(dev::poll::CondCheckError::<()>::NotYet)
+                        } else {
+                            Ok(usage)
+                        }
+                    }
+                    None => Err(dev::poll::CondCheckError::<()>::NotYet),
+                }
+            },
+            &std::time::Duration::from_millis(50),
+            &(2 * USAGE_UPDATE_INTERVAL),
+        )
+        .await
+        .unwrap();
         println!("{usage:#?}");
         let tables = &usage
             .last_success
            .as_ref()
             .expect("Should have computed something")
             .tables;
-        tables.contains_key(&String::from("oximeter.measurements_u64"));
-        tables.contains_key(&String::from("oximeter.measurements_u64"));
-        let version = tables.get(&String::from("oximeter.version")).unwrap();
-        assert_eq!(version.n_rows, 1);
+        assert!(
+            tables.contains_key(&String::from("oximeter.measurements_u64"))
+        );
+        assert!(
+            tables.contains_key(&String::from("oximeter.measurements_f64"))
+        );
+        assert!(tables.contains_key(&String::from("oximeter.version")));
 
-        // Kill the database, and force another collection.
+        // Kill the database, and wait for another collection. This one should
+        // fail.
         clickhouse.cleanup().await.unwrap();
-        tokio::time::pause();
-        let now = tokio::time::Instant::now();
-        while now.elapsed() < 2 * USAGE_UPDATE_INTERVAL {
-            tokio::time::advance(std::time::Duration::from_millis(10)).await;
-        }
-        tokio::time::resume();
-        let usage = context.database_usage();
+        let usage = dev::poll::wait_for_condition(
+            || async {
+                let usage = context.database_usage();
+                match &usage.last_error {
+                    Some(_) => Ok(usage),
+                    None => Err(dev::poll::CondCheckError::<()>::NotYet),
+                }
+            },
+            &std::time::Duration::from_millis(100),
+            &(2 * USAGE_UPDATE_INTERVAL),
+        )
+        .await
+        .unwrap();
         println!("{usage:#?}");
         assert!(
             usage.last_success.is_some(),
@@ -989,7 +1012,11 @@ mod tests {
         let Some(err) = usage.last_error.as_ref() else {
             panic!("expected an error to have occurred, but found None");
         };
-        assert!(err.error.starts_with("Failed to check out"));
+        let is_network_err = |msg: &str| -> bool {
+            msg.starts_with("Failed to check out")
+                || msg.starts_with("Native protocol error")
+        };
+        assert!(is_network_err(&err.error), "Expected a network error");
 
         logctx.cleanup_successful();
     }
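The NOTE added in the first hunk refers to Tokio's paused-clock behavior: once the clock is paused and every task is blocked on a timer, Tokio auto-advances virtual time, so any per-query timeout fires long before a real ClickHouse response could arrive. A minimal, standalone sketch of that behavior (a hypothetical test, not part of this change; it only relies on the documented `start_paused` and `tokio::time::timeout` APIs, which need tokio's `test-util` feature):

```rust
// Sketch: with the clock paused, Tokio advances time as soon as all tasks are
// blocked on timers, so a 30-second timeout "elapses" almost instantly.
#[tokio::test(start_paused = true)]
async fn paused_time_auto_advances_through_timeouts() {
    let start = tokio::time::Instant::now();
    // Stand-in for "query ClickHouse with a timeout": a future that never
    // resolves, like a response that hasn't arrived yet.
    let result = tokio::time::timeout(
        std::time::Duration::from_secs(30),
        std::future::pending::<()>(),
    )
    .await;
    // The timeout fires, and virtual time has jumped the full 30 seconds even
    // though essentially no wall-clock time passed.
    assert!(result.is_err());
    assert!(start.elapsed() >= std::time::Duration::from_secs(30));
}
```

This is why the test now polls with `wait_for_condition` against a short, test-only `USAGE_UPDATE_INTERVAL` instead of pausing and manually advancing the clock.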