diff --git a/docs/src/streaming.md b/docs/src/streaming.md new file mode 100644 index 000000000..381d54831 --- /dev/null +++ b/docs/src/streaming.md @@ -0,0 +1,97 @@ +# Spark Structured Streaming + +The Lance Spark connector supports writing data to Lance tables from a Spark Structured Streaming +query. Each micro-batch lands as one Lance append (or overwrite, in Complete output mode), so the +table is queryable between batches and time-travel reads see one snapshot per epoch. + +> Streaming support requires Spark **3.5+** (`StreamingWrite.useCommitCoordinator` was added in +> 3.5). Spark 3.4 keeps batch-only. + +## Quick start + +```python +(spark.readStream + .schema(schema) + .parquet("/path/to/source") + .writeStream + .format("lance") + .option("streamingQueryId", "my-query-v1") # required, see below + .option("checkpointLocation", "/path/to/checkpoint") + .trigger(availableNow=True) + .toTable("lance_ns.default.events")) +``` + +The target table must already exist; the streaming sink does not auto-create. Create it first with +plain SQL DDL: + +```sql +CREATE TABLE lance_ns.default.events (id BIGINT, name STRING); +``` + +## Required option: `streamingQueryId` + +`streamingQueryId` is the **idempotency key** for the query. It must be: + +- **Globally unique** across all concurrent streaming queries that write to the same Lance + table. Two queries sharing a `streamingQueryId` will dedupe each other's epochs and produce + incorrect results. +- **Stable across restarts** — picking a different value on restart loses replay protection for + the prior epoch. + +The connector fails fast at query start if the option is absent. + +## Output modes + +| Spark mode | Lance behavior | +|---|---| +| `append` (default) | Each epoch issues a Lance `Append` operation. | +| `complete` | Each epoch issues a Lance `Overwrite` (replacing all existing rows). Requires the upstream query to be a streaming aggregation. 
| +| `update` | Update-mode rows are routed through the append path — **delta rows are appended, not merged**. This matches the `SupportsStreamingUpdateAsAppend` contract; native MERGE-style upsert is not implemented. | + +## Exactly-once semantics + +Each non-empty micro-batch performs **one** Lance transaction stamped with `lance.streaming.queryId` +and `lance.streaming.epochId` in the transaction properties. On replay, the connector scans recent +transaction history (`DatasetDelta.listTransactions`) for an existing `(queryId, epochId)` pair — +finding one means the prior attempt already committed, so the current call is a no-op. + +**Empty epochs** are no-ops: no Lance transactions are issued. Spark's checkpoint advances +independently, and replays of empty epochs find no prior commit, see no fragments, and skip again. + +### Bounded at-least-once fallback + +A duplicate can only slip through if **more than `lance.streaming.dedupe.lookback.versions` +unrelated commits land between a crash and the restart**, moving the prior commit out of the +dedupe scan window. The default lookback is 100 versions; users with very high commit churn can +raise it up to 10 000. + +Mitigation: keep micro-batches small and periodically run `OPTIMIZE` to compact fragments. Manifest +growth dominates per-commit latency, so compacting fragments keeps the streaming write path fast. + +## Configuration + +| Option | Default | Purpose | +|---|---|---| +| `streamingQueryId` | — | Required. Globally unique idempotency key. | +| `lance.streaming.dedupe.lookback.versions` | 100 | Number of recent versions scanned for an already-committed `(queryId, epochId)` pair on replay. Max 10 000. Raise for high commit churn; lower to bound restart-time scans. | + +The two transaction-property keys (`lance.streaming.queryId`, `lance.streaming.epochId`) are part +of the stability contract — external tooling can rely on them when inspecting Lance transaction +history. 
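To illustrate how the lookback option and the two property keys work together on replay, the following is a minimal, self-contained Python model of the dedupe scan. It is a sketch, not the connector's implementation: `find_replay` and the `history` dictionary are hypothetical stand-ins for the scan over Lance transaction history. Only the property key names and the default lookback of 100 come from this document.

```python
from typing import Dict, Optional

QUERY_ID_PROP = "lance.streaming.queryId"
EPOCH_ID_PROP = "lance.streaming.epochId"


def find_replay(
    history: Dict[int, Dict[str, str]],
    current_version: int,
    query_id: str,
    epoch_id: int,
    lookback: int = 100,
) -> Optional[int]:
    """Return the version that already committed (query_id, epoch_id), if any.

    `history` is a hypothetical stand-in for Lance transaction history: it maps
    a table version to the transaction properties stamped on that commit.
    """
    begin_exclusive = max(0, current_version - lookback)
    # Walk newest-first over the window (begin_exclusive, current_version].
    for version in range(current_version, begin_exclusive, -1):
        props = history.get(version, {})
        if (
            props.get(QUERY_ID_PROP) == query_id
            and props.get(EPOCH_ID_PROP) == str(epoch_id)
        ):
            return version
    return None


# Version 1 holds epoch 0 of "my-query-v1"; versions 2..4 are unrelated commits.
history = {
    1: {QUERY_ID_PROP: "my-query-v1", EPOCH_ID_PROP: "0"},
    2: {},
    3: {},
    4: {},
}

inside_window = find_replay(history, 4, "my-query-v1", 0, lookback=100)
outside_window = find_replay(history, 4, "my-query-v1", 0, lookback=2)
```

With a lookback of 2 the scan covers only versions 3 and 4, so the commit at version 1 is missed and `outside_window` is `None`. That is the situation in which a replayed epoch could be written twice.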
+ +## Limitations + +- **CTAS / `REPLACE TABLE`** is rejected. Stage commits commit exactly once, which is structurally + incompatible with the per-epoch streaming cadence. +- **Streaming reads** (`MicroBatchStream`) are not yet supported — only writes. Tracked + separately. +- **Row-level UPDATE/DELETE** via Spark's position-delta API is not supported in streaming. + Append-style writes only. +- The Lance table must exist before the streaming query starts; the sink does not auto-create. + +## OPTIMIZE cadence + +Each non-empty micro-batch advances the Lance manifest by one version. Manifest size grows linearly +with fragment count, and read-path performance degrades for very large manifests. For continuous +streams, schedule an `OPTIMIZE` on the target table at a cadence appropriate to your micro-batch +volume — for example, every 1000 epochs at a 5-second trigger. diff --git a/lance-spark-3.5_2.12/src/test/java/org/lance/spark/streaming/TestStreamingWrite.java b/lance-spark-3.5_2.12/src/test/java/org/lance/spark/streaming/TestStreamingWrite.java new file mode 100644 index 000000000..c3d1a0ce9 --- /dev/null +++ b/lance-spark-3.5_2.12/src/test/java/org/lance/spark/streaming/TestStreamingWrite.java @@ -0,0 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.lance.spark.streaming; + +public class TestStreamingWrite extends BaseStreamingWriteTest {} diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java index c3e817dc9..a425b4d3d 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceDataset.java @@ -58,7 +58,10 @@ public class LanceDataset private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of( - TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE); + TableCapability.BATCH_READ, + TableCapability.BATCH_WRITE, + TableCapability.STREAMING_WRITE, + TableCapability.TRUNCATE); public static final MetadataColumn FRAGMENT_ID_COLUMN = new MetadataColumn() { diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java index 0e0f6db06..925256028 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/LanceSparkWriteOptions.java @@ -59,6 +59,19 @@ public class LanceSparkWriteOptions implements Serializable { public static final String CONFIG_MAX_BATCH_BYTES = "max_batch_bytes"; public static final String CONFIG_BLOB_PACK_FILE_SIZE_THRESHOLD = "blob_pack_file_size_threshold"; + // Structured Streaming options (see docs/src/streaming.md). + public static final String CONFIG_STREAMING_QUERY_ID = "streamingQueryId"; + public static final String CONFIG_STREAMING_DEDUPE_LOOKBACK_VERSIONS = + "lance.streaming.dedupe.lookback.versions"; + + // Transaction-property keys stamped into the Lance commit. The streaming dedupe scan looks for + // these keys to detect a replay of an already-committed epoch.
+ public static final String STREAMING_QUERY_ID_PROP = "lance.streaming.queryId"; + public static final String STREAMING_EPOCH_ID_PROP = "lance.streaming.epochId"; + + public static final int DEFAULT_STREAMING_DEDUPE_LOOKBACK_VERSIONS = 100; + public static final int MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS = 10_000; + private static final WriteMode DEFAULT_WRITE_MODE = WriteMode.APPEND; private static final boolean DEFAULT_USE_QUEUED_WRITE_BUFFER = false; private static final int DEFAULT_QUEUE_DEPTH = 8; @@ -96,6 +109,18 @@ public class LanceSparkWriteOptions implements Serializable { /** Use this version to open the dataset and apply write if set. */ private final Long version; + /** + * Idempotency key for Structured Streaming. Required for the streaming write path; null on the + * batch path. Persisted as {@link #STREAMING_QUERY_ID_PROP} in each epoch's Lance transaction. + */ + private final String streamingQueryId; + + /** + * Number of versions to scan backwards when looking for a previously-committed (queryId, epochId) + * pair during replay. Bounded above by {@link #MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS}. + */ + private final int streamingDedupeLookbackVersions; + private LanceSparkWriteOptions(Builder builder) { this.datasetUri = builder.datasetUri; this.writeMode = builder.writeMode; @@ -114,6 +139,8 @@ private LanceSparkWriteOptions(Builder builder) { this.namespace = builder.namespace; this.tableId = builder.tableId; this.version = builder.version; + this.streamingQueryId = builder.streamingQueryId; + this.streamingDedupeLookbackVersions = builder.streamingDedupeLookbackVersions; } /** Creates a new builder for LanceSparkWriteOptions. */ @@ -214,6 +241,15 @@ public Long getVersion() { return version; } + /** Nullable: non-null only on the streaming write path. 
*/ + public String getStreamingQueryId() { + return streamingQueryId; + } + + public int getStreamingDedupeLookbackVersions() { + return streamingDedupeLookbackVersions; + } + /** Returns a builder pre-populated with all fields from this instance. */ public Builder toBuilder() { return builder() @@ -233,7 +269,9 @@ public Builder toBuilder() { .storageOptions(storageOptions) .namespace(namespace) .tableId(tableId) - .version(version); + .version(version) + .streamingQueryId(streamingQueryId) + .streamingDedupeLookbackVersions(streamingDedupeLookbackVersions); } /** Returns a copy of these options with version set to the given version. */ @@ -325,7 +363,9 @@ public boolean equals(Object o) { && Objects.equals(blobPackFileSizeThreshold, that.blobPackFileSizeThreshold) && Objects.equals(storageOptions, that.storageOptions) && Objects.equals(tableId, that.tableId) - && Objects.equals(version, that.version); + && Objects.equals(version, that.version) + && Objects.equals(streamingQueryId, that.streamingQueryId) + && streamingDedupeLookbackVersions == that.streamingDedupeLookbackVersions; } @Override @@ -346,7 +386,9 @@ public int hashCode() { blobPackFileSizeThreshold, storageOptions, tableId, - version); + version, + streamingQueryId, + streamingDedupeLookbackVersions); } /** Builder for creating LanceSparkWriteOptions instances. 
*/ @@ -368,6 +410,8 @@ public static class Builder { private LanceNamespace namespace; private List<String> tableId; private Long version; + private String streamingQueryId; + private int streamingDedupeLookbackVersions = DEFAULT_STREAMING_DEDUPE_LOOKBACK_VERSIONS; private Builder() {} @@ -461,6 +505,21 @@ public Builder version(Long version) { return this; } + public Builder streamingQueryId(String streamingQueryId) { + this.streamingQueryId = streamingQueryId; + return this; + } + + public Builder streamingDedupeLookbackVersions(int versions) { + Preconditions.checkArgument( + versions > 0 && versions <= MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS, + "streamingDedupeLookbackVersions must be in (0, %s], got %s", + MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS, + versions); + this.streamingDedupeLookbackVersions = versions; + return this; + } + /** * Parses options from a map, extracting write-specific settings. * @@ -512,6 +571,13 @@ public Builder fromOptions(Map<String, String> options) { Preconditions.checkArgument(parsed > 0, "blob_pack_file_size_threshold must be positive"); this.blobPackFileSizeThreshold = parsed; } + if (options.containsKey(CONFIG_STREAMING_QUERY_ID)) { + this.streamingQueryId = options.get(CONFIG_STREAMING_QUERY_ID); + } + if (options.containsKey(CONFIG_STREAMING_DEDUPE_LOOKBACK_VERSIONS)) { + streamingDedupeLookbackVersions( + Integer.parseInt(options.get(CONFIG_STREAMING_DEDUPE_LOOKBACK_VERSIONS))); + } return this; } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/LanceStreamingWrite.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/LanceStreamingWrite.java new file mode 100644 index 000000000..a92aa6a5c --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/LanceStreamingWrite.java @@ -0,0 +1,181 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.write; + +import org.lance.FragmentMetadata; +import org.lance.memwal.ShardingSpec; +import org.lance.spark.LanceSparkWriteOptions; +import org.lance.spark.utils.BlobSourceContext; + +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory; +import org.apache.spark.sql.connector.write.streaming.StreamingWrite; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.LanceArrowUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Spark Structured Streaming sink for Lance. + * + *
<p>
Each micro-batch produces a single Lance transaction stamped with the streaming queryId and + * epochId. Replay dedupe scans recent transaction history for the same (queryId, epochId) pair; see + * {@link StreamingCommitProtocol}. + */ +public class LanceStreamingWrite implements StreamingWrite { + private static final Logger LOG = LoggerFactory.getLogger(LanceStreamingWrite.class); + + private final StructType schema; + private final LanceSparkWriteOptions writeOptions; + private final boolean overwrite; + private final Map initialStorageOptions; + private final String namespaceImpl; + private final Map namespaceProperties; + private final List tableId; + private final boolean managedVersioning; + private final ShardingSpec shardingSpec; + private final Map blobSourceContexts; + private final String queryId; + + public LanceStreamingWrite( + StructType schema, + LanceSparkWriteOptions writeOptions, + boolean overwrite, + Map initialStorageOptions, + String namespaceImpl, + Map namespaceProperties, + List tableId, + boolean managedVersioning, + ShardingSpec shardingSpec, + Map blobSourceContexts) { + this.queryId = requireStreamingQueryId(writeOptions.getStreamingQueryId()); + this.schema = schema; + // Streaming intentionally does NOT pin the dataset version. A long-running query produces + // many epochs; each commit must open the dataset at the current latest version so the + // dedupe scan window and the transaction's readVersion both reflect on-disk reality. + this.writeOptions = writeOptions; + this.overwrite = overwrite; + this.initialStorageOptions = initialStorageOptions; + this.namespaceImpl = namespaceImpl; + this.namespaceProperties = namespaceProperties; + this.tableId = tableId; + this.managedVersioning = managedVersioning; + this.shardingSpec = shardingSpec; + this.blobSourceContexts = + blobSourceContexts == null ? 
Collections.emptyMap() : blobSourceContexts; + } + + private static String requireStreamingQueryId(String value) { + if (value == null || value.trim().isEmpty()) { + throw new IllegalArgumentException( + "Structured Streaming writes to Lance require the '" + + LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID + + "' option (a stable, globally unique query id used as the idempotency key)."); + } + return value; + } + + @Override + public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) { + LanceDataWriter.WriterFactory inner = + new LanceDataWriter.WriterFactory( + schema, + writeOptions, + initialStorageOptions, + namespaceImpl, + namespaceProperties, + tableId, + shardingSpec, + blobSourceContexts); + return new EpochAwareWriterFactory(inner); + } + + @Override + public void commit(long epochId, WriterCommitMessage[] messages) { + List<FragmentMetadata> fragments = + Arrays.stream(messages) + .filter(m -> m != null) + .map(m -> (LanceBatchWrite.TaskCommit) m) + .map(LanceBatchWrite.TaskCommit::getFragments) + .map(LanceDataWriter::stripRowIdMeta) + .flatMap(List::stream) + .collect(Collectors.toList()); + + Schema arrowSchema = + LanceArrowUtils.toArrowSchema(schema, "UTC", true, writeOptions.isUseLargeVarTypes()); + boolean isOverwrite = overwrite || writeOptions.isOverwrite(); + + StreamingCommitProtocol.commitEpoch( + writeOptions, + fragments, + arrowSchema, + isOverwrite, + queryId, + epochId, + namespaceImpl, + namespaceProperties, + tableId, + managedVersioning); + } + + @Override + public void abort(long epochId, WriterCommitMessage[] messages) { + // No driver-side resources to release; per-task writers clean up via their own close(). + LOG.debug("streaming epoch aborted queryId={} epochId={}", queryId, epochId); + } + + /** + * Lance commits are atomic via {@link org.lance.CommitBuilder}; we do not need Spark's commit + * coordinator.
Returning {@code false} here keeps the commit path identical to {@link + * LanceBatchWrite#useCommitCoordinator()}. + * + *
<p>
Annotation intentionally omitted: this method is a {@code default} on {@link StreamingWrite} + * in Spark 3.5+ but not declared at all in 3.4. Omitting {@code @Override} keeps this class + * compilable across the supported version matrix. + */ + public boolean useCommitCoordinator() { + return false; + } + + /** + * Adapter that wraps the existing batch {@link LanceDataWriter.WriterFactory} so it can fulfill + * the streaming factory contract. The {@code epochId} is ignored on the writer side — Lance + * fragments produced by a task are written under fresh UUIDs each epoch and are surfaced back to + * the driver via the {@link WriterCommitMessage}; the streaming identity is applied at commit + * time, not per-row. + */ + static final class EpochAwareWriterFactory implements StreamingDataWriterFactory { + private static final long serialVersionUID = 1L; + private final LanceDataWriter.WriterFactory delegate; + + EpochAwareWriterFactory(LanceDataWriter.WriterFactory delegate) { + this.delegate = delegate; + } + + @Override + public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) { + return delegate.createWriter(partitionId, taskId); + } + } +} diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/SparkWrite.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/SparkWrite.java index 7e5f6bd30..73fe37f06 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/SparkWrite.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/SparkWrite.java @@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.write.Write; import org.apache.spark.sql.connector.write.WriteBuilder; import org.apache.spark.sql.connector.write.streaming.StreamingWrite; +import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend; import org.apache.spark.sql.types.StructType; import java.util.Collections; @@ -166,11 +167,27 @@ public BatchWrite toBatch() { @Override public StreamingWrite toStreaming() { - throw
new UnsupportedOperationException(); + if (stagedCommit != null) { + throw new UnsupportedOperationException( + "Structured Streaming writes are incompatible with staged commits (CTAS / REPLACE " + + "TABLE). Create the target Lance table first, then start the streaming query."); + } + return new LanceStreamingWrite( + schema, + writeOptions, + overwrite, + initialStorageOptions, + namespaceImpl, + namespaceProperties, + tableId, + managedVersioning, + shardingSpec(), + blobSourceContexts); } /** Spark write builder. */ - public static class SparkWriteBuilder implements SupportsTruncate, WriteBuilder { + public static class SparkWriteBuilder + implements SupportsTruncate, SupportsStreamingUpdateAsAppend, WriteBuilder { private final LanceSparkWriteOptions writeOptions; private final StructType schema; private boolean overwrite = false; @@ -242,22 +259,7 @@ public Write build() { LanceSparkWriteOptions options = !overwrite ? writeOptions - : LanceSparkWriteOptions.builder() - .storageOptions(writeOptions.getStorageOptions()) - .namespace(writeOptions.getNamespace()) - .tableId(writeOptions.getTableId()) - .batchSize(writeOptions.getBatchSize()) - .datasetUri(writeOptions.getDatasetUri()) - .fileFormatVersion(writeOptions.getFileFormatVersion()) - .maxBytesPerFile(writeOptions.getMaxBytesPerFile()) - .maxRowsPerFile(writeOptions.getMaxRowsPerFile()) - .maxRowsPerGroup(writeOptions.getMaxRowsPerGroup()) - .queueDepth(writeOptions.getQueueDepth()) - .useQueuedWriteBuffer(writeOptions.isUseQueuedWriteBuffer()) - .useLargeVarTypes(writeOptions.isUseLargeVarTypes()) - .enableStableRowIds(writeOptions.getEnableStableRowIds()) - .writeMode(WriteParams.WriteMode.OVERWRITE) - .build(); + : writeOptions.toBuilder().writeMode(WriteParams.WriteMode.OVERWRITE).build(); return new SparkWrite( schema, diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/StreamingCommitProtocol.java 
b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/StreamingCommitProtocol.java new file mode 100644 index 000000000..ee3aaf01f --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/StreamingCommitProtocol.java @@ -0,0 +1,189 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.write; + +import org.lance.CommitBuilder; +import org.lance.Dataset; +import org.lance.FragmentMetadata; +import org.lance.Transaction; +import org.lance.delta.DatasetDelta; +import org.lance.delta.DatasetDeltaBuilder; +import org.lance.namespace.LanceNamespace; +import org.lance.operation.Append; +import org.lance.operation.Operation; +import org.lance.operation.Overwrite; +import org.lance.spark.LanceRuntime; +import org.lance.spark.LanceSparkWriteOptions; +import org.lance.spark.utils.Utils; + +import org.apache.arrow.vector.types.pojo.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Driver-side commit helper for Structured Streaming. + * + *
<p>
Each non-empty micro-batch is committed in a single Lance {@link Transaction} stamped + * with {@link LanceSparkWriteOptions#STREAMING_QUERY_ID_PROP} and {@link + * LanceSparkWriteOptions#STREAMING_EPOCH_ID_PROP} in the transaction properties. Replay dedupe + * scans recent transaction history via {@link DatasetDelta#listTransactions()} for an existing + * (queryId, epochId) pair — finding one means the previous attempt already committed, so the + * current call is a no-op. + * + *
<p>
Empty epochs are skipped entirely (no transaction): Spark's checkpoint advances regardless, + * and replays of empty epochs find no prior commit, see no fragments, and skip again. + * + *
<p>
Failure window: a duplicate can only slip through if more than {@code + * streamingDedupeLookbackVersions} unrelated commits land between a crash and the restart, moving + * the prior commit out of the scan window. Users with very high commit churn can raise the lookback + * up to {@link LanceSparkWriteOptions#MAX_STREAMING_DEDUPE_LOOKBACK_VERSIONS}. + */ +final class StreamingCommitProtocol { + private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitProtocol.class); + + private StreamingCommitProtocol() {} + + /** + * Commits a streaming micro-batch. Empty {@code fragments} are skipped entirely. + * + * @return {@code true} if a transaction was issued, {@code false} on dedupe or empty-batch skip. + */ + static boolean commitEpoch( + LanceSparkWriteOptions writeOptions, + List<FragmentMetadata> fragments, + Schema arrowSchema, + boolean isOverwrite, + String queryId, + long epochId, + String namespaceImpl, + Map<String, String> namespaceProperties, + List<String> tableId, + boolean managedVersioning) { + Boolean enableStableRowIds = writeOptions.getEnableStableRowIds(); + + try (Dataset ds = Utils.openDatasetBuilder(writeOptions).build()) { + long currentVersion = ds.version(); + + Optional<Long> replayVersion = findReplay(ds, currentVersion, queryId, epochId, writeOptions); + if (replayVersion.isPresent()) { + LOG.info( + "streaming epoch already committed (replay); queryId={} epochId={} foundInVersion={}", + queryId, + epochId, + replayVersion.get()); + return false; + } + + if (fragments.isEmpty()) { + LOG.info( + "streaming empty epoch skipped (no fragments); queryId={} epochId={}", + queryId, + epochId); + return false; + } + + Operation operation; + if (isOverwrite) { + operation = Overwrite.builder().fragments(fragments).schema(arrowSchema).build(); + } else { + operation = Append.builder().fragments(fragments).build(); + } + + Map<String, String> txnProps = new HashMap<>(2); + txnProps.put(LanceSparkWriteOptions.STREAMING_QUERY_ID_PROP, queryId); +
txnProps.put(LanceSparkWriteOptions.STREAMING_EPOCH_ID_PROP, Long.toString(epochId)); + + CommitBuilder commitBuilder = + new CommitBuilder(ds).writeParams(writeOptions.getStorageOptions()); + if (enableStableRowIds != null) { + commitBuilder.useStableRowIds(enableStableRowIds); + } + if (managedVersioning) { + LanceNamespace namespace = + LanceRuntime.getOrCreateNamespace(namespaceImpl, namespaceProperties); + commitBuilder + .namespaceClient(namespace) + .tableId(tableId) + .namespaceClientManagedVersioning(true); + } + + try (Transaction txn = + new Transaction.Builder() + .readVersion(currentVersion) + .operation(operation) + .transactionProperties(txnProps) + .build(); + Dataset committed = commitBuilder.execute(txn)) { + LOG.info( + "streaming epoch committed queryId={} epochId={} fragments={} version={}", + queryId, + epochId, + fragments.size(), + committed.version()); + return true; + } + } + } + + /** + * Scans transaction history backwards from {@code currentVersion} for at most {@code lookback} + * versions, returning the version where {@code (queryId, epochId)} was committed if found. + * + *
<p>
The Lance Transaction API does not expose the exact committed version, so the returned value + * is an upper-bound estimate within the scanned range — sufficient for logging and skip + * semantics. + */ + private static Optional<Long> findReplay( + Dataset ds, + long currentVersion, + String queryId, + long epochId, + LanceSparkWriteOptions writeOptions) { + int lookback = writeOptions.getStreamingDedupeLookbackVersions(); + if (lookback <= 0 || currentVersion <= 0) { + return Optional.empty(); + } + long beginExclusive = Math.max(0L, currentVersion - lookback); + if (beginExclusive >= currentVersion) { + return Optional.empty(); + } + String epochIdStr = Long.toString(epochId); + try (DatasetDelta delta = + new DatasetDeltaBuilder(ds) + .withBeginVersion(beginExclusive) + .withEndVersion(currentVersion) + .build()) { + List<Transaction> transactions = delta.listTransactions(); + // Walk newest-first (transactions are returned in ascending-version order). + for (int i = transactions.size() - 1; i >= 0; i--) { + Transaction txn = transactions.get(i); + Optional<Map<String, String>> props = txn.transactionProperties(); + if (!props.isPresent()) { + continue; + } + Map<String, String> p = props.get(); + if (queryId.equals(p.get(LanceSparkWriteOptions.STREAMING_QUERY_ID_PROP)) + && epochIdStr.equals(p.get(LanceSparkWriteOptions.STREAMING_EPOCH_ID_PROP))) { + return Optional.of(beginExclusive + (long) i + 1L); + } + } + } + return Optional.empty(); + } +} diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/streaming/BaseStreamingWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/streaming/BaseStreamingWriteTest.java new file mode 100644 index 000000000..3b89be9eb --- /dev/null +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/streaming/BaseStreamingWriteTest.java @@ -0,0 +1,343 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.streaming; + +import org.lance.Dataset; +import org.lance.spark.LanceDataset; +import org.lance.spark.LanceRuntime; +import org.lance.spark.LanceSparkReadOptions; +import org.lance.spark.LanceSparkWriteOptions; +import org.lance.spark.write.LanceStreamingWrite; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.write.WriterCommitMessage; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.streaming.StreamingQueryException; +import org.apache.spark.sql.streaming.Trigger; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * End-to-end tests for {@link LanceStreamingWrite}. 
Concrete extensions live in each Spark + * version-specific test module so the same suite runs on Spark 3.5, 4.0, and 4.1. + * + *
<p>
The streaming sink is exercised both through Spark's streaming engine ({@code writeStream ... + * toTable}) and through direct {@code commit} calls — the latter lets us deterministically verify + * dedupe and empty-epoch behavior without depending on Spark's checkpoint advancement. + */ +public abstract class BaseStreamingWriteTest { + + protected SparkSession spark; + protected TableCatalog catalog; + protected final String catalogName = "lance_ns"; + + @TempDir protected Path tempDir; + + @BeforeEach + void setup() throws IOException { + spark = + SparkSession.builder() + .appName("lance-streaming-write-test") + .master("local") + .config( + "spark.sql.catalog." + catalogName, "org.lance.spark.LanceNamespaceSparkCatalog") + .config("spark.sql.catalog." + catalogName + ".impl", "dir") + .config("spark.sql.catalog." + catalogName + ".root", tempDir.toString()) + .config("spark.sql.session.timeZone", "UTC") + .getOrCreate(); + catalog = (TableCatalog) spark.sessionState().catalogManager().catalog(catalogName); + // Create the "default" namespace in manifest mode so deregisterTable works. + spark.sql("CREATE NAMESPACE IF NOT EXISTS " + catalogName + ".default"); + } + + @AfterEach + void tearDown() { + if (spark != null) { + spark.stop(); + } + } + + // ---------- end-to-end tests ---------- + + @Test + public void testAppendHappyPath() throws Exception { + String tableName = uniqueTable("stream_append"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + + Path src = tempDir.resolve("src-" + tableName); + Path checkpoint = tempDir.resolve("ckpt-" + tableName); + String queryId = "qid-" + tableName; + + // Batch 1: 3 rows. + writeBatchParquet(src, new int[] {1, 2, 3}); + runStreamingBatch(src, checkpoint, fullName, queryId); + assertEquals(3L, countRows(fullName)); + + // Batch 2: 2 more rows on the same checkpoint. 
+ writeBatchParquet(src, new int[] {4, 5}); + runStreamingBatch(src, checkpoint, fullName, queryId); + assertEquals(5L, countRows(fullName)); + } + + @Test + public void testMissingStreamingQueryIdFails() throws Exception { + String tableName = uniqueTable("stream_no_qid"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + + Path src = tempDir.resolve("src-" + tableName); + Path checkpoint = tempDir.resolve("ckpt-" + tableName); + writeBatchParquet(src, new int[] {1}); + + StreamingQueryException ex = + assertThrows( + StreamingQueryException.class, + () -> { + StreamingQuery q = + spark + .readStream() + .schema(rowSchema()) + .option("recursiveFileLookup", "true") + .parquet(src.toString()) + .writeStream() + .format("lance") + .option("checkpointLocation", checkpoint.toString()) + .trigger(Trigger.AvailableNow()) + .toTable(fullName); + q.processAllAvailable(); + q.stop(); + }); + assertTrue( + ex.getMessage().contains(LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID), + "expected error to mention streamingQueryId, got: " + ex.getMessage()); + } + + @Test + public void testDedupeOnReplayedEpoch() throws Exception { + String tableName = uniqueTable("stream_dedupe"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + + String queryId = "qid-" + tableName; + Path src = tempDir.resolve("src-" + tableName); + Path checkpoint = tempDir.resolve("ckpt-" + tableName); + writeBatchParquet(src, new int[] {10, 20}); + runStreamingBatch(src, checkpoint, fullName, queryId); + assertEquals(2L, countRows(fullName)); + long versionAfterFirst = currentVersion(tableName); + + // Re-issue the SAME (queryId, epochId=0) directly — the very first epoch in a fresh + // checkpoint is 0, so this matches what the previous Spark query committed. The dedupe + // scan must find the prior commit and skip. 
+ LanceStreamingWrite sink = directSink(tableName, queryId, 100); + sink.commit(0L, new WriterCommitMessage[0]); + assertEquals(2L, countRows(fullName)); + assertEquals(versionAfterFirst, currentVersion(tableName)); + } + + @Test + public void testMultipleEpochsOnSameSinkAdvanceVersionMonotonically() throws Exception { + // Regression for a stale-pinned-version bug: when one LanceStreamingWrite instance handled + // many epochs (a long-running continuous query), the second epoch would still read the + // dataset at the version captured during construction, breaking both the dedupe scan window + // and the commit's readVersion. maxFilesPerTrigger forces the file source to produce a + // separate micro-batch per file, so all epochs flow through the SAME sink instance. + String tableName = uniqueTable("stream_multi_epoch"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + + Path src = tempDir.resolve("src-" + tableName); + Path checkpoint = tempDir.resolve("ckpt-" + tableName); + String queryId = "qid-" + tableName; + + // Three separate parquet files → three epochs at maxFilesPerTrigger=1. 
+ writeBatchParquet(src, new int[] {1}); + writeBatchParquet(src, new int[] {2}); + writeBatchParquet(src, new int[] {3}); + + long versionBefore = currentVersion(tableName); + StreamingQuery q = + spark + .readStream() + .schema(rowSchema()) + .option("recursiveFileLookup", "true") + .option("maxFilesPerTrigger", "1") + .parquet(src.toString()) + .writeStream() + .format("lance") + .option(LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID, queryId) + .option("checkpointLocation", checkpoint.toString()) + .trigger(Trigger.AvailableNow()) + .toTable(fullName); + q.processAllAvailable(); + q.stop(); + + assertEquals(3L, countRows(fullName), "all three rows should be visible after the query"); + long versionAfter = currentVersion(tableName); + assertTrue( + versionAfter >= versionBefore + 3, + "each epoch must advance the dataset version; before=" + + versionBefore + + " after=" + + versionAfter); + } + + @Test + public void testEmptyEpochIsNoOp() throws Exception { + String tableName = uniqueTable("stream_empty"); + String fullName = qualified(tableName); + spark.sql("CREATE TABLE " + fullName + " (id INT NOT NULL, name STRING)"); + long versionBefore = currentVersion(tableName); + + String queryId = "qid-" + tableName; + LanceStreamingWrite sink = directSink(tableName, queryId, 100); + sink.commit(42L, new WriterCommitMessage[0]); + + // Empty epochs are skipped entirely — no Lance transaction is issued, so the dataset + // version does not advance. Replays of empty epochs are also no-ops. + assertEquals(0L, countRows(fullName)); + assertEquals(versionBefore, currentVersion(tableName)); + + sink.commit(42L, new WriterCommitMessage[0]); + assertEquals(versionBefore, currentVersion(tableName)); + } + + // ---------- helpers ---------- + + private String uniqueTable(String base) { + return base + "_" + UUID.randomUUID().toString().replace("-", ""); + } + + private String qualified(String tableName) { + return catalogName + ".default." 
+ tableName; + } + + private StructType rowSchema() { + return new StructType( + new StructField[] { + new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), + new StructField("name", DataTypes.StringType, true, Metadata.empty()) + }); + } + + private void writeBatchParquet(Path dir, int[] ids) throws IOException { + Files.createDirectories(dir); + List<Row> rows = new ArrayList<>(ids.length); + for (int id : ids) { + rows.add(RowFactory.create(id, "row-" + id)); + } + Path target = dir.resolve("batch-" + UUID.randomUUID() + ".parquet"); + spark.createDataFrame(rows, rowSchema()).coalesce(1).write().parquet(target.toString()); + } + + private void runStreamingBatch(Path srcDir, Path checkpoint, String targetTable, String queryId) + throws Exception { + StreamingQuery q = + spark + .readStream() + .schema(rowSchema()) + // Spark's parquet writer creates per-batch sub-directories, so let the streaming + // file source descend into them. + .option("recursiveFileLookup", "true") + .parquet(srcDir.toString()) + .writeStream() + .format("lance") + .option(LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID, queryId) + .option("checkpointLocation", checkpoint.toString()) + .trigger(Trigger.AvailableNow()) + .toTable(targetTable); + q.processAllAvailable(); + q.stop(); + } + + private long countRows(String fullName) { + return spark.sql("SELECT * FROM " + fullName).count(); + } + + private long currentVersion(String tableName) throws Exception { + LanceDataset table = + (LanceDataset) catalog.loadTable(Identifier.of(new String[] {"default"}, tableName)); + LanceSparkReadOptions read = table.readOptions(); + try (Dataset ds = openDataset(read)) { + return ds.version(); + } + } + + private Dataset openDataset(LanceSparkReadOptions read) { + if (read.hasNamespace()) { + return Dataset.open() + .allocator(LanceRuntime.allocator()) + .namespaceClient(read.getNamespace()) + .tableId(read.getTableId()) + .readOptions(read.toReadOptions()) + .build(); + } + return
Dataset.open() + .allocator(LanceRuntime.allocator()) + .uri(read.getDatasetUri()) + .readOptions(read.toReadOptions()) + .build(); + } + + private LanceStreamingWrite directSink(String tableName, String queryId, int lookback) + throws Exception { + LanceDataset table = + (LanceDataset) catalog.loadTable(Identifier.of(new String[] {"default"}, tableName)); + LanceSparkReadOptions read = table.readOptions(); + StructType schema = table.schema(); + + LanceSparkWriteOptions writeOptions = + LanceSparkWriteOptions.builder() + .datasetUri(read.getDatasetUri()) + .storageOptions(read.getStorageOptions()) + .streamingQueryId(queryId) + .streamingDedupeLookbackVersions(lookback) + .build(); + + return new LanceStreamingWrite( + schema, + writeOptions, + false, + Collections.emptyMap(), + null, + Collections.emptyMap(), + null, + false, + null, + Collections.emptyMap()); + } +} diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/SparkWriteTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/SparkWriteTest.java index 39e2e79af..aa8170fa0 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/write/SparkWriteTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/write/SparkWriteTest.java @@ -101,10 +101,71 @@ public void testToBatchReturnsLanceBatchWrite(TestInfo testInfo) { } @Test - public void testToStreamingThrowsUnsupportedOperationException(TestInfo testInfo) { + public void testToStreamingRequiresStreamingQueryId(TestInfo testInfo) { String datasetUri = createDataset(testInfo.getTestMethod().get().getName()); Write write = createBuilder(datasetUri).build(); - assertThrows(UnsupportedOperationException.class, write::toStreaming); + // Without the streamingQueryId option the constructor fails fast — exercises the option + // validation in LanceStreamingWrite#requireStreamingQueryId. 
+ IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, write::toStreaming); + assertTrue( + ex.getMessage().contains(LanceSparkWriteOptions.CONFIG_STREAMING_QUERY_ID), + "Expected error message to mention streamingQueryId, got: " + ex.getMessage()); + } + + @Test + public void testToStreamingReturnsLanceStreamingWrite(TestInfo testInfo) { + String datasetUri = createDataset(testInfo.getTestMethod().get().getName()); + LanceSparkWriteOptions writeOptions = + LanceSparkWriteOptions.builder() + .datasetUri(datasetUri) + .streamingQueryId("test-query-id") + .build(); + SparkWrite.SparkWriteBuilder builder = + new SparkWrite.SparkWriteBuilder( + SPARK_SCHEMA, + writeOptions, + Collections.emptyMap(), + null, + Collections.emptyMap(), + Arrays.asList("default", "test_table"), + false, + null, + Collections.emptyMap()); + Write write = builder.build(); + assertInstanceOf(LanceStreamingWrite.class, write.toStreaming()); + } + + @Test + public void testToStreamingRejectsStagedCommit(TestInfo testInfo) { + String datasetUri = createDataset(testInfo.getTestMethod().get().getName()); + LanceSparkWriteOptions writeOptions = + LanceSparkWriteOptions.builder() + .datasetUri(datasetUri) + .streamingQueryId("test-query-id") + .build(); + SparkWrite.SparkWriteBuilder builder = + new SparkWrite.SparkWriteBuilder( + SPARK_SCHEMA, + writeOptions, + Collections.emptyMap(), + null, + Collections.emptyMap(), + Arrays.asList("default", "test_table"), + false, + null, + Collections.emptyMap()); + // Simulate a CTAS / REPLACE TABLE flow that staged the commit before toStreaming. 
+ builder.setStagedCommit( + StagedCommit.forNewTable( + ARROW_SCHEMA, + datasetUri, + StagedCommitOptions.pathBased(Collections.emptyMap(), false))); + Write write = builder.build(); + UnsupportedOperationException ex = + assertThrows(UnsupportedOperationException.class, write::toStreaming); + assertTrue( + ex.getMessage().contains("staged"), + "expected error to mention staged commits, got: " + ex.getMessage()); } @Test diff --git a/pom.xml b/pom.xml index d10f34856..505bce1d2 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 0.5.0-beta.1 - 7.0.0-rc.1 + 7.1.0-beta.4 0.7.5 0.3.0