KAFKA-20335: Document KIP-1271 headers-aware state stores#21840
KAFKA-20335: Document KIP-1271 headers-aware state stores#21840nileshkumar3 wants to merge 14 commits intoapache:trunkfrom
Conversation
aliehsaeedii
left a comment
There was a problem hiding this comment.
I know the PR is still a draft. But I had some concerns/questions. Jot them here not to forget.
1- Should we mention performance consideration? mention of potential storage or performance implications of storing headers. Also fo the users who do not use headers?
2- For changing dsl.store.format: consider adding a sentence like: "Existing stores will
be rebuilt from changelogs using the new format; local state directories may be deleted during rolling restart." (upgrade clarity)
@aliehsaeedii Thanks for review. I will do the required changes. |
Added brief notes: default = no headers in state; headers = bigger changelog/state and possible extra I/O. Upgrade section now says format changes rebuild from changelogs and local state dirs may be cleared on rolling restart—plan for restore time. |
|
@mjsax @aliehsaeedii Would appreciate your review on this PR. |
|
@mjsax @aliehsaeedii No urgency — when it fits your queue, could you take time to review this. |
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks @nileshkumar3 for creating the PR and following-up:
Looks very good. Just some nits on consistency:
- "record headers" vs "Kafka record headers"
- "headers-aware" vs "headers aware"
Also I think the backward compatability and upgrade paths listed in KIP-1271 must be mentioned as well. WDYT?
Co-authored-by: Alieh Saeedi <107070585+aliehsaeedii@users.noreply.github.com>
Thanks @aliehsaeedii for the review! I've addressed all the nits: "record headers" vs "Kafka record headers": Normalized to "record headers" throughout. "headers-aware" vs "headers aware": all instances already use the hyphenated "headers-aware" consistently. No Backward compatibility and upgrade paths: Added a dedicated Backward compatibility paragraph in the upgrade guide. |
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks for updating the PR. I left a comment and also maybe add that IQv2 is not supported yet.
Approved from my side (conditional).
thanks for review, done all the review comments. |
Thanks for the review, all the review comments have been addressed. Let me know if anything else is needed. |
|
|
||
| In the **Processor API**, use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) factory methods whose names end with `WithHeaders`, together with the matching `*WithHeadersBuilder` factory methods—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreWithHeadersBuilder`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreWithHeadersBuilder`. Key-value and window reads return [ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html). For sessions, [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html) stores each aggregation together with Kafka record headers (see that interface for the session value type). | ||
|
|
||
| In the **DSL**, enable the corresponding built-in stores with the `dsl.store.format` application setting; see the [configuration reference](config-streams.html) and [DSL stateful operators](dsl-api.html#stateful-transformations). Details and migration notes are in [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores). |
There was a problem hiding this comment.
Could config-streams.html benefit from anchors?
There was a problem hiding this comment.
NA as removed this part from the PR.
| Stateful transformations depend on state for processing inputs and producing outputs and require a [state store](../architecture.html#streams_architecture_state) associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary. | ||
|
|
||
| **Note:** Following store types are used regardless of the possibly specified type (via the parameter `materialized`): | ||
| **Note:** Unless you provide a custom store via `Materialized`, the DSL picks built-in store implementations from your [`dsl.store.format`](config-streams.html) setting and the operator semantics below: |
There was a problem hiding this comment.
Could config-streams.html benefit from anchors?
There was a problem hiding this comment.
NA as we removed this from this pr.
| * For DSL operators, store data is upgraded lazily in the background. | ||
| * No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the [TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html) interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write. | ||
|
|
||
| ## Headers in state stores (KIP-1271) |
There was a problem hiding this comment.
Add Kafka version as well?
There was a problem hiding this comment.
added the version
|
|
||
| ### Record headers in state stores (KIP-1271) | ||
|
|
||
| By default, DSL state stores retain keys, values, and timestamps (where applicable) but not record headers. You can opt in to **headers-aware** stores that also persist the headers from input records by setting `dsl.store.format` to `headers` (case-insensitive). The default `default` preserves the pre-4.3 store layout for each operator. Changing `dsl.store.format` alters changelog serialization and RocksDB layout for affected stores: **existing store data is rebuilt from changelog topics using the new format**, and **local state directories for those tasks may be removed during a rolling restart** so that restoration runs against the new layout—plan for restore duration and, when moving to `headers`, larger changelogs if records carry significant headers. Perform a rolling upgrade and allow tasks to finish restoring before relying on interactive queries or sink output. Interactive Queries (IQv1), `TopologyTestDriver`, and built-in upgrade paths support the new formats; for IQv2, queries run against headers-aware stores, but query results do not include record headers—see [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores) and the [Processor API state store documentation](developer-guide/processor-api.html#headers-in-state-stores-kip-1271). |
There was a problem hiding this comment.
Changing
dsl.store.formatalters changelog serialization and RocksDB layout for affected stores: existing store data is rebuilt from changelog topics using the new format
This is not correct. The upgrade path is designed for in-place migration with zero downtime. Local state directories are NOT automatically removed during rolling restarts when changing the format. Instead:
- RocksDB stores use a dual-column family approach for lazy migration
- Legacy records are served with empty header sets (headers count = 0)
- New writes use the new headers-aware format
- On read, legacy data is lazily converted and migrated to the new column family
There was a problem hiding this comment.
This is KIP-1285 again, not 1271.
|
|
||
| ### Record headers in state stores (KIP-1271) | ||
|
|
||
| By default, DSL state stores retain keys, values, and timestamps (where applicable) but not record headers. You can opt in to **headers-aware** stores that also persist the headers from input records by setting `dsl.store.format` to `headers` (case-insensitive). The default `default` preserves the pre-4.3 store layout for each operator. Changing `dsl.store.format` alters changelog serialization and RocksDB layout for affected stores: **existing store data is rebuilt from changelog topics using the new format**, and **local state directories for those tasks may be removed during a rolling restart** so that restoration runs against the new layout—plan for restore duration and, when moving to `headers`, larger changelogs if records carry significant headers. Perform a rolling upgrade and allow tasks to finish restoring before relying on interactive queries or sink output. Interactive Queries (IQv1), `TopologyTestDriver`, and built-in upgrade paths support the new formats; for IQv2, queries run against headers-aware stores, but query results do not include record headers—see [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores) and the [Processor API state store documentation](developer-guide/processor-api.html#headers-in-state-stores-kip-1271). |
There was a problem hiding this comment.
By default, DSL state stores retain keys, values, and timestamps (where applicable) but not record headers.
DSL is not mentioned in kip-1271 at all. We have another ticket for kip-1285
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks, @nileshkumar3. I think we may have slightly mixed up the two KIPs — KIP-1285 is already covered by this PR: #21905. Would it make more sense to focus this PR on KIP-1275 instead?
@aliehsaeedii thanks for reviewing again. I will do the changes in the weekend, assuming no urgency. |
There was a problem hiding this comment.
This PR should update the table in "Defining and creating a State Store" (https://kafka.apache.org/42/streams/developer-guide/processor-api/#defining-and-creating-a-state-store)
We might also want to update https://kafka.apache.org/42/streams/developer-guide/interactive-queries/ ?
| <tr> | ||
| <td> | ||
|
|
||
| dsl.store.format |
There was a problem hiding this comment.
This belong to ticket KAFKA-20336 for which we already have a PR: #21905
I think we should remove it from this PR.
| Stateful transformations depend on state for processing inputs and producing outputs and require a [state store](../architecture.html#streams_architecture_state) associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary. | ||
|
|
||
| **Note:** Following store types are used regardless of the possibly specified type (via the parameter `materialized`): | ||
| **Note:** Unless you provide a custom store via `Materialized`, the DSL picks built-in store implementations from your [`dsl.store.format`](config-streams.html) setting and the operator semantics below: |
There was a problem hiding this comment.
Same comment as above.
This PR should only update docs related to KIP-1271, which does not contain any DSL changes.
| * For DSL operators, store data is upgraded lazily in the background. | ||
| * No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the [TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html) interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write. | ||
|
|
||
| ## Headers in state stores (KIP-1271) |
There was a problem hiding this comment.
| ## Headers in state stores (KIP-1271) | |
| ## Headers in State Stores |
|
|
||
| Since Kafka 4.3, you can materialize state that includes Kafka [record headers](https://kafka.apache.org/documentation/#recordheaders) in addition to keys, values, and timestamps. This is useful when header metadata must survive stateful processing (aggregations, joins, and similar operations). | ||
|
|
||
| **Footprint and performance:** If you keep `dsl.store.format` at `default` and do not use headers-aware `Stores` suppliers, record headers are not persisted in state—behavior and storage remain comparable to pre-KIP-1271 DSL materializations. Headers-aware stores embed header bytes in each stored update, which grows changelog segments and local RocksDB usage and can increase write and restore I/O; impact scales with header size and update rate. Enable headers in state only when downstream logic needs that metadata. |
There was a problem hiding this comment.
This does not belong into the PAPI section of the docs
|
|
||
| **Footprint and performance:** If you keep `dsl.store.format` at `default` and do not use headers-aware `Stores` suppliers, record headers are not persisted in state—behavior and storage remain comparable to pre-KIP-1271 DSL materializations. Headers-aware stores embed header bytes in each stored update, which grows changelog segments and local RocksDB usage and can increase write and restore I/O; impact scales with header size and update rate. Enable headers in state only when downstream logic needs that metadata. | ||
|
|
||
| In the **Processor API**, use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) factory methods whose names end with `WithHeaders`, together with the matching `*WithHeadersBuilder` factory methods—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreWithHeadersBuilder`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreWithHeadersBuilder`. Key-value and window reads return [ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html). For sessions, [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html) stores each aggregation together with Kafka record headers (see that interface for the session value type). |
There was a problem hiding this comment.
In the Processor API
Redundant. We are in the "processor API" section of the docs.
|
|
||
| In the **Processor API**, use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) factory methods whose names end with `WithHeaders`, together with the matching `*WithHeadersBuilder` factory methods—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreWithHeadersBuilder`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreWithHeadersBuilder`. Key-value and window reads return [ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html). For sessions, [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html) stores each aggregation together with Kafka record headers (see that interface for the session value type). | ||
|
|
||
| In the **DSL**, enable the corresponding built-in stores with the `dsl.store.format` application setting; see the [configuration reference](config-streams.html) and [DSL stateful operators](dsl-api.html#stateful-transformations). Details and migration notes are in [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores). |
|
|
||
| Kafka Streams now allows to purge local state directories and checkpoint files during application startup if they have not been modified for a certain period of time. This can be configured via the new `state.cleanup.dir.max.age.ms` config. More details can be found in [KIP-1259](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup) | ||
|
|
||
| ### Record headers in state stores (KIP-1271) |
There was a problem hiding this comment.
Do we really need a headline for this?
|
|
||
| ### Record headers in state stores (KIP-1271) | ||
|
|
||
| By default, DSL state stores retain keys, values, and timestamps (where applicable) but not record headers. You can opt in to **headers-aware** stores that also persist the headers from input records by setting `dsl.store.format` to `headers` (case-insensitive). The default `default` preserves the pre-4.3 store layout for each operator. Changing `dsl.store.format` alters changelog serialization and RocksDB layout for affected stores: **existing store data is rebuilt from changelog topics using the new format**, and **local state directories for those tasks may be removed during a rolling restart** so that restoration runs against the new layout—plan for restore duration and, when moving to `headers`, larger changelogs if records carry significant headers. Perform a rolling upgrade and allow tasks to finish restoring before relying on interactive queries or sink output. Interactive Queries (IQv1), `TopologyTestDriver`, and built-in upgrade paths support the new formats; for IQv2, queries run against headers-aware stores, but query results do not include record headers—see [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores) and the [Processor API state store documentation](developer-guide/processor-api.html#headers-in-state-stores-kip-1271). |
There was a problem hiding this comment.
This is KIP-1285 again, not 1271.
|
@mjsax @aliehsaeedii Thanks for the review and feedback. I’ve addressed the comments and updated the PR accordingly, Please take another look when you get a chance. |
mjsax
left a comment
There was a problem hiding this comment.
Thanks for the update. Couple of follow ups.
There was a problem hiding this comment.
Kafka Streams currently has two built-in types:
This need to get updated. "two" was outdated even before we added header stores. It's eight now with header stores.
There was a problem hiding this comment.
updated with all 8 here.
|
|
||
| The method `KafkaStreams#store(...)` finds an application instance's local state stores by name and type. Note that interactive queries are not supported for [versioned state stores](processor-api.html#streams-developer-guide-state-store-versioned) at this time. | ||
|
|
||
| Headers-aware state stores added in Kafka 4.3 ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)) work with the same **`QueryableStoreTypes`** helpers as headerless stores—for example `timestampedKeyValueStore()` and `timestampedWindowStore()` also match the corresponding `*WithHeaders` implementations, and `sessionStore()` applies to `SessionStoreWithHeaders`. However, **`KafkaStreams#store(...)`** still exposes the usual read-only shapes: plain values or **`ValueAndTimestamp`** for timestamped key-value and window stores; **record headers are not included** in those IQv1 results (facades strip them). Session queries return only the aggregation value, not header metadata. **[`ValueTimestampHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html)** is what processors see when reading the store inside the topology. IQv2 may still omit headers in some query paths—see the KIP, release notes, and [Processor API: Headers in State Stores](processor-api.html#headers-in-state-stores). |
There was a problem hiding this comment.
helpers as headerless stores—for example
Not sure if the dash (which I believe is supposed to be an m-dash will render correctly? Can we confirm?
However,
KafkaStreams#store(...)still exposes the usual read-only shapes
Well, depends what queryable-store-type you pick. You can also get the headers if you want. -- In combination with the later **[ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html)** is what processors see when reading the store inside the topology. it seems you say that you cannot get headers at all with IQv1, but you can.
plain values or
ValueAndTimestampfor timestamped key-value and window stores
-> for (timestamped) key-value
(facades strip them)
I would remove this -- implementation detail.
IQv2 may still omit headers in some query paths—see the KIP, release notes, and Processor API: Headers in State Stores.
IQv2 is not implemented with AK 4.3 yet -- and we will need follow up KIPs to add support.
Maybe the whole paragraph can we simplified a little bit? It add good context that "richer" stores (ts, header) can we queried with "lower" queryable store types, but that's not header specific.
There was a problem hiding this comment.
verified m-das works. Updated this part for IQV1 and removed any IQV2 reference here.
There was a problem hiding this comment.
Seems this part is also outdated. This example creates a key-value store -- as it's a DSL example, it actually creates a timestamped store.
There was a problem hiding this comment.
Similar further below: This example creates a window store
There was a problem hiding this comment.
Guess we can still keep the code snippets as-is and query they stores for "plain values" only, but we might want to add a note about it.
|
|
||
| Since Kafka 4.3, you can materialize state that includes Kafka [record headers](https://kafka.apache.org/documentation/#recordheaders) in addition to keys, values, and timestamps ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)). This is useful when header metadata must survive stateful processing (for example aggregations or joins implemented with the Processor API). | ||
|
|
||
| Use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) factory methods whose names end with `WithHeaders`, together with the matching `StoreBuilder` factories—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders`. (The public method names are not parallel: key-value and session use `…BuilderWithHeaders`, while window uses `timestampedWindowStoreWithHeadersBuilder`—see javadoc.) Key-value and window reads return [ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html). For sessions, [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html) stores each aggregation together with record headers (see that interface for the session value type). |
There was a problem hiding this comment.
Similar question as above about the dash
There was a problem hiding this comment.
The public method names are not parallel: key-value and session use
…BuilderWithHeaders, while window usestimestampedWindowStoreWithHeadersBuilder—see javadoc.)
I think we had a bug in the code at some point about inconsistent naming, but the KIP did specify consistent names, and it was fixed in the code already --if it was not fixed yet, we should fix it.
There was a problem hiding this comment.
(see that interface for the session value type)
Why not link to it, similar to what we do for ValueTimestampHeaders ?
There was a problem hiding this comment.
mash works fine. removed the inconsistency part. added the link.
There was a problem hiding this comment.
Docs updated to match the current shipping API. The KIP used a uniform *WithHeadersBuilder style, but trunk uses *BuilderWithHeaders for key-value/session and timestampedWindowStoreWithHeadersBuilder for window, so the docs reflect the implemented names.
There was a problem hiding this comment.
if we need to fix the naming, we can do as a separate follow up PR.
|
|
||
| Kafka Streams now allows to purge local state directories and checkpoint files during application startup if they have not been modified for a certain period of time. This can be configured via the new `state.cleanup.dir.max.age.ms` config. More details can be found in [KIP-1259](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup) | ||
|
|
||
| Kafka Streams 4.3 adds **headers-aware** state stores for the Processor API ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)): you can opt in by using the new `Stores` suppliers whose names end with `WithHeaders` and the matching `StoreBuilder` factories—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders` (see javadoc; naming is not uniform across store types). See the [Processor API state store documentation](developer-guide/processor-api.html#headers-in-state-stores). Existing applications that keep using the same headerless `Stores` suppliers and builders are unaffected—storage format, changelogs, and performance stay as before. For stores that adopt the headers-aware format, KIP-1271 defines a single rolling-bounce upgrade: the changelog topic format is unchanged, legacy rows are read with empty header sets until rewritten, and RocksDB-backed stores migrate data lazily on access; downgrading in place after migration is not supported except by clearing local store data and restoring from the changelog. Storing headers increases disk and serialization cost versus headerless stores; the KIP discusses lazy header parsing and other performance considerations. `TopologyTestDriver` and Interactive Queries (IQv1) support the new store types, but the usual `store()` facades still return values (or `ValueAndTimestamp`) **without** record headers; IQv2 may not expose headers in all query paths either—refer to the KIP, the [interactive queries guide](developer-guide/interactive-queries.html#querying-local-state-stores-for-an-app-instance), and release notes. |
There was a problem hiding this comment.
(see javadoc; naming is not uniform across store types)
Same. If there is a inconsistency, we should fix it.
There was a problem hiding this comment.
unaffected—storage format, changelogs,
m-dash ok?
There was a problem hiding this comment.
but the usual
store()facades still return values (orValueAndTimestamp) without record headers; IQv2 may not expose headers in all query paths either—refer to the KIP, the interactive queries guide, and release notes.
Similar comment as elsewhere. Doesn't sound correct
There was a problem hiding this comment.
removed the inconsistency part. m-dash renders fine. removed IQv2 part from the doc.
|
@mjsax Thanks for the review. Fixed all the review comments.please have another round of review. |
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks @nileshkumar3 for the update. You added new content as Matthias suggested. I left most of the comments there. My suggestion for interactive-queries.md: leave the existing intro untouched, and put all the new content into one subsection that lives right after the bullet list, where the reader already knows the vocabulary.
Also, please use AI tools to improve the quality of the sentences, as many of them are too long.
|
|
||
| The method `KafkaStreams#store(...)` finds an application instance's local state stores by name and type. Note that interactive queries are not supported for [versioned state stores](processor-api.html#streams-developer-guide-state-store-versioned) at this time. | ||
|
|
||
| Headers-aware state stores added in Kafka 4.3 ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)) use the same `QueryableStoreTypes` helpers as the corresponding stores without headers. For example, `timestampedKeyValueStore()` and `timestampedWindowStore()` also match `TimestampedKeyValueStoreWithHeaders` and `TimestampedWindowStoreWithHeaders`, and `sessionStore()` matches `SessionStoreWithHeaders`. |
There was a problem hiding this comment.
We are talking about QueryableStoreTypes, but they are introduced later in the doc.
| The result returned by `KafkaStreams#store(...)` depends on the `QueryableStoreType`. The built-in `QueryableStoreTypes` facades (`keyValueStore()`, `timestampedKeyValueStore()`, `windowStore()`, `timestampedWindowStore()`, `sessionStore()`) expose read-only views that return values, or `ValueAndTimestamp` for timestamped key-value and window stores. Record headers are not included in these results. A custom `QueryableStoreType` can expose the underlying header-aware store; see [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores). | ||
|
|
||
| Within the topology, processors can access headers through types such as [`ValueTimestampHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html) for key-value and window stores, and [`AggregationWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/AggregationWithHeaders.html) for session stores; see [Headers in State Stores](processor-api.html#headers-in-state-stores). | ||
|
|
There was a problem hiding this comment.
The 3 paragraphs discuss QueryableStoreTypes helpers before the doc has introduced what those helpers are. They wedge themselves between the KafkaStreams#store(...) paragraph and the architecture image at line 138. The image originally illustrated the previous paragraph; now it's separated by ~3 dense paragraphs and feels orphaned.
| * **`QueryableStoreTypes#sessionStore()`** — [`SessionStoreType`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.SessionStoreType.html). See [Querying local window stores](#querying-local-window-stores). | ||
|
|
||
| Header-aware store variants (introduced in [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)) are queried using the same matchers as their corresponding timestamped or session stores. | ||
|
|
There was a problem hiding this comment.
This is a near-duplicate of line 132. One of them should be removed.
|
|
||
| The result returned by `KafkaStreams#store(...)` depends on the `QueryableStoreType`. The built-in `QueryableStoreTypes` facades (`keyValueStore()`, `timestampedKeyValueStore()`, `windowStore()`, `timestampedWindowStore()`, `sessionStore()`) expose read-only views that return values, or `ValueAndTimestamp` for timestamped key-value and window stores. Record headers are not included in these results. A custom `QueryableStoreType` can expose the underlying header-aware store; see [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores). | ||
|
|
||
| Within the topology, processors can access headers through types such as [`ValueTimestampHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html) for key-value and window stores, and [`AggregationWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/AggregationWithHeaders.html) for session stores; see [Headers in State Stores](processor-api.html#headers-in-state-stores). |
There was a problem hiding this comment.
This is about Processor-API-side access, not interactive queries. It's a useful cross-reference, but it interrupts the IQ flow. A short trailing sentence with a link would be better than a full paragraph.
|
|
||
| The method `KafkaStreams#store(...)` finds an application instance's local state stores by name and type. Note that interactive queries are not supported for [versioned state stores](processor-api.html#streams-developer-guide-state-store-versioned) at this time. | ||
|
|
||
| Headers-aware state stores added in Kafka 4.3 ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)) use the same `QueryableStoreTypes` helpers as the corresponding stores without headers. For example, `timestampedKeyValueStore()` and `timestampedWindowStore()` also match `TimestampedKeyValueStoreWithHeaders` and `TimestampedWindowStoreWithHeaders`, and `sessionStore()` matches `SessionStoreWithHeaders`. |
There was a problem hiding this comment.
mixed terminology: "Headers-aware" vs. "stores without headers". Pick one please.
| * For DSL operators, store data is upgraded lazily in the background. | ||
| * No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the [TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html) interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write. | ||
|
|
||
| ## Headers in State Stores (Kafka 4.3+) {#headers-in-state-stores} |
There was a problem hiding this comment.
I think this subsection is too thin. Compare it to "Versioned Key-Value State Stores". Maybe add more here:
- explanation of what ValueTimestampHeaders actually contains (value + timestamp + Headers) or what AggregationWithHeaders is
- mention that only persistent (RocksDB) variants exist — there is no inMemory…
- pointer to the upgrade-guide entry that already covers compat / perf / changelog format.
Right now that material lives only in upgrade-guide.md; a one-line cross-link from here would help
|
|
||
| ## Headers in State Stores (Kafka 4.3+) {#headers-in-state-stores} | ||
|
|
||
| Since Kafka 4.3, you can materialize state that includes Kafka [record headers](https://kafka.apache.org/documentation/#recordheaders) in addition to keys, values, and timestamps ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)). This is useful when header metadata must survive stateful processing (for example aggregations or joins implemented with the Processor API). |
There was a problem hiding this comment.
must survive is a bit dramatic and slightly inaccurate (it's not that headers will be lost otherwise in every case, it's that without these stores the Processor API has no way to materialize them). Suggest: "Use this when downstream processing needs access to record headers from prior input — for example, when an aggregation or join implemented with the Processor API must propagate headers to its output."
|
|
||
| Since Kafka 4.3, you can materialize state that includes Kafka [record headers](https://kafka.apache.org/documentation/#recordheaders) in addition to keys, values, and timestamps ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)). This is useful when header metadata must survive stateful processing (for example aggregations or joins implemented with the Processor API). | ||
|
|
||
| Use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) factory methods whose names end with `WithHeaders`, together with the matching `StoreBuilder` factories—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders`. Key-value and window reads return [ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html). For sessions, [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html) stores each aggregation together with record headers as [AggregationWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/AggregationWithHeaders.html). |
| * Use [persistentVersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\)) when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations. | ||
| * Use [persistentWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) or [persistentTimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively. | ||
| * Use [persistentSessionStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\)) when you need a persistent sessionWindowedKey-value store. | ||
| * **Headers (since 4.3, [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)):** To persist [record headers](https://kafka.apache.org/documentation/#recordheaders) in state, use [`persistentTimestampedKeyValueStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStoreWithHeaders\(java.lang.String\)) with [`timestampedKeyValueStoreBuilderWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html), [`persistentTimestampedWindowStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStoreWithHeaders\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) with [`timestampedWindowStoreWithHeadersBuilder`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html), and [`persistentSessionStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStoreWithHeaders\(java.lang.String,java.time.Duration\)) with [`sessionStoreBuilderWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html). See [Headers in State Stores](#headers-in-state-stores) below. |
There was a problem hiding this comment.
one giant bullet packing 6 javadoc links + 3 supplier/builder pairs into one sentence. Hard to scan.
|
|
||
| Kafka Streams now allows to purge local state directories and checkpoint files during application startup if they have not been modified for a certain period of time. This can be configured via the new `state.cleanup.dir.max.age.ms` config. More details can be found in [KIP-1259](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup) | ||
|
|
||
| Kafka Streams 4.3 adds **headers-aware** state stores for the Processor API ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)): you can opt in by using the new `Stores` suppliers whose names end with `WithHeaders` and the matching `StoreBuilder` factories—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders`. See the [Processor API state store documentation](developer-guide/processor-api.html#headers-in-state-stores). Existing applications that keep using the same headerless `Stores` suppliers and builders are unaffected—storage format, changelogs, and performance stay as before. For stores that adopt the headers-aware format, KIP-1271 defines a single rolling-bounce upgrade: the changelog topic format is unchanged, legacy rows are read with empty header sets until rewritten, and RocksDB-backed stores migrate data lazily on access; downgrading in place after migration is not supported except by clearing local store data and restoring from the changelog. Storing headers increases disk and serialization cost versus headerless stores; the KIP discusses lazy header parsing and other performance considerations. `TopologyTestDriver` and Interactive Queries support the new store types; the existing `store()` facades continue to return values (or `ValueAndTimestamp`) without exposing record headers. See the [interactive queries guide](developer-guide/interactive-queries.html#querying-local-state-stores-for-an-app-instance) and KIP-1271. |
There was a problem hiding this comment.
The section is right; the format is wrong. Look at the surrounding 4.3.0 entries: each is one short paragraph (KIP-1270 at 68, metrics at 70, KIP-1259 at 72) and the one really substantial feature — the streams-scala deprecation — gets its own ### Deprecation of streams-scala module (KIP-1244) subsection at line 76. Past releases follow the same pattern: anything "big" (KIP-1071 in 4.2.0 at line 84, KIP-1244, the Rebalance Protocol in 4.1.0 at line 111) earns a ### heading. KIP-1271 introduces a whole new store family with its own story, migration story, perf things, IQ, and TTD support — that is clearly a ###-subsection-sized topic, not a single inline paragraph. As-is, line 74 is the longest paragraph on the page.
It documents the dsl.store.format setting in the Streams config guide
(default vs headers). The upgrade guide for 4.3.0 explains opting in to
headers-aware DSL stores and points to KIP-1271 and the Processor API
section. The DSL developer guide clarifies which built-in store types
are used for stateful operators when dsl.store.format is default versus
headers. The Processor API guide adds a short section on the WithHeaders
Stores helpers and related types.