From 9c5706688b18bd62789f16cdecc5c12df843319b Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 27 May 2026 21:37:05 +0800 Subject: [PATCH] feat: support Spark Structured Streaming writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #246. Adds a Spark Structured Streaming sink for Lance. Each non-empty micro-batch produces a single Lance transaction stamped with (streamingQueryId, epochId) in its transaction properties; replay dedupe scans recent transaction history via DatasetDelta.listTransactions for an existing pair and skips the commit if it finds one. Empty epochs issue no transaction. Append, Complete, and Update output modes are routed through SparkWriteBuilder (now also implementing SupportsStreamingUpdateAsAppend). Complete maps to a Lance Overwrite per epoch; Update is append-only per Spark's marker contract. CTAS / staged-commit flows are rejected with an actionable error since per-epoch commits are incompatible with the single-shot staged commit cadence. User surface: - `streamingQueryId` (required) — globally unique idempotency key. Two queries sharing the same id would dedupe each other's epochs. - `lance.streaming.dedupe.lookback.versions` (default 100, max 10000) — how far back the replay scan looks before assuming the epoch is new. Raise it on high-churn tables; lower it to bound restart-time scans. Transaction-property keys `lance.streaming.queryId` and `lance.streaming.epochId` are stamped on every commit and are part of the stability contract — external tooling can read them straight from Lance transaction history. Notes / non-goals: - Requires lance-core with the DatasetDelta JNI binding rename (lance-format/lance#6963); pom bumps lance.version to 7.1.0-beta.4. - Streaming reads (MicroBatchStream) are not implemented yet — tracked separately. - Row-level UPDATE/DELETE via position-delta is not exposed on the streaming path. 
- The target Lance table must exist before the query starts; the sink does not auto-create. - The sink does NOT pin the dataset version on the writer — every commit opens at the current latest so the dedupe scan window and the transaction's readVersion both reflect on-disk reality. A multi-epoch regression test (testMultipleEpochsOnSameSinkAdvanceVersionMonotonically) uses maxFilesPerTrigger=1 to share one sink across three epochs and asserts versions advance monotonically. Test coverage: - BaseStreamingWriteTest: 5 cases covering append happy path, missing streamingQueryId failure, replay dedupe, multi-epoch on one sink, empty-epoch no-op. - SparkWriteTest: toStreaming returns LanceStreamingWrite when streamingQueryId is provided, throws IAE without it, rejects staged commits. User-facing doc at docs/src/streaming.md covers semantics, output modes, exactly-once contract, bounded at-least-once fallback, and OPTIMIZE cadence guidance. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/src/streaming.md | 97 +++++ .../spark/streaming/TestStreamingWrite.java | 16 + .../java/org/lance/spark/LanceDataset.java | 5 +- .../lance/spark/LanceSparkWriteOptions.java | 72 +++- .../spark/write/LanceStreamingWrite.java | 181 +++++++++ .../org/lance/spark/write/SparkWrite.java | 38 +- .../spark/write/StreamingCommitProtocol.java | 189 ++++++++++ .../streaming/BaseStreamingWriteTest.java | 343 ++++++++++++++++++ .../org/lance/spark/write/SparkWriteTest.java | 65 +++- pom.xml | 2 +- 10 files changed, 983 insertions(+), 25 deletions(-) create mode 100644 docs/src/streaming.md create mode 100644 lance-spark-3.5_2.12/src/test/java/org/lance/spark/streaming/TestStreamingWrite.java create mode 100644 lance-spark-base_2.12/src/main/java/org/lance/spark/write/LanceStreamingWrite.java create mode 100644 lance-spark-base_2.12/src/main/java/org/lance/spark/write/StreamingCommitProtocol.java create mode 100644 
lance-spark-base_2.12/src/test/java/org/lance/spark/streaming/BaseStreamingWriteTest.java diff --git a/docs/src/streaming.md b/docs/src/streaming.md new file mode 100644 index 000000000..381d54831 --- /dev/null +++ b/docs/src/streaming.md @@ -0,0 +1,97 @@ +# Spark Structured Streaming + +The Lance Spark connector supports writing data to Lance tables from a Spark Structured Streaming +query. Each micro-batch lands as one Lance append (or overwrite, in Complete output mode), so the +table is queryable between batches and time-travel reads see one snapshot per epoch. + +> Streaming support requires Spark **3.5+** (`StreamingWrite.useCommitCoordinator` was added in +> 3.5). Spark 3.4 keeps batch-only. + +## Quick start + +```python +(spark.readStream + .schema(schema) + .parquet("/path/to/source") + .writeStream + .format("lance") + .option("streamingQueryId", "my-query-v1") # required, see below + .option("checkpointLocation", "/path/to/checkpoint") + .trigger(availableNow=True) + .toTable("lance_ns.default.events")) +``` + +The target table must already exist; the streaming sink does not auto-create. Create it first with +plain SQL DDL: + +```sql +CREATE TABLE lance_ns.default.events (id BIGINT, name STRING); +``` + +## Required option: `streamingQueryId` + +`streamingQueryId` is the **idempotency key** for the query. It must be: + +- **Globally unique** across all concurrent streaming queries that write to the same Lance + table. Two queries sharing a `streamingQueryId` will dedupe each other's epochs and produce + incorrect results. +- **Stable across restarts** — picking a different value on restart loses replay protection for + the prior epoch. + +The connector fails fast at query start if the option is absent. + +## Output modes + +| Spark mode | Lance behavior | +|---|---| +| `append` (default) | Each epoch issues a Lance `Append` operation. | +| `complete` | Each epoch issues a Lance `Overwrite` (replacing all existing rows). 
Requires the upstream query to be a streaming aggregation. | +| `update` | Update-mode rows are routed through the append path — **delta rows are appended, not merged**. This matches the `SupportsStreamingUpdateAsAppend` contract; native MERGE-style upsert is not implemented. | + +## Exactly-once semantics + +Each non-empty micro-batch performs **one** Lance transaction stamped with `lance.streaming.queryId` +and `lance.streaming.epochId` in the transaction properties. On replay, the connector scans recent +transaction history (`DatasetDelta.listTransactions`) for an existing `(queryId, epochId)` pair — +finding one means the prior attempt already committed, so the current call is a no-op. + +**Empty epochs** are no-ops: no Lance transactions are issued. Spark's checkpoint advances +independently, and replays of empty epochs find no prior commit, see no fragments, and skip again. + +### Bounded at-least-once fallback + +A duplicate can only slip through if **more than `lance.streaming.dedupe.lookback.versions` +unrelated commits land between a crash and the restart**, moving the prior commit out of the +dedupe scan window. The default lookback is 100 versions; users with very high commit churn can +raise it up to 10 000. + +Mitigation: keep micro-batches small and periodically run `OPTIMIZE` to compact fragments. Manifest +growth dominates per-commit latency, so compacting fragments keeps the streaming write path fast. + +## Configuration + +| Option | Default | Purpose | +|---|---|---| +| `streamingQueryId` | — | Required. Globally unique idempotency key. | +| `lance.streaming.dedupe.lookback.versions` | 100 | Number of recent versions scanned for an already-committed `(queryId, epochId)` pair on replay. Max 10 000. Raise for high commit churn; lower to bound restart-time scans. 
| + +The two transaction-property keys (`lance.streaming.queryId`, `lance.streaming.epochId`) are part +of the stability contract; external tooling can rely on them when inspecting Lance transaction +history. + +## Limitations + +- **CTAS / `REPLACE TABLE`** is rejected. A staged commit commits exactly once, which is structurally + incompatible with the per-epoch streaming cadence. +- **Streaming reads** (`MicroBatchStream`) are not yet supported; only writes are. Tracked + separately. +- **Row-level UPDATE/DELETE** via Spark's position-delta API is not supported in streaming. + Append-style writes only. +- The Lance table must exist before the streaming query starts; the sink does not auto-create. + +## OPTIMIZE cadence + +Each non-empty micro-batch advances the Lance manifest by one version. Manifest size grows linearly +with fragment count, and read-path performance degrades for very large manifests. For continuous +streams, schedule an `OPTIMIZE` on the target table at a cadence appropriate to your micro-batch +volume, for example every 1000 epochs at a 5-second trigger. diff --git a/lance-spark-3.5_2.12/src/test/java/org/lance/spark/streaming/TestStreamingWrite.java b/lance-spark-3.5_2.12/src/test/java/org/lance/spark/streaming/TestStreamingWrite.java new file mode 100644 index 000000000..c3d1a0ce9 --- /dev/null +++ b/lance-spark-3.5_2.12/src/test/java/org/lance/spark/streaming/TestStreamingWrite.java @@ -0,0 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.streaming; + +public class TestStreamingWrite extends BaseStreamingWriteTest {} diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java index c3e817dc9..a425b4d3d 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java @@ -58,7 +58,10 @@ public class LanceDataset private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of( - TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE); + TableCapability.BATCH_READ, + TableCapability.BATCH_WRITE, + TableCapability.STREAMING_WRITE, + TableCapability.TRUNCATE); public static final MetadataColumn FRAGMENT_ID_COLUMN = new MetadataColumn() { diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java index 0e0f6db06..925256028 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java @@ -59,6 +59,19 @@ public class LanceSparkWriteOptions implements Serializable { public static final String CONFIG_MAX_BATCH_BYTES = "max_batch_bytes"; public static final String CONFIG_BLOB_PACK_FILE_SIZE_THRESHOLD = "blob_pack_file_size_threshold"; + // Structured Streaming options (see docs/src/streaming.md). + public static final String CONFIG_STREAMING_QUERY_ID = "streamingQueryId"; + public static final String CONFIG_STREAMING_DEDUPE_LOOKBACK_VERSIONS = + "lance.streaming.dedupe.lookback.versions"; + + // Transaction-property keys stamped into the Lance commit. 
The streaming dedupe scan looks for + // these keys to detect a replay of an already-committed epoch. + public static final String STREAMING_QUERY_ID_PROP = "lance.streaming.queryId"; + public static final String STREAMING_EPOCH_ID_PROP = "lance.streaming.epochId"; + + public static final int DEFAULT_STREAMING_DEDUPE_LOOKBACK_VERSIONS = 100; + public static final int MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS = 10_000; + private static final WriteMode DEFAULT_WRITE_MODE = WriteMode.APPEND; private static final boolean DEFAULT_USE_QUEUED_WRITE_BUFFER = false; private static final int DEFAULT_QUEUE_DEPTH = 8; @@ -96,6 +109,18 @@ public class LanceSparkWriteOptions implements Serializable { /** Use this version to open the dataset and apply write if set. */ private final Long version; + /** + * Idempotency key for Structured Streaming. Required for the streaming write path; null on the + * batch path. Persisted as {@link #STREAMING_QUERY_ID_PROP} in each epoch's Lance transaction. + */ + private final String streamingQueryId; + + /** + * Number of versions to scan backwards when looking for a previously-committed (queryId, epochId) + * pair during replay. Bounded above by {@link #MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS}. + */ + private final int streamingDedupeLookbackVersions; + private LanceSparkWriteOptions(Builder builder) { this.datasetUri = builder.datasetUri; this.writeMode = builder.writeMode; @@ -114,6 +139,8 @@ private LanceSparkWriteOptions(Builder builder) { this.namespace = builder.namespace; this.tableId = builder.tableId; this.version = builder.version; + this.streamingQueryId = builder.streamingQueryId; + this.streamingDedupeLookbackVersions = builder.streamingDedupeLookbackVersions; } /** Creates a new builder for LanceSparkWriteOptions. */ @@ -214,6 +241,15 @@ public Long getVersion() { return version; } + /** Nullable: non-null only on the streaming write path. 
*/ + public String getStreamingQueryId() { + return streamingQueryId; + } + + public int getStreamingDedupeLookbackVersions() { + return streamingDedupeLookbackVersions; + } + /** Returns a builder pre-populated with all fields from this instance. */ public Builder toBuilder() { return builder() @@ -233,7 +269,9 @@ public Builder toBuilder() { .storageOptions(storageOptions) .namespace(namespace) .tableId(tableId) - .version(version); + .version(version) + .streamingQueryId(streamingQueryId) + .streamingDedupeLookbackVersions(streamingDedupeLookbackVersions); } /** Returns a copy of these options with version set to the given version. */ @@ -325,7 +363,9 @@ public boolean equals(Object o) { && Objects.equals(blobPackFileSizeThreshold, that.blobPackFileSizeThreshold) && Objects.equals(storageOptions, that.storageOptions) && Objects.equals(tableId, that.tableId) - && Objects.equals(version, that.version); + && Objects.equals(version, that.version) + && Objects.equals(streamingQueryId, that.streamingQueryId) + && streamingDedupeLookbackVersions == that.streamingDedupeLookbackVersions; } @Override @@ -346,7 +386,9 @@ public int hashCode() { blobPackFileSizeThreshold, storageOptions, tableId, - version); + version, + streamingQueryId, + streamingDedupeLookbackVersions); } /** Builder for creating LanceSparkWriteOptions instances. 
*/ @@ -368,6 +410,8 @@ public static class Builder { private LanceNamespace namespace; private List tableId; private Long version; + private String streamingQueryId; + private int streamingDedupeLookbackVersions = DEFAULT_STREAMING_DEDUPE_LOOKBACK_VERSIONS; private Builder() {} @@ -461,6 +505,21 @@ public Builder version(Long version) { return this; } + public Builder streamingQueryId(String streamingQueryId) { + this.streamingQueryId = streamingQueryId; + return this; + } + + public Builder streamingDedupeLookbackVersions(int versions) { + Preconditions.checkArgument( + versions > 0 && versions <= MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS, + "streamingDedupeLookbackVersions must be in (0, %s], got %s", + MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS, + versions); + this.streamingDedupeLookbackVersions = versions; + return this; + } + /** * Parses options from a map, extracting write-specific settings. * @@ -512,6 +571,13 @@ public Builder fromOptions(Map options) { Preconditions.checkArgument(parsed > 0, "blob_pack_file_size_threshold must be positive"); this.blobPackFileSizeThreshold = parsed; } + if (options.containsKey(CONFIG_STREAMING_QUERY_ID)) { + this.streamingQueryId = options.get(CONFIG_STREAMING_QUERY_ID); + } + if (options.containsKey(CONFIG_STREAMING_DEDUPE_LOOKBACK_VERSIONS)) { + streamingDedupeLookbackVersions( + Integer.parseInt(options.get(CONFIG_STREAMING_DEDUPE_LOOKBACK_VERSIONS))); + } return this; } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/LanceStreamingWrite.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/LanceStreamingWrite.java new file mode 100644 index 000000000..a92aa6a5c --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/LanceStreamingWrite.java @@ -0,0 +1,181 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.write; + +import org.lance.FragmentMetadata; +import org.lance.memwal.ShardingSpec; +import org.lance.spark.LanceSparkWriteOptions; +import org.lance.spark.utils.BlobSourceContext; + +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory; +import org.apache.spark.sql.connector.write.streaming.StreamingWrite; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.LanceArrowUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Spark Structured Streaming sink for Lance. + * + *
<p>
Each micro-batch produces a single Lance transaction stamped with the streaming queryId and + * epochId. Replay dedupe scans recent transaction history for the same (queryId, epochId) pair; see + * {@link StreamingCommitProtocol}. + */ +public class LanceStreamingWrite implements StreamingWrite { + private static final Logger LOG = LoggerFactory.getLogger(LanceStreamingWrite.class); + + private final StructType schema; + private final LanceSparkWriteOptions writeOptions; + private final boolean overwrite; + private final Map initialStorageOptions; + private final String namespaceImpl; + private final Map namespaceProperties; + private final List tableId; + private final boolean managedVersioning; + private final ShardingSpec shardingSpec; + private final Map blobSourceContexts; + private final String queryId; + + public LanceStreamingWrite( + StructType schema, + LanceSparkWriteOptions writeOptions, + boolean overwrite, + Map initialStorageOptions, + String namespaceImpl, + Map namespaceProperties, + List tableId, + boolean managedVersioning, + ShardingSpec shardingSpec, + Map blobSourceContexts) { + this.queryId = requireStreamingQueryId(writeOptions.getStreamingQueryId()); + this.schema = schema; + // Streaming intentionally does NOT pin the dataset version. A long-running query produces + // many epochs; each commit must open the dataset at the current latest version so the + // dedupe scan window and the transaction's readVersion both reflect on-disk reality. + this.writeOptions = writeOptions; + this.overwrite = overwrite; + this.initialStorageOptions = initialStorageOptions; + this.namespaceImpl = namespaceImpl; + this.namespaceProperties = namespaceProperties; + this.tableId = tableId; + this.managedVersioning = managedVersioning; + this.shardingSpec = shardingSpec; + this.blobSourceContexts = + blobSourceContexts == null ? 
Collections.emptyMap() : blobSourceContexts; + } + + private static String requireStreamingQueryId(String value) { + if (value == null || value.trim().isEmpty()) { + throw new IllegalArgumentException( + "Structured Streaming writes to Lance require the '" + + LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID + + "' option (a stable, globally unique query id used as the idempotency key)."); + } + return value; + } + + @Override + public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) { + LanceDataWriter.WriterFactory inner = + new LanceDataWriter.WriterFactory( + schema, + writeOptions, + initialStorageOptions, + namespaceImpl, + namespaceProperties, + tableId, + shardingSpec, + blobSourceContexts); + return new EpochAwareWriterFactory(inner); + } + + @Override + public void commit(long epochId, WriterCommitMessage[] messages) { + List fragments = + Arrays.stream(messages) + .filter(m -> m != null) + .map(m -> (LanceBatchWrite.TaskCommit) m) + .map(LanceBatchWrite.TaskCommit::getFragments) + .map(LanceDataWriter::stripRowIdMeta) + .flatMap(List::stream) + .collect(Collectors.toList()); + + Schema arrowSchema = + LanceArrowUtils.toArrowSchema(schema, "UTC", true, writeOptions.isUseLargeVarTypes()); + boolean isOverwrite = overwrite || writeOptions.isOverwrite(); + + StreamingCommitProtocol.commitEpoch( + writeOptions, + fragments, + arrowSchema, + isOverwrite, + queryId, + epochId, + namespaceImpl, + namespaceProperties, + tableId, + managedVersioning); + } + + @Override + public void abort(long epochId, WriterCommitMessage[] messages) { + // No driver-side resources to release; per-task writers clean up via their own close(). + LOG.debug("streaming epoch aborted queryId={} epochId={}", queryId, epochId); + } + + /** + * Lance commits are atomic via {@link org.lance.CommitBuilder}; we do not need Spark's commit + * coordinator. 
Returning {@code false} here keeps the commit path identical to {@link + * LanceBatchWrite#useCommitCoordinator()}. + * + *
<p>
Annotation intentionally omitted: this method is a {@code default} on {@link StreamingWrite} + * in Spark 3.5+ but not declared at all in 3.4. Omitting {@code @Override} keeps this class + * compilable across the supported version matrix. + */ + public boolean useCommitCoordinator() { + return false; + } + + /** + * Adapter that wraps the existing batch {@link LanceDataWriter.WriterFactory} so it can fulfill + * the streaming factory contract. The {@code epochId} is ignored on the writer side — Lance + * fragments produced by a task are written under fresh UUIDs each epoch and are surfaced back to + * the driver via the {@link WriterCommitMessage}; the streaming identity is applied at commit + * time, not per-row. + */ + static final class EpochAwareWriterFactory implements StreamingDataWriterFactory { + private static final long serialVersionUID = 1L; + private final LanceDataWriter.WriterFactory delegate; + + EpochAwareWriterFactory(LanceDataWriter.WriterFactory delegate) { + this.delegate = delegate; + } + + @Override + public DataWriter createWriter(int partitionId, long taskId, long epochId) { + return delegate.createWriter(partitionId, taskId); + } + } +} diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/SparkWrite.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/SparkWrite.java index 7e5f6bd30..73fe37f06 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/SparkWrite.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/SparkWrite.java @@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.write.Write; import org.apache.spark.sql.connector.write.WriteBuilder; import org.apache.spark.sql.connector.write.streaming.StreamingWrite; +import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend; import org.apache.spark.sql.types.StructType; import java.util.Collections; @@ -166,11 +167,27 @@ public BatchWrite toBatch() { @Override public StreamingWrite toStreaming() { - throw 
new UnsupportedOperationException(); + if (stagedCommit != null) { + throw new UnsupportedOperationException( + "Structured Streaming writes are incompatible with staged commits (CTAS / REPLACE " + + "TABLE). Create the target Lance table first, then start the streaming query."); + } + return new LanceStreamingWrite( + schema, + writeOptions, + overwrite, + initialStorageOptions, + namespaceImpl, + namespaceProperties, + tableId, + managedVersioning, + shardingSpec(), + blobSourceContexts); } /** Spark write builder. */ - public static class SparkWriteBuilder implements SupportsTruncate, WriteBuilder { + public static class SparkWriteBuilder + implements SupportsTruncate, SupportsStreamingUpdateAsAppend, WriteBuilder { private final LanceSparkWriteOptions writeOptions; private final StructType schema; private boolean overwrite = false; @@ -242,22 +259,7 @@ public Write build() { LanceSparkWriteOptions options = !overwrite ? writeOptions - : LanceSparkWriteOptions.builder() - .storageOptions(writeOptions.getStorageOptions()) - .namespace(writeOptions.getNamespace()) - .tableId(writeOptions.getTableId()) - .batchSize(writeOptions.getBatchSize()) - .datasetUri(writeOptions.getDatasetUri()) - .fileFormatVersion(writeOptions.getFileFormatVersion()) - .maxBytesPerFile(writeOptions.getMaxBytesPerFile()) - .maxRowsPerFile(writeOptions.getMaxRowsPerFile()) - .maxRowsPerGroup(writeOptions.getMaxRowsPerGroup()) - .queueDepth(writeOptions.getQueueDepth()) - .useQueuedWriteBuffer(writeOptions.isUseQueuedWriteBuffer()) - .useLargeVarTypes(writeOptions.isUseLargeVarTypes()) - .enableStableRowIds(writeOptions.getEnableStableRowIds()) - .writeMode(WriteParams.WriteMode.OVERWRITE) - .build(); + : writeOptions.toBuilder().writeMode(WriteParams.WriteMode.OVERWRITE).build(); return new SparkWrite( schema, diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/StreamingCommitProtocol.java 
b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/StreamingCommitProtocol.java new file mode 100644 index 000000000..ee3aaf01f --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/StreamingCommitProtocol.java @@ -0,0 +1,189 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.write; + +import org.lance.CommitBuilder; +import org.lance.Dataset; +import org.lance.FragmentMetadata; +import org.lance.Transaction; +import org.lance.delta.DatasetDelta; +import org.lance.delta.DatasetDeltaBuilder; +import org.lance.namespace.LanceNamespace; +import org.lance.operation.Append; +import org.lance.operation.Operation; +import org.lance.operation.Overwrite; +import org.lance.spark.LanceRuntime; +import org.lance.spark.LanceSparkWriteOptions; +import org.lance.spark.utils.Utils; + +import org.apache.arrow.vector.types.pojo.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Driver-side commit helper for Structured Streaming. + * + *
<p>
Each non-empty micro-batch is committed in a single Lance {@link Transaction} stamped + * with {@link LanceSparkWriteOptions#STREAMING_QUERY_ID_PROP} and {@link + * LanceSparkWriteOptions#STREAMING_EPOCH_ID_PROP} in the transaction properties. Replay dedupe + * scans recent transaction history via {@link DatasetDelta#listTransactions()} for an existing + * (queryId, epochId) pair — finding one means the previous attempt already committed, so the + * current call is a no-op. + * + *
<p>
Empty epochs are skipped entirely (no transaction): Spark's checkpoint advances regardless, + * and replays of empty epochs find no prior commit, see no fragments, and skip again. + * + *
<p>
Failure window: a duplicate can only slip through if more than {@code + * streamingDedupeLookbackVersions} unrelated commits land between a crash and the restart, moving + * the prior commit out of the scan window. Users with very high commit churn can raise the lookback + * up to {@link LanceSparkWriteOptions#MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS}. + */ +final class StreamingCommitProtocol { + private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitProtocol.class); + + private StreamingCommitProtocol() {} + + /** + * Commits a streaming micro-batch. Empty {@code fragments} are skipped entirely. + * + * @return {@code true} if a transaction was issued, {@code false} on dedupe or empty-batch skip. + */ + static boolean commitEpoch( + LanceSparkWriteOptions writeOptions, + List<FragmentMetadata> fragments, + Schema arrowSchema, + boolean isOverwrite, + String queryId, + long epochId, + String namespaceImpl, + Map<String, String> namespaceProperties, + List<String> tableId, + boolean managedVersioning) { + Boolean enableStableRowIds = writeOptions.getEnableStableRowIds(); + + try (Dataset ds = Utils.openDatasetBuilder(writeOptions).build()) { + long currentVersion = ds.version(); + + Optional<Long> replayVersion = findReplay(ds, currentVersion, queryId, epochId, writeOptions); + if (replayVersion.isPresent()) { + LOG.info( + "streaming epoch already committed (replay); queryId={} epochId={} foundInVersion={}", + queryId, + epochId, + replayVersion.get()); + return false; + } + + if (fragments.isEmpty()) { + LOG.info( + "streaming empty epoch skipped (no fragments); queryId={} epochId={}", + queryId, + epochId); + return false; + } + + Operation operation; + if (isOverwrite) { + operation = Overwrite.builder().fragments(fragments).schema(arrowSchema).build(); + } else { + operation = Append.builder().fragments(fragments).build(); + } + + Map<String, String> txnProps = new HashMap<>(2); + txnProps.put(LanceSparkWriteOptions.STREAMING_QUERY_ID_PROP, queryId); + 
txnProps.put(LanceSparkWriteOptions.STREAMING_EPOCH_ID_PROP, Long.toString(epochId)); + + CommitBuilder commitBuilder = + new CommitBuilder(ds).writeParams(writeOptions.getStorageOptions()); + if (enableStableRowIds != null) { + commitBuilder.useStableRowIds(enableStableRowIds); + } + if (managedVersioning) { + LanceNamespace namespace = + LanceRuntime.getOrCreateNamespace(namespaceImpl, namespaceProperties); + commitBuilder + .namespaceClient(namespace) + .tableId(tableId) + .namespaceClientManagedVersioning(true); + } + + try (Transaction txn = + new Transaction.Builder() + .readVersion(currentVersion) + .operation(operation) + .transactionProperties(txnProps) + .build(); + Dataset committed = commitBuilder.execute(txn)) { + LOG.info( + "streaming epoch committed queryId={} epochId={} fragments={} version={}", + queryId, + epochId, + fragments.size(), + committed.version()); + return true; + } + } + } + + /** + * Scans transaction history backwards from {@code currentVersion} for at most {@code lookback} + * versions, returning the version where {@code (queryId, epochId)} was committed if found. + * + *
<p>
The Lance Transaction API does not expose the exact committed version, so the returned value + * is an upper-bound estimate within the scanned range, which is sufficient for logging and skip + * semantics. + */ + private static Optional<Long> findReplay( + Dataset ds, + long currentVersion, + String queryId, + long epochId, + LanceSparkWriteOptions writeOptions) { + int lookback = writeOptions.getStreamingDedupeLookbackVersions(); + if (lookback <= 0 || currentVersion <= 0) { + return Optional.empty(); + } + long beginExclusive = Math.max(0L, currentVersion - lookback); + if (beginExclusive >= currentVersion) { + return Optional.empty(); + } + String epochIdStr = Long.toString(epochId); + try (DatasetDelta delta = + new DatasetDeltaBuilder(ds) + .withBeginVersion(beginExclusive) + .withEndVersion(currentVersion) + .build()) { + List<Transaction> transactions = delta.listTransactions(); + // Walk newest-first (transactions are returned in ascending-version order). + for (int i = transactions.size() - 1; i >= 0; i--) { + Transaction txn = transactions.get(i); + Optional<Map<String, String>> props = txn.transactionProperties(); + if (!props.isPresent()) { + continue; + } + Map<String, String> p = props.get(); + if (queryId.equals(p.get(LanceSparkWriteOptions.STREAMING_QUERY_ID_PROP)) + && epochIdStr.equals(p.get(LanceSparkWriteOptions.STREAMING_EPOCH_ID_PROP))) { + return Optional.of(beginExclusive + (long) i + 1L); + } + } + } + return Optional.empty(); + } +} diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/streaming/BaseStreamingWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/streaming/BaseStreamingWriteTest.java new file mode 100644 index 000000000..3b89be9eb --- /dev/null +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/streaming/BaseStreamingWriteTest.java @@ -0,0 +1,343 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.streaming; + +import org.lance.Dataset; +import org.lance.spark.LanceDataset; +import org.lance.spark.LanceRuntime; +import org.lance.spark.LanceSparkReadOptions; +import org.lance.spark.LanceSparkWriteOptions; +import org.lance.spark.write.LanceStreamingWrite; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.streaming.StreamingQueryException; +import org.apache.spark.sql.streaming.Trigger; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * End-to-end tests for {@link LanceStreamingWrite}. 
Concrete extensions live in each Spark + * version-specific test module so the same suite runs on Spark 3.5, 4.0, and 4.1. + * + *

The streaming sink is exercised both through Spark's streaming engine ({@code writeStream ... + * toTable}) and through direct {@code commit} calls — the latter lets us deterministically verify + * dedupe and empty-epoch behavior without depending on Spark's checkpoint advancement. + */ +public abstract class BaseStreamingWriteTest { + + protected SparkSession spark; + protected TableCatalog catalog; + protected final String catalogName = "lance_ns"; + + @TempDir protected Path tempDir; + + @BeforeEach + void setup() throws IOException { + spark = + SparkSession.builder() + .appName("lance-streaming-write-test") + .master("local") + .config( + "spark.sql.catalog." + catalogName, "org.lance.spark.LanceNamespaceSparkCatalog") + .config("spark.sql.catalog." + catalogName + ".impl", "dir") + .config("spark.sql.catalog." + catalogName + ".root", tempDir.toString()) + .config("spark.sql.session.timeZone", "UTC") + .getOrCreate(); + catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName); + // Create the "default" namespace in manifest mode so deregisterTable works. + spark.sql("CREATE NAMESPACE IF NOT EXISTS " + catalogName + ".default"); + } + + @AfterEach + void tearDown() { + if (spark != null) { + spark.stop(); + } + } + + // ---------- end-to-end tests ---------- + + @Test + public void testAppendHappyPath() throws Exception { + String tableName = uniqueTable("stream_append"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + + Path src = tempDir.resolve("src-" + tableName); + Path checkpoint = tempDir.resolve("ckpt-" + tableName); + String queryId = "qid-" + tableName; + + // Batch 1: 3 rows. + writeBatchParquet(src, new int[] {1, 2, 3}); + runStreamingBatch(src, checkpoint, fullName, queryId); + assertEquals(3L, countRows(fullName)); + + // Batch 2: 2 more rows on the same checkpoint. 
+ writeBatchParquet(src, new int[] {4, 5}); + runStreamingBatch(src, checkpoint, fullName, queryId); + assertEquals(5L, countRows(fullName)); + } + + @Test + public void testMissingStreamingQueryIdFails() throws Exception { + String tableName = uniqueTable("stream_no_qid"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + + Path src = tempDir.resolve("src-" + tableName); + Path checkpoint = tempDir.resolve("ckpt-" + tableName); + writeBatchParquet(src, new int[] {1}); + + StreamingQueryException ex = + assertThrows( + StreamingQueryException.class, + () -> { + StreamingQuery q = + spark + .readStream() + .schema(rowSchema()) + .option("recursiveFileLookup", "true") + .parquet(src.toString()) + .writeStream() + .format("lance") + .option("checkpointLocation", checkpoint.toString()) + .trigger(Trigger.AvailableNow()) + .toTable(fullName); + q.processAllAvailable(); + q.stop(); + }); + assertTrue( + ex.getMessage().contains(LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID), + "expected error to mention streamingQueryId, got: " + ex.getMessage()); + } + + @Test + public void testDedupeOnReplayedEpoch() throws Exception { + String tableName = uniqueTable("stream_dedupe"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + + String queryId = "qid-" + tableName; + Path src = tempDir.resolve("src-" + tableName); + Path checkpoint = tempDir.resolve("ckpt-" + tableName); + writeBatchParquet(src, new int[] {10, 20}); + runStreamingBatch(src, checkpoint, fullName, queryId); + assertEquals(2L, countRows(fullName)); + long versionAfterFirst = currentVersion(tableName); + + // Re-issue the SAME (queryId, epochId=0) directly — the very first epoch in a fresh + // checkpoint is 0, so this matches what the previous Spark query committed. The dedupe + // scan must find the prior commit and skip. 
+ LanceStreamingWrite sink = directSink(tableName, queryId, 100); + sink.commit(0L, new WriterCommitMessage[0]); + assertEquals(2L, countRows(fullName)); + assertEquals(versionAfterFirst, currentVersion(tableName)); + } + + @Test + public void testMultipleEpochsOnSameSinkAdvanceVersionMonotonically() throws Exception { + // Regression for a stale-pinned-version bug: when one LanceStreamingWrite instance handled + // many epochs (a long-running continuous query), the second epoch would still read the + // dataset at the version captured during construction, breaking both the dedupe scan window + // and the commit's readVersion. maxFilesPerTrigger forces the file source to produce a + // separate micro-batch per file, so all epochs flow through the SAME sink instance. + String tableName = uniqueTable("stream_multi_epoch"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + + Path src = tempDir.resolve("src-" + tableName); + Path checkpoint = tempDir.resolve("ckpt-" + tableName); + String queryId = "qid-" + tableName; + + // Three separate parquet files → three epochs at maxFilesPerTrigger=1. 
+ writeBatchParquet(src, new int[] {1}); + writeBatchParquet(src, new int[] {2}); + writeBatchParquet(src, new int[] {3}); + + long versionBefore = currentVersion(tableName); + StreamingQuery q = + spark + .readStream() + .schema(rowSchema()) + .option("recursiveFileLookup", "true") + .option("maxFilesPerTrigger", "1") + .parquet(src.toString()) + .writeStream() + .format("lance") + .option(LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID, queryId) + .option("checkpointLocation", checkpoint.toString()) + .trigger(Trigger.AvailableNow()) + .toTable(fullName); + q.processAllAvailable(); + q.stop(); + + assertEquals(3L, countRows(fullName), "all three rows should be visible after the query"); + long versionAfter = currentVersion(tableName); + assertTrue( + versionAfter >= versionBefore + 3, + "each epoch must advance the dataset version; before=" + + versionBefore + + " after=" + + versionAfter); + } + + @Test + public void testEmptyEpochIsNoOp() throws Exception { + String tableName = uniqueTable("stream_empty"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + long versionBefore = currentVersion(tableName); + + String queryId = "qid-" + tableName; + LanceStreamingWrite sink = directSink(tableName, queryId, 100); + sink.commit(42L, new WriterCommitMessage[0]); + + // Empty epochs are skipped entirely — no Lance transaction is issued, so the dataset + // version does not advance. Replays of empty epochs are also no-ops. + assertEquals(0L, countRows(fullName)); + assertEquals(versionBefore, currentVersion(tableName)); + + sink.commit(42L, new WriterCommitMessage[0]); + assertEquals(versionBefore, currentVersion(tableName)); + } + + // ---------- helpers ---------- + + private String uniqueTable(String base) { + return base + "_" + UUID.randomUUID().toString().replace("-", ""); + } + + private String qualified(String tableName) { + return catalogName + ".default." 
+ tableName; + } + + private StructType rowSchema() { + return new StructType( + new StructField[] { + new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), + new StructField("name", DataTypes.StringType, true, Metadata.empty()) + }); + } + + private void writeBatchParquet(Path dir, int[] ids) throws IOException { + Files.createDirectories(dir); + List<Row> rows = new ArrayList<>(ids.length); + for (int id : ids) { + rows.add(RowFactory.create(id, "row-" + id)); + } + Path target = dir.resolve("batch-" + UUID.randomUUID() + ".parquet"); + spark.createDataFrame(rows, rowSchema()).coalesce(1).write().parquet(target.toString()); + } + + private void runStreamingBatch(Path srcDir, Path checkpoint, String targetTable, String queryId) + throws Exception { + StreamingQuery q = + spark + .readStream() + .schema(rowSchema()) + // Spark's parquet writer creates per-batch sub-directories — let the streaming + // file source descend into them. + .option("recursiveFileLookup", "true") + .parquet(srcDir.toString()) + .writeStream() + .format("lance") + .option(LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID, queryId) + .option("checkpointLocation", checkpoint.toString()) + .trigger(Trigger.AvailableNow()) + .toTable(targetTable); + q.processAllAvailable(); + q.stop(); + } + + private long countRows(String fullName) { + return spark.sql("SELECT * FROM " + fullName).count(); + } + + private long currentVersion(String tableName) throws Exception { + LanceDataset table = + (LanceDataset) catalog.loadTable(Identifier.of(new String[] {"default"}, tableName)); + LanceSparkReadOptions read = table.readOptions(); + try (Dataset ds = openDataset(read)) { + return ds.version(); + } + } + + private Dataset openDataset(LanceSparkReadOptions read) { + if (read.hasNamespace()) { + return Dataset.open() + .allocator(LanceRuntime.allocator()) + .namespaceClient(read.getNamespace()) + .tableId(read.getTableId()) + .readOptions(read.toReadOptions()) + .build(); + } + return 
Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(read.getDatasetUri()) + .readOptions(read.toReadOptions()) + .build(); + } + + private LanceStreamingWrite directSink(String tableName, String queryId, int lookback) + throws Exception { + LanceDataset table = + (LanceDataset) catalog.loadTable(Identifier.of(new String[] {"default"}, tableName)); + LanceSparkReadOptions read = table.readOptions(); + StructType schema = table.schema(); + + LanceSparkWriteOptions writeOptions = + LanceSparkWriteOptions.builder() + .datasetUri(read.getDatasetUri()) + .storageOptions(read.getStorageOptions()) + .streamingQueryId(queryId) + .streamingDedupeLookbackVersions(lookback) + .build(); + + return new LanceStreamingWrite( + schema, + writeOptions, + false, + Collections.emptyMap(), + null, + Collections.emptyMap(), + null, + false, + null, + Collections.emptyMap()); + } +} diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/SparkWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/SparkWriteTest.java index 39e2e79af..aa8170fa0 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/SparkWriteTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/SparkWriteTest.java @@ -101,10 +101,71 @@ public void testToBatchReturnsLanceBatchWrite(TestInfo testInfo) { } @Test - public void testToStreamingThrowsUnsupportedOperationException(TestInfo testInfo) { + public void testToStreamingRequiresStreamingQueryId(TestInfo testInfo) { String datasetUri = createDataset(testInfo.getTestMethod().get().getName()); Write write = createBuilder(datasetUri).build(); - assertThrows(UnsupportedOperationException.class, write::toStreaming); + // Without the streamingQueryId option the constructor fails fast — exercises the option + // validation in LanceStreamingWrite#requireStreamingQueryId. 
+ IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, write::toStreaming); + assertTrue( + ex.getMessage().contains(LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID), + "Expected error message to mention streamingQueryId, got: " + ex.getMessage()); + } + + @Test + public void testToStreamingReturnsLanceStreamingWrite(TestInfo testInfo) { + String datasetUri = createDataset(testInfo.getTestMethod().get().getName()); + LanceSparkWriteOptions writeOptions = + LanceSparkWriteOptions.builder() + .datasetUri(datasetUri) + .streamingQueryId("test-query-id") + .build(); + SparkWrite.SparkWriteBuilder builder = + new SparkWrite.SparkWriteBuilder( + SPARK_SCHEMA, + writeOptions, + Collections.emptyMap(), + null, + Collections.emptyMap(), + Arrays.asList("default", "test_table"), + false, + null, + Collections.emptyMap()); + Write write = builder.build(); + assertInstanceOf(LanceStreamingWrite.class, write.toStreaming()); + } + + @Test + public void testToStreamingRejectsStagedCommit(TestInfo testInfo) { + String datasetUri = createDataset(testInfo.getTestMethod().get().getName()); + LanceSparkWriteOptions writeOptions = + LanceSparkWriteOptions.builder() + .datasetUri(datasetUri) + .streamingQueryId("test-query-id") + .build(); + SparkWrite.SparkWriteBuilder builder = + new SparkWrite.SparkWriteBuilder( + SPARK_SCHEMA, + writeOptions, + Collections.emptyMap(), + null, + Collections.emptyMap(), + Arrays.asList("default", "test_table"), + false, + null, + Collections.emptyMap()); + // Simulate a CTAS / REPLACE TABLE flow that staged the commit before toStreaming. 
+ builder.setStagedCommit( + StagedCommit.forNewTable( + ARROW_SCHEMA, + datasetUri, + StagedCommitOptions.pathBased(Collections.emptyMap(), false))); + Write write = builder.build(); + UnsupportedOperationException ex = + assertThrows(UnsupportedOperationException.class, write::toStreaming); + assertTrue( + ex.getMessage().contains("staged"), + "expected error to mention staged commits, got: " + ex.getMessage()); } @Test diff --git a/pom.xml b/pom.xml index d10f34856..505bce1d2 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 0.5.0-beta.1 - <lance.version>7.0.0-rc.1</lance.version> + <lance.version>7.1.0-beta.4</lance.version> 0.7.5 0.3.0