Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions docs/src/streaming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Spark Structured Streaming

The Lance Spark connector supports writing data to Lance tables from a Spark Structured Streaming
query. Each micro-batch lands as one Lance append (or overwrite, in Complete output mode), so the
table is queryable between batches and time-travel reads see one snapshot per epoch.

> Streaming support requires Spark **3.5+** (`StreamingWrite.useCommitCoordinator` was added in
> 3.5). Spark 3.4 keeps batch-only.

## Quick start

```python
(spark.readStream
.schema(schema)
.parquet("/path/to/source")
.writeStream
.format("lance")
.option("streamingQueryId", "my-query-v1") # required, see below
.option("checkpointLocation", "/path/to/checkpoint")
.trigger(availableNow=True)
.toTable("lance_ns.default.events"))
```

The target table must already exist; the streaming sink does not auto-create. Create it first with
plain SQL DDL:

```sql
CREATE TABLE lance_ns.default.events (id BIGINT, name STRING);
```

## Required option: `streamingQueryId`

`streamingQueryId` is the **idempotency key** for the query. It must be:

- **Globally unique** across all concurrent streaming queries that write to the same Lance
table. Two queries sharing a `streamingQueryId` will dedupe each other's epochs and produce
incorrect results.
- **Stable across restarts** — picking a different value on restart loses replay protection for
the prior epoch.

The connector fails fast at query start if the option is absent.

## Output modes

| Spark mode | Lance behavior |
|---|---|
| `append` (default) | Each epoch issues a Lance `Append` operation. |
| `complete` | Each epoch issues a Lance `Overwrite` (replacing all existing rows). Requires the upstream query to be a streaming aggregation. |
| `update` | Update-mode rows are routed through the append path — **delta rows are appended, not merged**. This matches the `SupportsStreamingUpdateAsAppend` contract; native MERGE-style upsert is not implemented. |

## Exactly-once semantics

Each non-empty micro-batch performs **one** Lance transaction stamped with `lance.streaming.queryId`
and `lance.streaming.epochId` in the transaction properties. On replay, the connector scans recent
transaction history (`DatasetDelta.listTransactions`) for an existing `(queryId, epochId)` pair —
finding one means the prior attempt already committed, so the current call is a no-op.

**Empty epochs** are no-ops: no Lance transactions are issued. Spark's checkpoint advances
independently, and replays of empty epochs find no prior commit, see no fragments, and skip again.

### Bounded at-least-once fallback

A duplicate can only slip through if **more than `lance.streaming.dedupe.lookback.versions`
unrelated commits land between a crash and the restart**, moving the prior commit out of the
dedupe scan window. The default lookback is 100 versions; users with very high commit churn can
raise it up to 10 000.

Mitigation: keep micro-batches small and periodically run `OPTIMIZE` to compact fragments. Manifest
growth dominates per-commit latency, so compacting fragments keeps the streaming write path fast.

## Configuration

| Option | Default | Purpose |
|---|---|---|
| `streamingQueryId` | — | Required. Globally unique idempotency key. |
| `lance.streaming.dedupe.lookback.versions` | 100 | Number of recent versions scanned for an already-committed `(queryId, epochId)` pair on replay. Max 10 000. Raise for high commit churn; lower to bound restart-time scans. |

The two transaction-property keys (`lance.streaming.queryId`, `lance.streaming.epochId`) are part
of the stability contract — external tooling can rely on them when inspecting Lance transaction
history.

## Limitations

- **CTAS / `REPLACE TABLE`** is rejected. Stage commits commit exactly once, which is structurally
incompatible with the per-epoch streaming cadence.
- **Streaming reads** (`MicroBatchStream`) are not yet supported — only writes. Tracked
separately.
- **Row-level UPDATE/DELETE** via Spark's position-delta API is not supported in streaming.
Append-style writes only.
- The Lance table must exist before the streaming query starts; the sink does not auto-create.

## OPTIMIZE cadence

Each non-empty micro-batch advances the Lance manifest by one version. Manifest size grows linearly
with fragment count, and read-path performance degrades for very large manifests. For continuous
streams, schedule an `OPTIMIZE` on the target table at a cadence appropriate to your micro-batch
volume — for example, every 1000 epochs at a 5-second trigger.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark.streaming;

public class TestStreamingWrite extends BaseStreamingWriteTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ public class LanceDataset

private static final Set<TableCapability> CAPABILITIES =
ImmutableSet.of(
TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE);
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
TableCapability.STREAMING_WRITE,
TableCapability.TRUNCATE);

