Skip to content
Open
31 changes: 22 additions & 9 deletions docs/streams/developer-guide/interactive-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,24 +123,33 @@ Supported
Not supported (you must configure)
</td> </tr> </table>

# Querying local state stores for an app instance
# Querying local state stores for an app instance {#querying-local-state-stores-for-an-app-instance}

A Kafka Streams application typically runs on multiple instances. The state that is locally available on any given instance is only a subset of the [application's entire state](../architecture.html#streams-architecture-state). Querying the local stores on an instance will only return data locally available on that particular instance.

The method `KafkaStreams#store(...)` finds an application instance's local state stores by name and type. Note that interactive queries are not supported for [versioned state stores](processor-api.html#streams-developer-guide-state-store-versioned) at this time.

Headers-aware state stores added in Kafka 4.3 ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)) use the same `QueryableStoreTypes` helpers as the corresponding stores without headers. For example, `timestampedKeyValueStore()` and `timestampedWindowStore()` also match `TimestampedKeyValueStoreWithHeaders` and `TimestampedWindowStoreWithHeaders`, and `sessionStore()` matches `SessionStoreWithHeaders`.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are talking about QueryableStoreTypes, but they are introduced later in the doc.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mixed terminology: "Headers-aware" vs. "stores without headers". Pick one please.


