Skip to content
20 changes: 20 additions & 0 deletions docs/streams/developer-guide/config-streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,26 @@ Defines a default state store implementation to be used by any stateful DSL oper
<tr>
<td>

dsl.store.format
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This belong to ticket KAFKA-20336 for which we already have a PR: #21905

I think we should remove it from this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

</td>
<td>

Low
</td>
<td>

Controls whether DSL operators materialize headers-aware state stores. Case-insensitive. Accepted values:

- **`default`** — Uses existing timestamped or plain store variants per operator and does not persist record headers in state. Storage and changelog footprint stay in line with releases before [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores).
- **`headers`** — Selects headers-aware stores (where supported) so that record headers are stored and propagated with state updates alongside values and timestamps. Changelog payloads and local RocksDB state are larger than under `default`, and workloads may see extra I/O or CPU when headers are large or frequent.
</td>
Comment thread
nileshkumar3 marked this conversation as resolved.
Outdated
<td>

`default`
</td> </tr>
<tr>
<td>

ensure.explicit.internal.resource.naming
</td>
<td>
Expand Down
10 changes: 6 additions & 4 deletions docs/streams/developer-guide/dsl-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -917,11 +917,13 @@ Kafka Streams will manage the topic for `repartition()`. Generated topic is trea

Stateful transformations depend on state for processing inputs and producing outputs and require a [state store](../architecture.html#streams_architecture_state) associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per window. In join operations, a windowing state store is used to collect all of the records received so far within the defined window boundary.

**Note:** Following store types are used regardless of the possibly specified type (via the parameter `materialized`):
**Note:** Unless you provide a custom store via `Materialized`, the DSL picks built-in store implementations from your [`dsl.store.format`](config-streams.html) setting and the operator semantics below:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could config-streams.html benefit from anchors?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NA as we removed this from this pr.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above.

This PR should only update docs related to KIP-1271, which does not contain any DSL changes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


* non-windowed aggregations and non-windowed KTables use [TimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html)s or [VersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html)s, depending on whether the parameter `materialized` is versioned
* time-windowed aggregations and KStream-KStream joins use [TimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html)s
* session windowed aggregations use [SessionStore](/{version}/javadoc/org/apache/kafka/streams/state/SessionStore.html)s (there is no timestamped session store as of now)
* With **`dsl.store.format=default`** (the default):
* non-windowed aggregations and non-windowed KTables use [TimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html)s or [VersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html)s, depending on whether the parameter `materialized` is versioned
* time-windowed aggregations and KStream-KStream joins use [TimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html)s
* session windowed aggregations use [SessionStore](/{version}/javadoc/org/apache/kafka/streams/state/SessionStore.html)s (there is no timestamped session store as of now)
* With **`dsl.store.format=headers`**, operators that support the feature use headers-aware variants—for example [TimestampedKeyValueStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStoreWithHeaders.html), [TimestampedWindowStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStoreWithHeaders.html), and [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html)—so input record headers are stored with each update. Versioned KTables continue to use `VersionedKeyValueStore` because a headers-aware versioned store variant is not yet available. Compared with `default`, expect larger changelogs and local state when records carry headers. See [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores) and the [upgrade guide](../upgrade-guide.html#record-headers-in-state-stores-kip-1271) for format changes and rollout notes.



Expand Down
8 changes: 8 additions & 0 deletions docs/streams/developer-guide/processor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,15 @@ You can query timestamped state stores both with and without a timestamp.
* For DSL operators, store data is upgraded lazily in the background.
* No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the [TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html) interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write.

## Headers in state stores (KIP-1271)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add Kafka version as well?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added the version

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
## Headers in state stores (KIP-1271)
## Headers in State Stores


Since Kafka 4.3, you can materialize state that includes Kafka [record headers](https://kafka.apache.org/documentation/#recordheaders) in addition to keys, values, and timestamps. This is useful when header metadata must survive stateful processing (aggregations, joins, and similar operations).

**Footprint and performance:** If you keep `dsl.store.format` at `default` and do not use headers-aware `Stores` suppliers, record headers are not persisted in state—behavior and storage remain comparable to pre-KIP-1271 DSL materializations. Headers-aware stores embed header bytes in each stored update, which grows changelog segments and local RocksDB usage and can increase write and restore I/O; impact scales with header size and update rate. Enable headers in state only when downstream logic needs that metadata.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not belong into the PAPI section of the docs

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


In the **Processor API**, use [`Stores`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html) factory methods whose names end with `WithHeaders`, together with the matching `*WithHeadersBuilder` factory methods—for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreWithHeadersBuilder`, `persistentTimestampedWindowStoreWithHeaders` with `timestampedWindowStoreWithHeadersBuilder`, and `persistentSessionStoreWithHeaders` with `sessionStoreWithHeadersBuilder`. Key-value and window reads return [ValueTimestampHeaders](/{version}/javadoc/org/apache/kafka/streams/state/ValueTimestampHeaders.html). For sessions, [SessionStoreWithHeaders](/{version}/javadoc/org/apache/kafka/streams/state/SessionStoreWithHeaders.html) stores each aggregation together with Kafka record headers (see that interface for the session value type).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the Processor API

Redundant. We are in the "processor API" section of the docs.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.


Comment thread
nileshkumar3 marked this conversation as resolved.
Outdated
In the **DSL**, enable the corresponding built-in stores with the `dsl.store.format` application setting; see the [configuration reference](config-streams.html) and [DSL stateful operators](dsl-api.html#stateful-transformations). Details and migration notes are in [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could config-streams.html benefit from anchors?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NA as removed this part from the PR.


## Versioned Key-Value State Stores

Expand Down
8 changes: 8 additions & 0 deletions docs/streams/upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, a

Kafka Streams now allows to purge local state directories and checkpoint files during application startup if they have not been modified for a certain period of time. This can be configured via the new `state.cleanup.dir.max.age.ms` config. More details can be found in [KIP-1259](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup)

### Record headers in state stores (KIP-1271)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need a headline for this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.


By default, DSL state stores retain keys, values, and timestamps (where applicable) but not record headers. You can opt in to **headers-aware** stores that also persist the headers from input records by setting `dsl.store.format` to `headers` (case-insensitive). The default `default` preserves the pre-4.3 store layout for each operator. Changing `dsl.store.format` alters changelog serialization and RocksDB layout for affected stores: **existing store data is rebuilt from changelog topics using the new format**, and **local state directories for those tasks may be removed during a rolling restart** so that restoration runs against the new layout—plan for restore duration and, when moving to `headers`, larger changelogs if records carry significant headers. Perform a rolling upgrade and allow tasks to finish restoring before relying on interactive queries or sink output. Interactive Queries (IQv1), `TopologyTestDriver`, and built-in upgrade paths support the new formats; for IQv2, queries run against headers-aware stores, but query results do not include record headers—see [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores) and the [Processor API state store documentation](developer-guide/processor-api.html#headers-in-state-stores-kip-1271).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing dsl.store.format alters changelog serialization and RocksDB layout for affected stores: existing store data is rebuilt from changelog topics using the new format

This is not correct. The upgrade path is designed for in-place migration with zero downtime. Local state directories are NOT automatically removed during rolling restarts when changing the format. Instead:

  • RocksDB stores use a dual-column family approach for lazy migration
  • Legacy records are served with empty header sets (headers count = 0)
  • New writes use the new headers-aware format
  • On read, legacy data is lazily converted and migrated to the new column family

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is KIP-1285 again, not 1271.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, DSL state stores retain keys, values, and timestamps (where applicable) but not record headers.

DSL is not mentioned in kip-1271 at all. We have another ticket for kip-1285

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.


**Backward compatibility:** The feature is fully backward compatible. Existing applications that do not set `dsl.store.format` (or leave it at `default`) continue to work exactly as before—no store format changes, no changelog layout changes, and no action is required during upgrade. Switching from `default` to `headers` (or back) requires a store rebuild from changelogs; plan for the associated restore time. The `headers` format is supported only by the built-in `RocksDB` store suppliers; built-in `InMemory` suppliers ignore `dsl.store.format` and use non-headers-aware in-memory stores (the same effective behavior as `default` for that supplier). Custom `DslStoreSuppliers` and `*BytesStoreSupplier` implementations are unaffected unless they opt in explicitly.

Processor API applications can use the new `Stores` helpers and `*WithHeadersBuilder` factory methods (for example `persistentTimestampedKeyValueStoreWithHeaders` with `timestampedKeyValueStoreWithHeadersBuilder`) without setting `dsl.store.format`.

### Deprecation of streams-scala module (KIP-1244)

The `kafka-streams-scala` module (`org.apache.kafka.streams.scala` package) is deprecated in 4.3.0 and will be removed in 5.0.
Expand Down
Loading