public static final MetadataColumn FRAGMENT_ID_COLUMN =
new MetadataColumn() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ public class LanceSparkWriteOptions implements Serializable {
public static final String CONFIG_MAX_BATCH_BYTES = "max_batch_bytes";
public static final String CONFIG_BLOB_PACK_FILE_SIZE_THRESHOLD = "blob_pack_file_size_threshold";

// Structured Streaming options (see docs/streaming.md).
public static final String CONFIG_STREAMING_QUERY_ID = "streamingQueryId";
public static final String CONFIG_STREAMING_DEDUPE_LOOKBACK_VERSIONS =
"lance.streaming.dedupe.lookback.versions";

// Transaction-property keys stamped into the Lance commit. The streaming dedupe scan looks for
// these keys to detect a replay of an already-committed epoch.
public static final String STREAMING_QUERY_ID_PROP = "lance.streaming.queryId";
public static final String STREAMING_EPOCH_ID_PROP = "lance.streaming.epochId";

public static final int DEFAULT_STREAMING_DEDUPE_LOOKBACK_VERSIONS = 100;
public static final int MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS = 10_000;

private static final WriteMode DEFAULT_WRITE_MODE = WriteMode.APPEND;
private static final boolean DEFAULT_USE_QUEUED_WRITE_BUFFER = false;
private static final int DEFAULT_QUEUE_DEPTH = 8;
Expand Down Expand Up @@ -96,6 +109,18 @@ public class LanceSparkWriteOptions implements Serializable {
/** Use this version to open the dataset and apply write if set. */
private final Long version;

/**
* Idempotency key for Structured Streaming. Required for the streaming write path; null on the
* batch path. Persisted as {@link #STREAMING_QUERY_ID_PROP} in each epoch's Lance transaction.
*/
private final String streamingQueryId;

/**
* Number of versions to scan backwards when looking for a previously-committed (queryId, epochId)
* pair during replay. Bounded above by {@link #MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS}.
*/
private final int streamingDedupeLookbackVersions;

private LanceSparkWriteOptions(Builder builder) {
this.datasetUri = builder.datasetUri;
this.writeMode = builder.writeMode;
Expand All @@ -114,6 +139,8 @@ private LanceSparkWriteOptions(Builder builder) {
this.namespace = builder.namespace;
this.tableId = builder.tableId;
this.version = builder.version;
this.streamingQueryId = builder.streamingQueryId;
this.streamingDedupeLookbackVersions = builder.streamingDedupeLookbackVersions;
}

/** Creates a new builder for LanceSparkWriteOptions. */
Expand Down Expand Up @@ -214,6 +241,15 @@ public Long getVersion() {
return version;
}

/** Nullable: non-null only on the streaming write path. */
public String getStreamingQueryId() {
return streamingQueryId;
}

public int getStreamingDedupeLookbackVersions() {
return streamingDedupeLookbackVersions;
}

/** Returns a builder pre-populated with all fields from this instance. */
public Builder toBuilder() {
return builder()
Expand All @@ -233,7 +269,9 @@ public Builder toBuilder() {
.storageOptions(storageOptions)
.namespace(namespace)
.tableId(tableId)
.version(version);
.version(version)
.streamingQueryId(streamingQueryId)
.streamingDedupeLookbackVersions(streamingDedupeLookbackVersions);
}

/** Returns a copy of these options with version set to the given version. */
Expand Down Expand Up @@ -325,7 +363,9 @@ public boolean equals(Object o) {
&& Objects.equals(blobPackFileSizeThreshold, that.blobPackFileSizeThreshold)
&& Objects.equals(storageOptions, that.storageOptions)
&& Objects.equals(tableId, that.tableId)
&& Objects.equals(version, that.version);
&& Objects.equals(version, that.version)
&& Objects.equals(streamingQueryId, that.streamingQueryId)
&& streamingDedupeLookbackVersions == that.streamingDedupeLookbackVersions;
}

@Override
Expand All @@ -346,7 +386,9 @@ public int hashCode() {
blobPackFileSizeThreshold,
storageOptions,
tableId,
version);
version,
streamingQueryId,
streamingDedupeLookbackVersions);
}

/** Builder for creating LanceSparkWriteOptions instances. */
Expand All @@ -368,6 +410,8 @@ public static class Builder {
private LanceNamespace namespace;
private List<String> tableId;
private Long version;
private String streamingQueryId;
private int streamingDedupeLookbackVersions = DEFAULT_STREAMING_DEDUPE_LOOKBACK_VERSIONS;

private Builder() {}

Expand Down Expand Up @@ -461,6 +505,21 @@ public Builder version(Long version) {
return this;
}

public Builder streamingQueryId(String streamingQueryId) {
this.streamingQueryId = streamingQueryId;
return this;
}

public Builder streamingDedupeLookbackVersions(int versions) {
Preconditions.checkArgument(
versions > 0 && versions <= MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS,
"streamingDedupeLookbackVersions must be in (0, %s], got %s",
MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS,
versions);
this.streamingDedupeLookbackVersions = versions;
return this;
}

/**
* Parses options from a map, extracting write-specific settings.
*
Expand Down Expand Up @@ -512,6 +571,13 @@ public Builder fromOptions(Map<String, String> options) {
Preconditions.checkArgument(parsed > 0, "blob_pack_file_size_threshold must be positive");
this.blobPackFileSizeThreshold = parsed;
}
if (options.containsKey(CONFIG_STREAMING_QUERY_ID)) {
this.streamingQueryId = options.get(CONFIG_STREAMING_QUERY_ID);
}
if (options.containsKey(CONFIG_STREAMING_DEDUPE_LOOKBACK_VERSIONS)) {
streamingDedupeLookbackVersions(
Integer.parseInt(options.get(CONFIG_STREAMING_DEDUPE_LOOKBACK_VERSIONS)));
}
return this;
}

Expand Down
Loading
Loading