Skip to content

feat: support Spark Structured Streaming writes#564

Draft
LuciferYang wants to merge 1 commit into
lance-format:mainfrom
LuciferYang:feat/structured-streaming-write
Draft

feat: support Spark Structured Streaming writes#564
LuciferYang wants to merge 1 commit into
lance-format:mainfrom
LuciferYang:feat/structured-streaming-write

Conversation

@LuciferYang
Copy link
Copy Markdown
Contributor

@LuciferYang LuciferYang commented May 27, 2026

Closes #246.

Depends on lance-format/lance#6963 (JNI binding fix); draft until that lands.

Summary

Adds a Structured Streaming sink for Lance. Each non-empty micro-batch issues one Lance transaction stamped with lance.streaming.queryId and lance.streaming.epochId in its transaction properties; replay dedupe scans recent transaction history via DatasetDelta.listTransactions for a matching pair. Empty epochs issue no transaction.

SparkWriteBuilder now implements SupportsStreamingUpdateAsAppend and LanceDataset advertises TableCapability.STREAMING_WRITE. Append / Complete / Update flow through the existing V2 plumbing. CTAS and staged commits are rejected at toStreaming() with a clear error.

streamingQueryId is a required option — it's the idempotency key for the dedupe scan and must be globally unique across concurrent queries on the same table. lance.streaming.dedupe.lookback.versions (default 100, max 10 000) caps the scan depth.

Why one transaction per epoch

PR #399 used a two-transaction Append + UpdateConfig design, which doubled manifest growth and per-epoch latency at scale. Stamping the identity inside the Append moves the dedupe signal into transaction history and keeps writes to one manifest update per epoch. The upstream JNI fix (#6963) is what makes the DatasetDelta scan actually buildable from Java.

Why the sink doesn't pin the dataset version

LanceBatchWrite pins dataset.version() in its constructor because it commits exactly once. Streaming reuses one sink across many epochs, so a pinned version would go stale immediately — the dedupe scan would point at the wrong range and the transaction's readVersion would lag. Every commit opens at the current latest instead. testMultipleEpochsOnSameSinkAdvanceVersionMonotonically regresses this path.

Tests

BaseStreamingWriteTest covers append happy-path, missing streamingQueryId, replay dedupe, multi-epoch on one sink, and empty-epoch no-op. SparkWriteTest adds three assertions on toStreaming's contract: returns LanceStreamingWrite when configured, throws IllegalArgumentException without streamingQueryId, throws UnsupportedOperationException for staged commits.

Per-module clean runs against lance-core 7.1.0-beta.4:

  • 3.4: 680/680 (streaming code compiles into base but 3.4 isn't a supported streaming target)
  • 3.5: 755/755
  • 4.0: 771/771
  • 4.1: 773/773

make lint clean.

Out of scope

  • Streaming reads (MicroBatchStream) — follow-up.
  • Row-level UPDATE / DELETE on the streaming path.
  • Managed-versioning streaming test (code mirrors batch which is covered).
  • In-memory dedupe cache to skip the per-commit DatasetDelta scan.

Docs at docs/src/streaming.md.

Closes lance-format#246.

Adds a Spark Structured Streaming sink for Lance. Each non-empty
micro-batch produces a single Lance transaction stamped with
(streamingQueryId, epochId) in its transaction properties; replay
dedupe scans recent transaction history via
DatasetDelta.listTransactions for an existing pair and skips the
commit if it finds one. Empty epochs issue no transaction.

Append, Complete, and Update output modes are routed through
SparkWriteBuilder (now also implementing SupportsStreamingUpdateAsAppend).
Complete maps to a Lance Overwrite per epoch; Update is append-only
per Spark's marker contract. CTAS / staged-commit flows are rejected
with an actionable error since per-epoch commits are incompatible with
the single-shot staged commit cadence.

User surface:

- `streamingQueryId` (required) — globally unique idempotency key.
  Two queries sharing the same id would dedupe each other's epochs.
- `lance.streaming.dedupe.lookback.versions` (default 100, max 10000)
  — how far back the replay scan looks before assuming the epoch is
  new. Raise it on high-churn tables; lower it to bound restart-time
  scans.

Transaction-property keys `lance.streaming.queryId` and
`lance.streaming.epochId` are stamped on every commit and are part of
the stability contract — external tooling can read them straight from
Lance transaction history.

Notes / non-goals:

- Requires lance-core with the DatasetDelta JNI binding rename
  (lance-format/lance#6963); pom bumps lance.version to 7.1.0-beta.4.
- Streaming reads (MicroBatchStream) are not implemented yet —
  tracked separately.
- Row-level UPDATE/DELETE via position-delta is not exposed on the
  streaming path.
- The target Lance table must exist before the query starts; the
  sink does not auto-create.
- The sink does NOT pin the dataset version on the writer — every
  commit opens at the current latest so the dedupe scan window and
  the transaction's readVersion both reflect on-disk reality. A
  multi-epoch regression test
  (testMultipleEpochsOnSameSinkAdvanceVersionMonotonically) uses
  maxFilesPerTrigger=1 to share one sink across three epochs and
  asserts versions advance monotonically.

Test coverage:

- BaseStreamingWriteTest: 5 cases covering append happy path,
  missing streamingQueryId failure, replay dedupe, multi-epoch on
  one sink, empty-epoch no-op.
- SparkWriteTest: toStreaming returns LanceStreamingWrite when
  streamingQueryId is provided, throws IAE without it, rejects
  staged commits.

User-facing doc at docs/src/streaming.md covers semantics, output
modes, exactly-once contract, bounded at-least-once fallback, and
OPTIMIZE cadence guidance.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added the enhancement New feature or request label May 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support Spark structured streaming

1 participant