-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-20335: Document KIP-1271 headers-aware state stores #21840
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from 13 commits
d46c1dd
1d47647
0567ed9
d7794da
d970ca0
1849113
863cdfc
2b3a2c8
352be39
0996f65
09d83d6
fe55476
59bf6d8
3cc4fb6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -123,12 +123,14 @@ Supported | |
| Not supported (you must configure) | ||
| </td> </tr> </table> | ||
|
|
||
| # Querying local state stores for an app instance | ||
| # Querying local state stores for an app instance {#querying-local-state-stores-for-an-app-instance} | ||
|
|
||
| A Kafka Streams application typically runs on multiple instances. The state that is locally available on any given instance is only a subset of the [application's entire state](../architecture.html#streams-architecture-state). Querying the local stores on an instance will only return data locally available on that particular instance. | ||
|
|
||
| The method `KafkaStreams#store(...)` finds an application instance's local state stores by name and type. Note that interactive queries are not supported for [versioned state stores](processor-api.html#streams-developer-guide-state-store-versioned) at this time. | ||
|
|
||
| Headers-aware state stores added in Kafka 4.3 ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)) work with the same **`QueryableStoreTypes`** helpers as headerless stores—for example `timestampedKeyValueStore()` and `timestampedWindowStore()` also match the corresponding `*WithHeaders` implementations, and `sessionStore()` applies to `SessionStoreWithHeaders`. However, **`KafkaStreams#store(...)`** still exposes the usual read-only shapes: plain values or **`ValueAndTimestamp`** for timestamped key-value and window stores; **record headers are not included** in those IQv1 results (facades strip them). Session queries return only the aggregation value, not header metadata. **[`ValueTimestampHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html)** is what processors see when reading the store inside the topology. IQv2 may still omit headers in some query paths—see the KIP, release notes, and [Processor API: Headers in State Stores](processor-api.html#headers-in-state-stores). | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The 3 paragraphs discuss QueryableStoreTypes helpers before the doc has introduced what those helpers are. They wedge themselves between the KafkaStreams#store(...) paragraph and the architecture image at line 138. The image originally illustrated the previous paragraph; now it's separated by ~3 dense paragraphs and feels orphaned. |
||
|  | ||
|
|
||
| Every application instance can directly query any of its local state stores. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -178,6 +178,7 @@ Yes (enabled by default) | |
| * Use [persistentVersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\)) when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations. | ||
| * Use [persistentWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) or [persistentTimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively. | ||
| * Use [persistentSessionStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\)) when you need a persistent sessionWindowedKey-value store. | ||
| * **Headers (since 4.3, [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)):** To persist [record headers](https://kafka.apache.org/documentation/#recordheaders) in state, use [`persistentTimestampedKeyValueStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStoreWithHeaders\(java.lang.String\)) with [`timestampedKeyValueStoreBuilderWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html), [`persistentTimestampedWindowStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStoreWithHeaders\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) with [`timestampedWindowStoreWithHeadersBuilder`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html), and [`persistentSessionStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStoreWithHeaders\(java.lang.String,java.time.Duration\)) with [`sessionStoreBuilderWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html). See [Headers in State Stores](#headers-in-state-stores) below. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. one giant bullet packing 6 javadoc links + 3 supplier/builder pairs into one sentence. Hard to scan. |
||
|
|
||
|
|
||
|
|
||
|
|
@@ -299,7 +300,11 @@ You can query timestamped state stores both with and without a timestamp. | |
| * For DSL operators, store data is upgraded lazily in the background. | ||
| * No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the [TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html) interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write. | ||
|
|
||
| ## Headers in State Stores (Kafka 4.3+) {#headers-in-state-stores} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this subsection is too thin. Compare it to "Versioned Key-Value State Stores". Maybe add more here: |
||
|
|
||
| Since Kafka 4.3, you can materialize state that includes Kafka [record headers](https://kafka.apache.org/documentation/#recordheaders) in addition to keys, values, and timestamps ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)). This is useful when header metadata must survive stateful processing (for example aggregations or joins implemented with the Processor API). | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| Use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) factory methods whose names end with `WithHeaders`, together with the matching `StoreBuilder` factories—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders`. (The public method names are not parallel: key-value and session use `…BuilderWithHeaders`, while window uses `timestampedWindowStoreWithHeadersBuilder`—see javadoc.) Key-value and window reads return [ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html). For sessions, [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html) stores each aggregation together with record headers (see that interface for the session value type). | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar question as above about the dash
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think we had a bug in the code at some point about inconsistent naming, but the KIP did specify consistent names, and it was fixed in the code already --if it was not fixed yet, we should fix it.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why not link to it, similar to what we do for
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mash works fine. removed the inconsistency part. added the link.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docs updated to match the current shipping API. The KIP used a uniform
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we need to fix the naming, we can do as a separate follow up PR. |
||
|
|
||
| ## Versioned Key-Value State Stores | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,6 +71,8 @@ The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, a | |
|
|
||
| Kafka Streams now allows to purge local state directories and checkpoint files during application startup if they have not been modified for a certain period of time. This can be configured via the new `state.cleanup.dir.max.age.ms` config. More details can be found in [KIP-1259](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup) | ||
|
|
||
| Kafka Streams 4.3 adds **headers-aware** state stores for the Processor API ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)): you can opt in by using the new `Stores` suppliers whose names end with `WithHeaders` and the matching `StoreBuilder` factories—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders` (see javadoc; naming is not uniform across store types). See the [Processor API state store documentation](developer-guide/processor-api.html#headers-in-state-stores). Existing applications that keep using the same headerless `Stores` suppliers and builders are unaffected—storage format, changelogs, and performance stay as before. For stores that adopt the headers-aware format, KIP-1271 defines a single rolling-bounce upgrade: the changelog topic format is unchanged, legacy rows are read with empty header sets until rewritten, and RocksDB-backed stores migrate data lazily on access; downgrading in place after migration is not supported except by clearing local store data and restoring from the changelog. Storing headers increases disk and serialization cost versus headerless stores; the KIP discusses lazy header parsing and other performance considerations. `TopologyTestDriver` and Interactive Queries (IQv1) support the new store types, but the usual `store()` facades still return values (or `ValueAndTimestamp`) **without** record headers; IQv2 may not expose headers in all query paths either—refer to the KIP, the [interactive queries guide](developer-guide/interactive-queries.html#querying-local-state-stores-for-an-app-instance), and release notes. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Same. If there is a inconsistency, we should fix it.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
m-dash ok?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Similar comment as elsewhere. Doesn't sound correct
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed the inconsistency part. m-dash renders fine. removed IQv2 part from the doc. |
||
|
|
||
| ### Deprecation of streams-scala module (KIP-1244) | ||
|
|
||
| The `kafka-streams-scala` module (`org.apache.kafka.streams.scala` package) is deprecated in 4.3.0 and will be removed in 5.0. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if the dash (which I believe is supposed to be an m-dash will render correctly? Can we confirm?
Well, depends what queryable-store-type you pick. You can also get the headers if you want. -- In combination with the later
**[ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html)** is what processors see when reading the store inside the topology.it seems you say that you cannot get headers at all with IQv1, but you can.->
for (timestamped) key-valueI would remove this -- implementation detail.
IQv2 is not implemented with AK 4.3 yet -- and we will need follow up KIPs to add support.
Maybe the whole paragraph can we simplified a little bit? It add good context that "richer" stores (ts, header) can we queried with "lower" queryable store types, but that's not header specific.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
verified m-das works. Updated this part for IQV1 and removed any IQV2 reference here.