Skip to content
17 changes: 17 additions & 0 deletions docs/streams/developer-guide/config-streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,23 @@ Defines a default state store implementation to be used by any stateful DSL oper
<tr>
<td>

dsl.store.format
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This belong to ticket KAFKA-20336 for which we already have a PR: #21905

I think we should remove it from this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

</td>
<td>

Low
</td>
<td>

Controls whether DSL operators materialize headers-aware state stores. The value `default` keeps existing timestamped or plain store variants per operator. The value `headers` selects headers-aware stores (where supported) so that Kafka record headers are stored and propagated with state updates alongside values and timestamps. Case-insensitive.
</td>
Comment thread
nileshkumar3 marked this conversation as resolved.
Outdated
<td>

`default`
</td> </tr>
<tr>
<td>

ensure.explicit.internal.resource.naming
</td>
<td>
Expand Down
10 changes: 6 additions & 4 deletions docs/streams/developer-guide/dsl-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -917,11 +917,13 @@ Kafka Streams will manage the topic for `repartition()`. Generated topic is trea

Stateful transformations depend on state for processing inputs and producing outputs and require a [state store](../architecture.html#streams_architecture_state) associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary.

**Note:** Following store types are used regardless of the possibly specified type (via the parameter `materialized`):
**Note:** Unless you provide a custom store via `Materialized`, the DSL picks built-in store implementations from your [`dsl.store.format`](config-streams.html) setting and the operator semantics below:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could config-streams.html benefit from anchors?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NA as we removed this from this pr.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.

This PR should only update docs related to KIP-1271, which does not contain any DSL changes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


* non-windowed aggregations and non-windowed KTables use [TimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html)s or [VersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html)s, depending on whether the parameter `materialized` is versioned
* time-windowed aggregations and KStream-KStream joins use [TimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html)s
* session windowed aggregations use [SessionStore](/{version}/javadoc/org/apache/kafka/streams/state/SessionStore.html)s (there is no timestamped session store as of now)
* With **`dsl.store.format=default`** (the default):
* non-windowed aggregations and non-windowed KTables use [TimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html)s or [VersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html)s, depending on whether the parameter `materialized` is versioned
* time-windowed aggregations and KStream-KStream joins use [TimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html)s
* session windowed aggregations use [SessionStore](/{version}/javadoc/org/apache/kafka/streams/state/SessionStore.html)s (there is no timestamped session store as of now)
* With **`dsl.store.format=headers`**, operators that support the feature use headers-aware variants—for example [TimestampedKeyValueStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStoreWithHeaders.html), [TimestampedWindowStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStoreWithHeaders.html), and [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html)—so input record headers are stored with each update. Versioned KTables continue to use `VersionedKeyValueStore`. See [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores).



Expand Down
6 changes: 6 additions & 0 deletions docs/streams/developer-guide/processor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,13 @@ You can query timestamped state stores both with and without a timestamp.
* For DSL operators, store data is upgraded lazily in the background.
* No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the [TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html) interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write.

## Headers in state stores (KIP-1271)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add Kafka version as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the version

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
## Headers in state stores (KIP-1271)
## Headers in State Stores


Since Kafka 4.3, you can materialize state that includes Kafka [record headers](https://kafka.apache.org/documentation/#recordheaders) in addition to keys, values, and timestamps. This is useful when header metadata must survive stateful processing (aggregations, joins, and similar operations).

In the **Processor API**, use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) factory methods whose names end with `WithHeaders`, together with the matching `*BuilderWithHeaders` builders—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders`. Key-value reads use [ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html); session aggregation reads use [AggregationWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/AggregationWithHeaders.html).

Comment thread
nileshkumar3 marked this conversation as resolved.
Outdated
In the **DSL**, enable the corresponding built-in stores with the `dsl.store.format` application setting; see the [configuration reference](config-streams.html) and [DSL stateful operators](dsl-api.html#stateful-transformations). Details and migration notes are in [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could config-streams.html benefit from anchors?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NA as removed this part from the PR.


## Versioned Key-Value State Stores

Expand Down
6 changes: 6 additions & 0 deletions docs/streams/upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, a

Kafka Streams now allows to purge local state directories and checkpoint files during application startup if they have not been modified for a certain period of time. This can be configured via the new `state.cleanup.dir.max.age.ms` config. More details can be found in [KIP-1259](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup)

### Record headers in state stores (KIP-1271)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need a headline for this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.


By default, DSL state stores retain keys, values, and timestamps (where applicable) but not Kafka record headers. You can opt in to **headers-aware** stores that also persist the headers from input records by setting `dsl.store.format` to `headers` (case-insensitive). The default `default` preserves the pre-4.3 store layout for each operator. Changing this setting affects changelog serialization and RocksDB layout for the affected stores; perform a rolling upgrade and allow tasks to restore from changelogs. Interactive Queries, `TopologyTestDriver`, and built-in upgrade paths support the new formats—see [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores) and the [Processor API state store documentation](developer-guide/processor-api.html#headers-in-state-stores-kip-1271).

Processor API applications can use the new `Stores` helpers and `*BuilderWithHeaders` factory methods (for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreBuilderWithHeaders`) without setting `dsl.store.format`.

### Deprecation of streams-scala module (KIP-1244)

The `kafka-streams-scala` module (`org.apache.kafka.streams.scala` package) is deprecated in 4.3.0 and will be removed in 5.0.
Expand Down