The result returned by `KafkaStreams#store(...)` depends on the `QueryableStoreType`. The built-in `QueryableStoreTypes` facades (`keyValueStore()`, `timestampedKeyValueStore()`, `windowStore()`, `timestampedWindowStore()`, `sessionStore()`) expose read-only views that return values, or `ValueAndTimestamp` for timestamped key-value and window stores. Record headers are not included in these results. A custom `QueryableStoreType` can expose the underlying header-aware store; see [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling the static factories "facades" is technically off — they're factory methods that return QueryableStoreType matchers. Suggest "factory methods" or just "helpers". Also the sentence is parenthetical-heavy; split it.

Within the topology, processors can access headers through types such as [`ValueTimestampHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html) for key-value and window stores, and [`AggregationWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/AggregationWithHeaders.html) for session stores; see [Headers in State Stores](processor-api.html#headers-in-state-stores).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is about Processor-API-side access, not interactive queries. It's a useful cross-reference, but it interrupts the IQ flow. A short trailing sentence with a link would be better than a full paragraph.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Within the topology, processors can access headers through types such as … — the processor doesn't access headers directly; the store returns objects of those types when accessed from the processor. Suggestion: "Inside processor code, header-aware stores expose record headers via …".


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 3 paragraphs discuss QueryableStoreTypes helpers before the doc has introduced what those helpers are. They wedge themselves between the KafkaStreams#store(...) paragraph and the architecture image at line 138. The image originally illustrated the previous paragraph; now it's separated by ~3 dense paragraphs and feels orphaned.

![](/43/images/streams-interactive-queries-api-01.png)

Every application instance can directly query any of its local state stores.

The _name_ of a state store is defined when you create the store. You can create the store explicitly by using the Processor API or implicitly by using stateful operations in the DSL.

The _type_ of a state store is defined by `QueryableStoreType`. You can access the built-in types via the class `QueryableStoreTypes`. Kafka Streams currently has two built-in types:

* A key-value store `QueryableStoreTypes#keyValueStore()`, see Querying local key-value stores.
* A window store `QueryableStoreTypes#windowStore()`, see Querying local window stores.
The _type_ of a state store is defined by `QueryableStoreType`. You can access the built-in store types via [`QueryableStoreTypes`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.html). The supported types are:

* **`QueryableStoreTypes#keyValueStore()`** — [`KeyValueStoreType`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.KeyValueStoreType.html). See [Querying local key-value stores](#querying-local-key-value-stores).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pattern method() — TypeLink. See sectionLink. reads as three loosely-related items. The relationship between the method and the type isn't explained (return type? matcher class?). The original two-bullet form was more readable; consider keeping the prose tight: keyValueStore() — see [Querying local key-value stores] and only link to the type if it adds value.

* **`QueryableStoreTypes#timestampedKeyValueStore()`** — [`TimestampedKeyValueStoreType`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.TimestampedKeyValueStoreType.html). See [Querying local key-value stores](#querying-local-key-value-stores).
* **`QueryableStoreTypes#windowStore()`** — [`WindowStoreType`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.WindowStoreType.html). See [Querying local window stores](#querying-local-window-stores).
* **`QueryableStoreTypes#timestampedWindowStore()`** — [`TimestampedWindowStoreType`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.TimestampedWindowStoreType.html). See [Querying local window stores](#querying-local-window-stores).
* **`QueryableStoreTypes#sessionStore()`** — [`SessionStoreType`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.SessionStoreType.html). See [Querying local window stores](#querying-local-window-stores).

Header-aware store variants (introduced in [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)) are queried using the same matchers as their corresponding timestamped or session stores.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Header-aware store variants (introduced in [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)) are queried using the same matchers as their corresponding timestamped or session stores.
Headers-aware store variants (introduced in [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)) are queried using the same matchers as their corresponding timestamped or session stores.


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a near-duplicate of line 132. One of them should be removed.

You can also implement your own QueryableStoreType as described in section Querying local custom state stores.

Expand All @@ -150,8 +159,10 @@ Kafka Streams materializes one state store per stream partition. This means your

## Querying local key-value stores

To query a local key-value store, you must first create a topology with a key-value store. This example creates a key-value store named "CountsKeyValueStore". This store will hold the latest count for any word that is found on the topic "word-count-input".

To query key-value state, you first build a topology that includes a state store. This example uses the DSL `count()` operator on a grouped stream, which creates a timestamped key-value store named `CountsKeyValueStore`. That store holds the latest count for each word from the topic `word-count-input`.

Note: The snippets below still use `QueryableStoreTypes.keyValueStore()` and `ReadOnlyKeyValueStore<String, Long>`, which return values only (the counts). The state store keeps values together with their timestamps; use `QueryableStoreTypes.timestampedKeyValueStore()` and `ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>>` if you need access to timestamps via interactive queries.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The snippets below still use..? why still? Did we introduce it back then in the doc? this is the first one in the section



Properties props = ...;
StreamsBuilder builder = ...;
Expand Down Expand Up @@ -212,8 +223,10 @@ You can also materialize the results of stateless operators by using the overloa

A window store will potentially have many results for any given key because the key can be present in multiple windows. However, there is only one result per window for a given key.

To query a local window store, you must first create a topology with a window store. This example creates a window store named "CountsWindowStore" that contains the counts for words in 1-minute windows.

To query a windowed store, you first build a topology with a windowed aggregation (for example, using `windowedBy` followed by `count()`). This example uses `count()` to create a timestamped window store named `CountsWindowStore` with 1-minute windows for per-word counts.

Note: The snippets below use `QueryableStoreTypes.windowStore()` and `ReadOnlyWindowStore<String, Long>`, which return values only per window. The state store keeps values together with their timestamps; use `QueryableStoreTypes.timestampedWindowStore()` and `ReadOnlyWindowStore<String, ValueAndTimestamp<Long>>` if you need access to timestamps via interactive queries.


StreamsBuilder builder = ...;
KStream<String, String> textLines = ...;
Expand Down
7 changes: 6 additions & 1 deletion docs/streams/developer-guide/processor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ Yes (enabled by default)
* Stores its data on local disk.
* Storage capacity: managed local state can be larger than the memory (heap space) of an application instance, but must fit into the available local disk space.
* RocksDB settings can be fine-tuned, see [RocksDB configuration](config-streams.html#streams-developer-guide-rocksdb-config).
* Available [store variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)): timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store.
* Available [store variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html): timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store. Header-aware variants (since 4.3) are also described below.
* Use [persistentTimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore\(java.lang.String\)) when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries.
* Use [persistentVersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\)) when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations.
* Use [persistentWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) or [persistentTimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively.
* Use [persistentSessionStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\)) when you need a persistent sessionWindowedKey-value store.
* **Headers (since 4.3, [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)):** To persist [record headers](https://kafka.apache.org/documentation/#recordheaders) in state, use [`persistentTimestampedKeyValueStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStoreWithHeaders\(java.lang.String\)) with [`timestampedKeyValueStoreBuilderWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html), [`persistentTimestampedWindowStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStoreWithHeaders\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) with [`timestampedWindowStoreWithHeadersBuilder`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html), and [`persistentSessionStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStoreWithHeaders\(java.lang.String,java.time.Duration\)) with [`sessionStoreBuilderWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html). See [Headers in State Stores](#headers-in-state-stores) below.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one giant bullet packing 6 javadoc links + 3 supplier/builder pairs into one sentence. Hard to scan.




Expand Down Expand Up @@ -299,7 +300,11 @@ You can query timestamped state stores both with and without a timestamp.
* For DSL operators, store data is upgraded lazily in the background.
* No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the [TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html) interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write.

## Headers in State Stores (Kafka 4.3+) {#headers-in-state-stores}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this subsection is too thin. Compare it to "Versioned Key-Value State Stores". Maybe add more here:
- explanation of what ValueTimestampHeaders actually contains (value + timestamp + Headers) or what AggregationWithHeaders is
- mention that only persistent (RocksDB) variants exist — there is no inMemory…
- pointer to the upgrade-guide entry that already covers compat / perf / changelog format.
Right now that material lives only in upgrade-guide.md; a one-line cross-link from here would help


Since Kafka 4.3, you can materialize state that includes Kafka [record headers](https://kafka.apache.org/documentation/#recordheaders) in addition to keys, values, and timestamps ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)). This is useful when header metadata must survive stateful processing (for example aggregations or joins implemented with the Processor API).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

must survive is a bit dramatic and slightly inaccurate (it's not that headers will be lost otherwise in every case, it's that without these stores the Processor API has no way to materialize them). Suggest: "Use this when downstream processing needs access to record headers from prior input — for example, when an aggregation or join implemented with the Processor API must propagate headers to its output."


Use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) factory methods whose names end with `WithHeaders`, together with the matching `StoreBuilder` factories—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders`. Key-value and window reads return [ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html). For sessions, [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html) stores each aggregation together with record headers as [AggregationWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/AggregationWithHeaders.html).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too long sentence!


## Versioned Key-Value State Stores

Expand Down
2 changes: 2 additions & 0 deletions docs/streams/upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, a

Kafka Streams now allows to purge local state directories and checkpoint files during application startup if they have not been modified for a certain period of time. This can be configured via the new `state.cleanup.dir.max.age.ms` config. More details can be found in [KIP-1259](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup)

Kafka Streams 4.3 adds **headers-aware** state stores for the Processor API ([KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)): you can opt in by using the new `Stores` suppliers whose names end with `WithHeaders` and the matching `StoreBuilder` factories—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders`. See the [Processor API state store documentation](developer-guide/processor-api.html#headers-in-state-stores). Existing applications that keep using the same headerless `Stores` suppliers and builders are unaffected—storage format, changelogs, and performance stay as before. For stores that adopt the headers-aware format, KIP-1271 defines a single rolling-bounce upgrade: the changelog topic format is unchanged, legacy rows are read with empty header sets until rewritten, and RocksDB-backed stores migrate data lazily on access; downgrading in place after migration is not supported except by clearing local store data and restoring from the changelog. Storing headers increases disk and serialization cost versus headerless stores; the KIP discusses lazy header parsing and other performance considerations. `TopologyTestDriver` and Interactive Queries support the new store types; the existing `store()` facades continue to return values (or `ValueAndTimestamp`) without exposing record headers. See the [interactive queries guide](developer-guide/interactive-queries.html#querying-local-state-stores-for-an-app-instance) and KIP-1271.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The section is right; the format is wrong. Look at the surrounding 4.3.0 entries: each is one short paragraph (KIP-1270 at 68, metrics at 70, KIP-1259 at 72) and the one really substantial feature — the streams-scala deprecation — gets its own ### Deprecation of streams-scala module (KIP-1244) subsection at line 76. Past releases follow the same pattern: anything "big" (KIP-1071 in 4.2.0 at line 84, KIP-1244, the Rebalance Protocol in 4.1.0 at line 111) earns a ### heading. KIP-1271 introduces a whole new store family with its own story, migration story, perf things, IQ, and TTD support — that is clearly a ###-subsection-sized topic, not a single inline paragraph. As-is, line 74 is the longest paragraph on the page.


### Deprecation of streams-scala module (KIP-1244)

The `kafka-streams-scala` module (`org.apache.kafka.streams.scala` package) is deprecated in 4.3.0 and will be removed in 5.0.
Expand Down
Loading