diff --git a/crates/activate/src/lib.rs b/crates/activate/src/lib.rs index e4be0f5c47f..23b1992183a 100644 --- a/crates/activate/src/lib.rs +++ b/crates/activate/src/lib.rs @@ -17,7 +17,7 @@ pub enum Change { } // JournalSplit describes a collection partition or a shard recovery log. -#[derive(Debug, Default, Clone, serde::Serialize)] +#[derive(Debug, Default, Clone, PartialEq, serde::Serialize)] pub struct JournalSplit { pub name: String, pub labels: LabelSet, @@ -177,6 +177,7 @@ pub async fn activate_collection( )?; let shards = apply_initial_splits(task_template, initial_splits, shards)?; + let partitions = apply_initial_partition_splits(task_spec, partitions)?; let changes_1 = partition_changes(partition_template, partitions)?; let changes_2 = task_changes(task_template, shards, recovery, ops_logs, ops_stats)?; @@ -767,13 +768,23 @@ fn apply_initial_splits<'a>( return Ok(shards); } // The task is being upsert-ed, it's not disabled, and no current shards - // have its template prefix. + // have its template prefix. Use the old-generation shard count if it + // exceeds `initial_splits`, preserving split parallelism across resets. + let effective_splits = shards.len().max(initial_splits); + + if shards.len() > initial_splits { + tracing::info!( + task = template.shard.id, + old_shards = shards.len(), + effective_splits, + "preserving split count from previous generation" + ); + } - // Invent `initial_splits` new shards. - for pivot in 0..initial_splits { + for pivot in 0..effective_splits { let range = flow::RangeSpec { - key_begin: ((1 << 32) * (pivot + 0) / initial_splits) as u32, - key_end: (((1 << 32) * (pivot + 1) / initial_splits) - 1) as u32, + key_begin: ((1 << 32) * (pivot + 0) / effective_splits) as u32, + key_end: (((1 << 32) * (pivot + 1) / effective_splits) - 1) as u32, r_clock_begin: 0, r_clock_end: u32::MAX, }; @@ -794,6 +805,79 @@ fn apply_initial_splits<'a>( Ok(shards) } +/// Produce the partition set to upsert when activating a collection. +/// Mirrors old-generation key-range splits into the new generation only +/// when both old and new generations are unpartitioned, preserving +/// parallelism across reset. +fn apply_initial_partition_splits( + task_spec: Option<&flow::CollectionSpec>, + mut partitions: Vec, +) -> anyhow::Result> { + // No spec: the collection is being deleted. + let Some(spec) = task_spec else { + return Ok(partitions); + }; + let Some(template) = spec.partition_template.as_ref() else { + return Ok(partitions); + }; + // Re-activation: new-generation journals already exist. + if partitions + .iter() + .any(|p| p.name.starts_with(&template.name)) + { + return Ok(partitions); + } + // First-time activation: nothing to mirror from. + if partitions.is_empty() { + return Ok(partitions); + } + // New generation is logically partitioned. The set of partition + // journals is determined by the combinations of partition field values + // observed in incoming data, which we can't know at activation time. + // The runtime mapper creates each partition journal when the first + // document with that field-value combination arrives. + if !spec.partition_fields.is_empty() { + return Ok(partitions); + } + // Old generation was logically partitioned, new one isn't. The old + // journal layout shouldn't carry over. 
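+ // Logical partitions are identified below by labels whose names start
+ // with `labels::FIELD_PREFIX` (e.g. a hypothetical
+ // `estuary.dev/field/region=us` label on an old-generation journal),
+ // and their presence means we skip mirroring entirely.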
+ if partitions.iter().any(|p| { + p.labels + .labels + .iter() + .any(|l| l.name.starts_with(labels::FIELD_PREFIX)) + }) { + return Ok(partitions); + } + + tracing::info!( + collection = template.name, + old_partitions = partitions.len(), + "preserving partition splits from previous generation" + ); + + let old_count = partitions.len(); + for idx in 0..old_count { + let old = &partitions[idx]; + let mut new_labels = LabelSet::default(); + for label in &old.labels.labels { + if label.name == labels::KEY_BEGIN || label.name == labels::KEY_END { + new_labels = labels::add_value(new_labels, &label.name, &label.value); + } + } + let name = labels::partition::full_name(&template.name, &new_labels) + .context("building new-generation partition name from old-generation labels")?; + partitions.push(JournalSplit { + name, + labels: new_labels, + mod_revision: 0, + suspend: None, + }); + } + + Ok(partitions) +} + /// Map a parent JournalSplit into two subdivided splits. pub fn map_partition_to_split( parent: &JournalSplit, @@ -936,12 +1020,14 @@ mod test { .get_key(&models::Collection::new("example/collection")) .unwrap(); - let Some(flow::CollectionSpec { - partition_template: Some(partition_template), - partition_fields, - projections, - .. - }) = spec + let Some( + collection_spec @ flow::CollectionSpec { + partition_template: Some(partition_template), + partition_fields, + projections, + .. + }, + ) = spec else { unreachable!() }; @@ -1319,12 +1405,12 @@ mod test { insta::assert_json_snapshot!("delete", (partition_changes, task_changes)); } - // Case: test mixed deletion and creation. + // Case: test mixed deletion and creation, as happens during a reset. + // Simulates existing data-plane specs from an older generation being + // swapped out for a new generation. The fixture is logically + // partitioned, so partitions are not pre-created. Shard split counts + // are preserved: 3 old-gen shards with initial_splits=1 produce 3. { - // Simulate existing data-plane specs which were created under an - // older initial publication ID, and which are now being swapped out. - // This emulates a deletion followed by a re-creation, where we failed - // to activate the intermediary deletion. let mut all_partitions = all_partitions.clone(); let mut all_shards = all_shards.clone(); let mut all_recovery = all_recovery.clone(); @@ -1339,7 +1425,9 @@ mod test { *name = name.replace("2020202020202020", "replaced-pub-id"); } - let shards = apply_initial_splits(Some(task_template), 4, all_shards).unwrap(); + let shards = apply_initial_splits(Some(task_template), 1, all_shards).unwrap(); + let all_partitions = + apply_initial_partition_splits(Some(collection_spec), all_partitions).unwrap(); let partition_changes = partition_changes(Some(&partition_template), all_partitions).unwrap(); @@ -1363,6 +1451,64 @@ mod test { insta::assert_json_snapshot!("create_and_delete", (partition_changes, task_changes)); } + // Case: reset of an unpartitioned collection mirrors key-range + // splits into the new generation. Builds a CollectionSpec with no + // partition_fields and old-gen partitions whose labels carry only + // KEY_BEGIN/KEY_END (the shape of an unpartitioned collection). 
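+ // Expect both old key-range splits to be mirrored as new-generation
+ // journals with mod_revision 0, as captured in the
+ // reset_unpartitioned snapshot.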
+ { + let mut unpartitioned_spec = collection_spec.clone(); + unpartitioned_spec.partition_fields = Vec::new(); + + let make_unpartitioned = |key_begin: u32, key_end: u32| { + let labels = labels::partition::encode_key_range_labels( + LabelSet::default(), + key_begin, + key_end, + ); + let name = labels::partition::full_name(&partition_template.name, &labels) + .unwrap() + .replace("2020202020202020", "replaced-pub-id"); + JournalSplit { + name, + labels, + mod_revision: 222, + suspend: None, + } + }; + + let old_partitions = vec![ + make_unpartitioned(0, 0x7fffffff), + make_unpartitioned(0x80000000, u32::MAX), + ]; + + let new_partitions = + apply_initial_partition_splits(Some(&unpartitioned_spec), old_partitions).unwrap(); + + insta::assert_json_snapshot!("reset_unpartitioned", new_partitions); + } + + // Case: reset that removes logical partitioning. Old generation has + // field-labeled partitions, new generation has no partition_fields. + // The function must not mirror old per-field-value partitions onto + // the new unpartitioned topology — partition_changes will delete + // them and the runtime mapper creates the single new journal on + // first commit. + { + let mut unpartitioned_spec = collection_spec.clone(); + unpartitioned_spec.partition_fields = Vec::new(); + + let mut old_partitions = all_partitions.clone(); + for JournalSplit { name, .. } in old_partitions.iter_mut() { + *name = name.replace("2020202020202020", "replaced-pub-id"); + } + let expected = old_partitions.clone(); + + let result = + apply_initial_partition_splits(Some(&unpartitioned_spec), old_partitions).unwrap(); + + assert_eq!(result, expected); + } + // Case: split a shard on its key or clock. { let parent = all_shards.first().unwrap(); diff --git a/crates/activate/src/snapshots/activate__test__create_and_delete.snap b/crates/activate/src/snapshots/activate__test__create_and_delete.snap index 582848756a1..1e7c00d34c1 100644 --- a/crates/activate/src/snapshots/activate__test__create_and_delete.snap +++ b/crates/activate/src/snapshots/activate__test__create_and_delete.snap @@ -68,7 +68,7 @@ expression: "(partition_changes, task_changes)" }, { "name": "estuary.dev/key-end", - "value": "3fffffff" + "value": "55555554" }, { "name": "estuary.dev/log-level", @@ -151,7 +151,7 @@ expression: "(partition_changes, task_changes)" { "Shard": { "upsert": { - "id": "derivation/example/derivation/2020202020202020/40000000-00000000", + "id": "derivation/example/derivation/2020202020202020/55555555-00000000", "recoveryLogPrefix": "recovery", "hintPrefix": "/estuary/flow/hints", "hintBackups": 2, @@ -170,11 +170,11 @@ expression: "(partition_changes, task_changes)" }, { "name": "estuary.dev/key-begin", - "value": "40000000" + "value": "55555555" }, { "name": "estuary.dev/key-end", - "value": "7fffffff" + "value": "aaaaaaa9" }, { "name": "estuary.dev/log-level", @@ -214,7 +214,7 @@ expression: "(partition_changes, task_changes)" { "Journal": { "upsert": { - "name": "recovery/derivation/example/derivation/2020202020202020/40000000-00000000", + "name": "recovery/derivation/example/derivation/2020202020202020/55555555-00000000", "replication": 3, "labels": { "labels": [ @@ -257,7 +257,7 @@ expression: "(partition_changes, task_changes)" { "Shard": { "upsert": { - "id": "derivation/example/derivation/2020202020202020/80000000-00000000", + "id": "derivation/example/derivation/2020202020202020/aaaaaaaa-00000000", "recoveryLogPrefix": "recovery", "hintPrefix": "/estuary/flow/hints", "hintBackups": 2, @@ -276,113 +276,7 @@ expression: 
"(partition_changes, task_changes)" }, { "name": "estuary.dev/key-begin", - "value": "80000000" - }, - { - "name": "estuary.dev/key-end", - "value": "bfffffff" - }, - { - "name": "estuary.dev/log-level", - "value": "info" - }, - { - "name": "estuary.dev/logs-journal", - "value": "ops/tasks/BASE_NAME/logs/2020202020202020/kind=derivation/name=example%2Fderivation/pivot=00" - }, - { - "name": "estuary.dev/rclock-begin", - "value": "00000000" - }, - { - "name": "estuary.dev/rclock-end", - "value": "ffffffff" - }, - { - "name": "estuary.dev/stats-journal", - "value": "ops/tasks/BASE_NAME/stats/2020202020202020/kind=derivation/name=example%2Fderivation/pivot=00" - }, - { - "name": "estuary.dev/task-name", - "value": "example/derivation" - }, - { - "name": "estuary.dev/task-type", - "value": "derivation" - } - ] - }, - "ringBufferSize": 65536, - "readChannelSize": 4096 - } - } - }, - { - "Journal": { - "upsert": { - "name": "recovery/derivation/example/derivation/2020202020202020/80000000-00000000", - "replication": 3, - "labels": { - "labels": [ - { - "name": "app.gazette.dev/managed-by", - "value": "estuary.dev/flow" - }, - { - "name": "content-type", - "value": "application/x-gazette-recoverylog" - }, - { - "name": "estuary.dev/build", - "value": "0101010101010101" - }, - { - "name": "estuary.dev/task-name", - "value": "example/derivation" - }, - { - "name": "estuary.dev/task-type", - "value": "derivation" - } - ] - }, - "fragment": { - "length": "268435456", - "compressionCodec": "SNAPPY", - "stores": [ - "gs://example-bucket/" - ], - "refreshInterval": "300s", - "flushInterval": "172800s" - }, - "flags": 4, - "maxAppendRate": "4194304" - } - } - }, - { - "Shard": { - "upsert": { - "id": "derivation/example/derivation/2020202020202020/c0000000-00000000", - "recoveryLogPrefix": "recovery", - "hintPrefix": "/estuary/flow/hints", - "hintBackups": 2, - "maxTxnDuration": "60s", - "minTxnDuration": "0s", - "hotStandbys": 3, - "labels": { - "labels": [ - { - "name": "app.gazette.dev/managed-by", - "value": "estuary.dev/flow" - }, - { - "name": "estuary.dev/build", - "value": "0101010101010101" - }, - { - "name": "estuary.dev/key-begin", - "value": "c0000000" + "value": "aaaaaaaa" }, { "name": "estuary.dev/key-end", @@ -426,7 +320,7 @@ expression: "(partition_changes, task_changes)" { "Journal": { "upsert": { - "name": "recovery/derivation/example/derivation/2020202020202020/c0000000-00000000", + "name": "recovery/derivation/example/derivation/2020202020202020/aaaaaaaa-00000000", "replication": 3, "labels": { "labels": [ diff --git a/crates/activate/src/snapshots/activate__test__reset_unpartitioned.snap b/crates/activate/src/snapshots/activate__test__reset_unpartitioned.snap new file mode 100644 index 00000000000..81a29de9f54 --- /dev/null +++ b/crates/activate/src/snapshots/activate__test__reset_unpartitioned.snap @@ -0,0 +1,74 @@ +--- +source: crates/activate/src/lib.rs +expression: new_partitions +--- +[ + { + "name": "example/collection/replaced-pub-id/pivot=00", + "labels": { + "labels": [ + { + "name": "estuary.dev/key-begin", + "value": "00000000" + }, + { + "name": "estuary.dev/key-end", + "value": "7fffffff" + } + ] + }, + "mod_revision": 222, + "suspend": null + }, + { + "name": "example/collection/replaced-pub-id/pivot=80000000", + "labels": { + "labels": [ + { + "name": "estuary.dev/key-begin", + "value": "80000000" + }, + { + "name": "estuary.dev/key-end", + "value": "ffffffff" + } + ] + }, + "mod_revision": 222, + "suspend": null + }, + { + "name": 
"example/collection/2020202020202020/pivot=00", + "labels": { + "labels": [ + { + "name": "estuary.dev/key-begin", + "value": "00000000" + }, + { + "name": "estuary.dev/key-end", + "value": "7fffffff" + } + ] + }, + "mod_revision": 0, + "suspend": null + }, + { + "name": "example/collection/2020202020202020/pivot=80000000", + "labels": { + "labels": [ + { + "name": "estuary.dev/key-begin", + "value": "80000000" + }, + { + "name": "estuary.dev/key-end", + "value": "ffffffff" + } + ] + }, + "mod_revision": 0, + "suspend": null + } +] diff --git a/crates/dekaf/tests/e2e/not_ready.rs b/crates/dekaf/tests/e2e/not_ready.rs index 0407d70a1ee..b9aedfeba99 100644 --- a/crates/dekaf/tests/e2e/not_ready.rs +++ b/crates/dekaf/tests/e2e/not_ready.rs @@ -1,7 +1,11 @@ use super::{ DekafTestEnv, - raw_kafka::{TestKafkaClient, list_offsets_partition_error, metadata_leader_epoch}, + raw_kafka::{ + TestKafkaClient, fetch_partition_error, list_offsets_partition_error, + offset_for_epoch_result, + }, }; +use anyhow::Context; use kafka_protocol::ResponseError; use serde_json::json; use std::time::Duration; @@ -9,136 +13,64 @@ use std::time::Duration; const FIXTURE: &str = include_str!("fixtures/basic.flow.yaml"); const SPEC_REFRESH_TIMEOUT: Duration = Duration::from_secs(30); -/// Test that all partition-aware operations return LeaderNotAvailable when no journals exist. -/// -/// After a collection reset, there's a window where the new partition template exists -/// but the journal hasn't been created yet. Dekaf should return LeaderNotAvailable -/// for all operations that require partition data, signaling clients to retry. +/// A freshly-published collection has no partition journals until the capture +/// commits its first document (journals are created lazily by the runtime +/// mapper). During that window dekaf's binding lookup sees an empty partition +/// list and must return LeaderNotAvailable so consumers retry rather than +/// erroring out. #[tokio::test] async fn test_all_operations_return_leader_not_available_when_no_journals() -> anyhow::Result<()> { super::init_tracing(); let env = DekafTestEnv::setup("not_ready", FIXTURE).await?; let info = env.connection_info().await?; - - env.inject_documents("data", vec![json!({"id": "1", "value": "initial"})]) - .await?; - let token = env.dekaf_token()?; let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; - // Verify all operations work initially + let leader_not_available = ResponseError::LeaderNotAvailable.code(); + + // No documents have been injected yet, so no journals exist. All + // partition-aware operations should report LeaderNotAvailable. 
let metadata = client.metadata(&["test_topic"]).await?; - let initial_epoch = - metadata_leader_epoch(&metadata, "test_topic", 0).expect("should have epoch before reset"); + let topic = metadata + .topics + .iter() + .find(|t| t.name.as_ref().map(|n| n.as_str()) == Some("test_topic")) + .context("test_topic missing from metadata response")?; + assert_eq!( + topic.error_code, leader_not_available, + "Metadata should return LeaderNotAvailable when no journals exist" + ); let list_resp = client - .list_offsets_with_epoch("test_topic", 0, -1, initial_epoch) + .list_offsets_with_epoch("test_topic", 0, -1, -1) .await?; - assert!( - list_offsets_partition_error(&list_resp, "test_topic", 0) == Some(0), - "ListOffsets should succeed before reset" + assert_eq!( + list_offsets_partition_error(&list_resp, "test_topic", 0), + Some(leader_not_available), + "ListOffsets should return LeaderNotAvailable when no journals exist" ); - let fetch_resp = client - .fetch_with_epoch("test_topic", 0, 0, initial_epoch) - .await?; - assert!( - super::raw_kafka::fetch_partition_error(&fetch_resp, "test_topic", 0) == Some(0), - "Fetch should succeed before reset" + let fetch_resp = client.fetch_with_epoch("test_topic", 0, 0, -1).await?; + assert_eq!( + fetch_partition_error(&fetch_resp, "test_topic", 0), + Some(leader_not_available), + "Fetch should return LeaderNotAvailable when no journals exist" ); - let epoch_resp = client - .offset_for_leader_epoch("test_topic", 0, initial_epoch) - .await?; - assert!( - super::raw_kafka::offset_for_epoch_result(&epoch_resp, "test_topic", 0) - .map_or(false, |r| r.error_code == 0), - "OffsetForLeaderEpoch should succeed before reset" + let epoch_resp = client.offset_for_leader_epoch("test_topic", 0, 1).await?; + let epoch_result = offset_for_epoch_result(&epoch_resp, "test_topic", 0) + .context("test_topic partition missing from OffsetForLeaderEpoch response")?; + assert_eq!( + epoch_result.error_code, leader_not_available, + "OffsetForLeaderEpoch should return LeaderNotAvailable when no journals exist" ); - // Reset collection without injecting documents afterward - leaves journals uncreated - env.disable_capture().await?; - env.reset_collections().await?; - env.enable_capture().await?; - - let capture = env.capture_name().unwrap(); - env.wait_for_primary(capture, SPEC_REFRESH_TIMEOUT).await?; - - // Poll until we enter the NotReady state - let deadline = std::time::Instant::now() + SPEC_REFRESH_TIMEOUT; - let mut observed_not_ready = false; - - while std::time::Instant::now() < deadline { - let metadata = client.metadata(&["test_topic"]).await?; - - let topic = metadata - .topics - .iter() - .find(|t| t.name.as_ref().map(|n| n.as_str()) == Some("test_topic")); - - let Some(topic) = topic else { - tokio::time::sleep(Duration::from_millis(500)).await; - continue; - }; - - // Check if we're in NotReady state (metadata returns LeaderNotAvailable) - if topic.error_code == ResponseError::LeaderNotAvailable.code() { - observed_not_ready = true; - - // Verify ListOffsets also returns LeaderNotAvailable - let list_resp = client - .list_offsets_with_epoch("test_topic", 0, -1, -1) - .await?; - let list_error = list_offsets_partition_error(&list_resp, "test_topic", 0); - assert_eq!( - list_error, - Some(ResponseError::LeaderNotAvailable.code()), - "ListOffsets should return LeaderNotAvailable during NotReady state" - ); - - // Verify Fetch also returns LeaderNotAvailable - let fetch_resp = client.fetch_with_epoch("test_topic", 0, 0, -1).await?; - let fetch_error = 
super::raw_kafka::fetch_partition_error(&fetch_resp, "test_topic", 0); - assert_eq!( - fetch_error, - Some(ResponseError::LeaderNotAvailable.code()), - "Fetch should return LeaderNotAvailable during NotReady state" - ); - - // Verify OffsetForLeaderEpoch also returns LeaderNotAvailable - let epoch_resp = client.offset_for_leader_epoch("test_topic", 0, 1).await?; - let epoch_result = - super::raw_kafka::offset_for_epoch_result(&epoch_resp, "test_topic", 0); - assert!( - epoch_result.map_or(false, |r| r.error_code - == ResponseError::LeaderNotAvailable.code()), - "OffsetForLeaderEpoch should return LeaderNotAvailable during NotReady state" - ); - - break; - } - - // Check if journals were created before we could test - if let Some(new_epoch) = metadata_leader_epoch(&metadata, "test_topic", 0) { - if new_epoch > initial_epoch && !topic.partitions.is_empty() { - anyhow::bail!("Journals were created before we could test NotReady state"); - } - } - - tokio::time::sleep(Duration::from_millis(500)).await; - } - - assert!( - observed_not_ready, - "Test never observed NotReady state within timeout - unable to verify LeaderNotAvailable behavior" - ); - - // Verify that after injecting a document (creating journals), operations work again - env.inject_documents("data", vec![json!({"id": "2", "value": "post-reset"})]) + // Inject a document. The runtime mapper creates the partition journal on + // first commit, after which all operations should succeed. + env.inject_documents("data", vec![json!({"id": "1", "value": "first"})]) .await?; - // Poll until metadata returns successfully with partitions let deadline = std::time::Instant::now() + SPEC_REFRESH_TIMEOUT; loop { let metadata = client.metadata(&["test_topic"]).await?; @@ -159,5 +91,21 @@ async fn test_all_operations_return_leader_not_available_when_no_journals() -> a tokio::time::sleep(Duration::from_millis(500)).await; } + let list_resp = client + .list_offsets_with_epoch("test_topic", 0, -1, -1) + .await?; + assert_eq!( + list_offsets_partition_error(&list_resp, "test_topic", 0), + Some(0), + "ListOffsets should succeed once journals exist" + ); + + let fetch_resp = client.fetch_with_epoch("test_topic", 0, 0, -1).await?; + assert_eq!( + fetch_partition_error(&fetch_resp, "test_topic", 0), + Some(0), + "Fetch should succeed once journals exist" + ); + Ok(()) }
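
Note (commentary, not part of the patch): apply_initial_splits divides the
u32 keyspace evenly among `effective_splits` shards. A minimal, self-contained
Rust sketch of that arithmetic, using hypothetical helper names
(`split_ranges` and `main` are illustration only, not code from this change),
reproduces the boundaries seen in the updated create_and_delete snapshot for
three preserved splits:

    // Sketch of the pivot arithmetic from apply_initial_splits, with
    // effective_splits standing in for max(old shard count, initial_splits).
    fn split_ranges(effective_splits: u64) -> Vec<(u32, u32)> {
        (0..effective_splits)
            .map(|pivot| {
                // Evenly divide [0, 2^32) into effective_splits contiguous ranges.
                let key_begin = ((1u64 << 32) * pivot / effective_splits) as u32;
                let key_end = (((1u64 << 32) * (pivot + 1) / effective_splits) - 1) as u32;
                (key_begin, key_end)
            })
            .collect()
    }

    fn main() {
        // Three splits yield [00000000, 55555554], [55555555, aaaaaaa9],
        // [aaaaaaaa, ffffffff], matching the snapshot's key-begin/key-end labels.
        for (begin, end) in split_ranges(3) {
            println!("{begin:08x}-{end:08x}");
        }
    }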