schemaEvolution =
SparkTableShims$.MODULE$.schemaEvolutionCapability();
if (schemaEvolution.isDefined()) {
@@ -416,7 +417,8 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap scanOptions) {
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
requireNonNull(info, "write info is null");
- return new DeltaV2WriteBuilder(kernelEngine, tablePath, hadoopConf, initialSnapshot, info);
+ return new DeltaV2WriteBuilder(
+ kernelEngine, tablePath, hadoopConf, initialSnapshot, schemaProvider.getDataSchema(), info);
}
@Override
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2BatchWrite.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2BatchWrite.java
index e5742e2fa7a..d2527cb7af0 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2BatchWrite.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2BatchWrite.java
@@ -21,159 +21,56 @@
import io.delta.kernel.TransactionCommitResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
-import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.utils.CloseableIterable;
-import io.delta.spark.internal.v2.read.DeltaParquetFileFormatV2;
-import io.delta.spark.internal.v2.utils.PartitionUtils;
-import io.delta.spark.internal.v2.utils.ScalaUtils;
-import io.delta.spark.internal.v2.utils.SchemaUtils;
-import io.delta.spark.internal.v2.utils.SerializableKernelRowWrapper;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriterFactory;
-import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
-import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
-import org.apache.spark.sql.execution.datasources.OutputWriterFactory;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
/**
- * BatchWrite for DSv2 batch append using Spark's Parquet path. Creates a Kernel transaction on the
- * driver, obtains the target directory from the Kernel write context, creates a Spark Parquet
- * OutputWriterFactory via the shared {@link PartitionUtils#createDeltaParquetFileFormat} factory,
- * and serializes everything into a {@link DeltaV2DataWriterFactory} for executor transport.
- *
- * The {@link Transaction} object lives only on the driver and is never serialized. Executors
- * receive only serializable state: transaction state row, Hadoop conf, OutputWriterFactory, schema,
- * and target directory.
+ * BatchWrite for DSv2 batch append. The shared executor write state ({@link
+ * DeltaV2DataWriterFactory}) is built by {@link DeltaV2Write}; this class adds only the batch
+ * commit -- a single {@link Operation#WRITE} transaction over the {@code AddFile} actions reported
+ * by the writers.
*/
-class DeltaV2BatchWrite implements Write, BatchWrite {
+class DeltaV2BatchWrite implements BatchWrite {
- private static final Logger LOG = LoggerFactory.getLogger(DeltaV2BatchWrite.class);
+ private static final Logger logger = LoggerFactory.getLogger(DeltaV2BatchWrite.class);
private static String getEngineInfo() {
return "Delta-Spark-DSv2/" + org.apache.spark.package$.MODULE$.SPARK_VERSION();
}
- private final Transaction transaction;
private final Engine engine;
-
- private final String targetDirectory;
- private final SerializableConfiguration serializableHadoopConf;
- private final SerializableKernelRowWrapper serializedTxnState;
- private final StructType dataSchema;
- private final OutputWriterFactory outputWriterFactory;
+ private final Snapshot initialSnapshot;
+ private final DeltaV2DataWriterFactory dataWriterFactory;
DeltaV2BatchWrite(
- Engine engine,
- Configuration hadoopConf,
- String tablePath,
- Snapshot initialSnapshot,
- LogicalWriteInfo writeInfo) {
+ Engine engine, Snapshot initialSnapshot, DeltaV2DataWriterFactory dataWriterFactory) {
this.engine = engine;
- this.transaction =
- initialSnapshot
- .buildUpdateTableTransaction(getEngineInfo(), Operation.WRITE)
- .build(this.engine);
- Row txnState = transaction.getTransactionState(this.engine);
- this.serializedTxnState = new SerializableKernelRowWrapper(txnState);
-
- this.targetDirectory =
- Transaction.getWriteContext(this.engine, txnState, Collections.emptyMap())
- .getTargetDirectory();
-
- StructType tableSchema =
- SchemaUtils.convertKernelSchemaToSparkSchema(initialSnapshot.getSchema());
- java.util.Set partitionCols =
- new java.util.HashSet<>(initialSnapshot.getPartitionColumnNames());
- this.dataSchema =
- partitionCols.isEmpty()
- ? tableSchema
- : new StructType(
- java.util.Arrays.stream(tableSchema.fields())
- .filter(f -> !partitionCols.contains(f.name()))
- .toArray(org.apache.spark.sql.types.StructField[]::new));
-
- SparkSession session =
- SparkSession.getActiveSession()
- .getOrElse(
- () -> {
- throw new IllegalStateException(
- "SparkSession not active (batch write needs it for Parquet)");
- });
-
- Job job;
- try {
- job = Job.getInstance(hadoopConf);
- } catch (IOException e) {
- throw new UncheckedIOException("Failed to create Hadoop job for Parquet write", e);
- }
- // TODO: support write-time CDF on batch writes.
- DeltaParquetFileFormatV2 format =
- PartitionUtils.createDeltaParquetFileFormat(
- initialSnapshot,
- tablePath,
- /* optimizationsEnabled */ true,
- /* useMetadataRowIndex */ Option.empty(),
- /* isCDCRead */ false);
- org.apache.spark.sql.execution.datasources.DataSourceUtils.checkFieldNames(format, dataSchema);
- Map options = writeInfo.options().asCaseSensitiveMap();
- scala.collection.immutable.Map scalaOpts =
- ScalaUtils.toScalaMap(options != null ? options : Collections.emptyMap());
- this.outputWriterFactory = format.prepareWrite(session, job, scalaOpts, dataSchema);
- this.serializableHadoopConf = new SerializableConfiguration(job.getConfiguration());
- }
-
- @Override
- public BatchWrite toBatch() {
- return this;
+ this.initialSnapshot = initialSnapshot;
+ this.dataWriterFactory = dataWriterFactory;
}
@Override
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo physicalWriteInfo) {
- return new DeltaV2DataWriterFactory(
- targetDirectory,
- serializableHadoopConf,
- serializedTxnState,
- dataSchema,
- outputWriterFactory);
+ return dataWriterFactory;
}
@Override
public void commit(WriterCommitMessage[] messages) {
- List allActionRows = new ArrayList<>();
- for (WriterCommitMessage msg : messages) {
- if (msg instanceof DeltaV2WriterCommitMessage) {
- for (SerializableKernelRowWrapper wrapper :
- ((DeltaV2WriterCommitMessage) msg).getActionRows()) {
- allActionRows.add(wrapper.getRow());
- }
- }
- }
-
- CloseableIterable dataActions =
- CloseableIterable.inMemoryIterable(Utils.toCloseableIterator(allActionRows.iterator()));
-
- TransactionCommitResult result = transaction.commit(engine, dataActions);
- LOG.info("DSv2 batch write committed at version {}", result.getVersion());
+ Transaction txn =
+ initialSnapshot.buildUpdateTableTransaction(getEngineInfo(), Operation.WRITE).build(engine);
+ CloseableIterable dataActions = DeltaV2WriterCommitMessage.toDataActions(messages);
+ TransactionCommitResult result = txn.commit(engine, dataActions);
+ logger.info("DSv2 batch write committed at version {}", result.getVersion());
}
@Override
public void abort(WriterCommitMessage[] messages) {
- LOG.warn(
+ logger.warn(
"DSv2 batch write aborted. {} task messages will not be committed. "
+ "Orphaned data files will be cleaned up by VACUUM.",
messages != null ? messages.length : 0);
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2DataWriterFactory.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2DataWriterFactory.java
index 223dc0e4ea7..91b244605e3 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2DataWriterFactory.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2DataWriterFactory.java
@@ -20,15 +20,20 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
import org.apache.spark.sql.execution.datasources.OutputWriterFactory;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
/**
* Serializable factory sent to executors to create {@link DeltaV2DataWriter} instances. Carries the
- * serialized Hadoop config and Kernel transaction state needed by each writer.
+ * serialized Hadoop config and Kernel transaction state needed by each writer. Serves both batch
+ * and streaming writes -- the {@link DeltaV2DataWriter} is epoch-agnostic, so the streaming
+ * overload ignores {@code epochId} (the epoch only scopes the driver-side commit, not the per-task
+ * write).
*/
-class DeltaV2DataWriterFactory implements DataWriterFactory, Serializable {
+class DeltaV2DataWriterFactory
+ implements DataWriterFactory, StreamingDataWriterFactory, Serializable {
private final String targetDirectory;
private final SerializableConfiguration hadoopConf;
@@ -60,4 +65,9 @@ public DataWriter createWriter(int partitionId, long taskId) {
partitionId,
taskId);
}
+
+ @Override
+ public DataWriter createWriter(int partitionId, long taskId, long epochId) {
+ return createWriter(partitionId, taskId);
+ }
}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2StreamingWrite.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2StreamingWrite.java
new file mode 100644
index 00000000000..06766eaeb50
--- /dev/null
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2StreamingWrite.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.write;
+
+import static java.util.Objects.requireNonNull;
+
+import io.delta.kernel.Operation;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Transaction;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.ConcurrentTransactionException;
+import io.delta.kernel.utils.CloseableIterable;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StreamingWrite for DSv2 streaming Append. The shared executor write state ({@link
+ * DeltaV2DataWriterFactory}) is built by {@link DeltaV2Write}; this class adds only the driver-side
+ * commit, which runs once per micro-batch (epoch) with an idempotent transaction.
+ *
+ * Idempotency: each epoch commits a transaction built with {@code
+ * withTransactionId(queryId, epochId)}. A duplicate epoch makes Kernel throw {@link
+ * ConcurrentTransactionException}, which is caught here and treated as a successful idempotent skip
+ * (the Kernel equivalent of the V1 sink's {@code SetTransaction} version check).
+ */
+class DeltaV2StreamingWrite implements StreamingWrite {
+
+ private static final Logger logger = LoggerFactory.getLogger(DeltaV2StreamingWrite.class);
+
+ private static String getEngineInfo() {
+ return "Delta-Spark-DSv2-Streaming/" + org.apache.spark.package$.MODULE$.SPARK_VERSION();
+ }
+
+ private final Engine engine;
+ private final Snapshot initialSnapshot;
+ private final String queryId;
+ private final DeltaV2DataWriterFactory dataWriterFactory;
+
+ DeltaV2StreamingWrite(
+ Engine engine,
+ Snapshot initialSnapshot,
+ String queryId,
+ DeltaV2DataWriterFactory dataWriterFactory) {
+ this.engine = requireNonNull(engine, "engine is null");
+ this.initialSnapshot = requireNonNull(initialSnapshot, "initialSnapshot is null");
+ this.queryId = requireNonNull(queryId, "queryId is null");
+ this.dataWriterFactory = requireNonNull(dataWriterFactory, "dataWriterFactory is null");
+ }
+
+ @Override
+ public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) {
+ return dataWriterFactory;
+ }
+
+ @Override
+ public boolean useCommitCoordinator() {
+ return false;
+ }
+
+ @Override
+ public void commit(long epochId, WriterCommitMessage[] messages) {
+ final Transaction txn;
+ try {
+ txn =
+ initialSnapshot
+ .buildUpdateTableTransaction(getEngineInfo(), Operation.STREAMING_UPDATE)
+ .withTransactionId(queryId, epochId)
+ .build(engine);
+ } catch (ConcurrentTransactionException e) {
+ logger.info("Skipping already committed epoch {} for query {}", epochId, queryId);
+ return;
+ }
+
+ CloseableIterable dataActions = DeltaV2WriterCommitMessage.toDataActions(messages);
+ try {
+ long version = txn.commit(engine, dataActions).getVersion();
+ logger.info(
+ "DSv2 streaming epoch {} for query {} committed at version {}",
+ epochId,
+ queryId,
+ version);
+ } catch (ConcurrentTransactionException e) {
+ logger.info("Skipping already committed epoch {} for query {}", epochId, queryId);
+ }
+ }
+
+ @Override
+ public void abort(long epochId, WriterCommitMessage[] messages) {
+ logger.warn(
+ "DSv2 streaming epoch {} for query {} aborted; {} task message(s) not committed. "
+ + "Orphaned data files will be cleaned up by VACUUM.",
+ epochId,
+ queryId,
+ messages != null ? messages.length : 0);
+ }
+}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2Write.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2Write.java
new file mode 100644
index 00000000000..c11e2a389e4
--- /dev/null
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2Write.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.write;
+
+import static java.util.Objects.requireNonNull;
+
+import io.delta.kernel.Operation;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Transaction;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.engine.Engine;
+import io.delta.spark.internal.v2.read.DeltaParquetFileFormatV2;
+import io.delta.spark.internal.v2.utils.PartitionUtils;
+import io.delta.spark.internal.v2.utils.ScalaUtils;
+import io.delta.spark.internal.v2.utils.SerializableKernelRowWrapper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.apache.spark.util.SerializableConfiguration;
+import scala.Option;
+
+/**
+ * The DSv2 {@link Write} for Delta. Builds the executor-side write state shared by batch and
+ * streaming -- the {@link DeltaV2DataWriterFactory} (serialized transaction state, target
+ * directory, Spark Parquet {@link OutputWriterFactory}, Hadoop conf) -- once, and hands it
+ * to the concrete {@link DeltaV2BatchWrite} / {@link DeltaV2StreamingWrite}, which add only their
+ * mode-specific commit. The transaction state is operation-independent (it carries schema /
+ * protocol / path, not the {@code Operation}), so the same state serves both batch and streaming.
+ *
+ * Only append is supported; the builder advertises no overwrite capability, so Spark does not
+ * route overwrite, truncate, Complete, or Update here.
+ */
+class DeltaV2Write implements Write {
+
+ // Streaming-write options the sink does not honor; rejected rather than silently ignored.
+ private static final List UNSUPPORTED_STREAMING_OPTIONS =
+ Arrays.asList(
+ "mergeSchema",
+ "overwriteSchema",
+ "replaceWhere",
+ "replaceOn",
+ "replaceUsing",
+ "partitionOverwriteMode",
+ "txnAppId",
+ "txnVersion",
+ "userMetadata");
+
+ private static String getEngineInfo() {
+ return "Delta-Spark-DSv2/" + org.apache.spark.package$.MODULE$.SPARK_VERSION();
+ }
+
+ private final Engine engine;
+ private final Configuration hadoopConf;
+ private final String tablePath;
+ private final Snapshot initialSnapshot;
+ private final StructType dataSchema;
+ private final String queryId;
+ private final LogicalWriteInfo writeInfo;
+
+ DeltaV2Write(
+ Engine engine,
+ Configuration hadoopConf,
+ String tablePath,
+ Snapshot initialSnapshot,
+ StructType dataSchema,
+ LogicalWriteInfo writeInfo) {
+ this.engine = requireNonNull(engine, "engine is null");
+ this.hadoopConf = requireNonNull(hadoopConf, "hadoopConf is null");
+ this.tablePath = requireNonNull(tablePath, "tablePath is null");
+ this.initialSnapshot = requireNonNull(initialSnapshot, "initialSnapshot is null");
+ this.dataSchema = requireNonNull(dataSchema, "dataSchema is null");
+ this.writeInfo = requireNonNull(writeInfo, "writeInfo is null");
+ this.queryId = requireNonNull(writeInfo.queryId(), "queryId is null");
+ }
+
+ @Override
+ public BatchWrite toBatch() {
+ return asBatchAppend();
+ }
+
+ @Override
+ public StreamingWrite toStreaming() {
+ rejectUnsupportedStreamingOptions();
+ return asStreamingAppend();
+ }
+
+ private BatchWrite asBatchAppend() {
+ return new DeltaV2BatchWrite(engine, initialSnapshot, buildDataWriterFactory(Operation.WRITE));
+ }
+
+ private StreamingWrite asStreamingAppend() {
+ return new DeltaV2StreamingWrite(
+ engine, initialSnapshot, queryId, buildDataWriterFactory(Operation.STREAMING_UPDATE));
+ }
+
+ /**
+ * Builds the executor-side write state shared by batch and streaming: the serialized transaction
+ * state and target directory from a Kernel write context, plus the Spark Parquet
+ * OutputWriterFactory. Built when the concrete write is selected (after streaming-option
+ * rejection), not in the constructor, so unsupported options are rejected before their values are
+ * parsed. The transaction built here is used only to extract that state and is then discarded --
+ * each concrete write builds its own transaction for the actual commit. {@code operation} is the
+ * write mode's operation ({@code WRITE} / {@code STREAMING_UPDATE}); the extracted state is
+ * operation-independent, so this only keeps the throwaway transaction consistent with its mode.
+ */
+ private DeltaV2DataWriterFactory buildDataWriterFactory(Operation operation) {
+ Transaction stateTxn =
+ requireNonNull(
+ initialSnapshot.buildUpdateTableTransaction(getEngineInfo(), operation).build(engine),
+ "stateTxn is null");
+ Row txnState = requireNonNull(stateTxn.getTransactionState(engine), "txnState is null");
+ SerializableKernelRowWrapper serializedTxnState = new SerializableKernelRowWrapper(txnState);
+ String targetDirectory =
+ requireNonNull(
+ Transaction.getWriteContext(engine, txnState, Collections.emptyMap())
+ .getTargetDirectory(),
+ "targetDirectory is null");
+
+ SparkSession session =
+ SparkSession.getActiveSession()
+ .getOrElse(
+ () -> {
+ throw new IllegalStateException(
+ "SparkSession not active (write needs it for Parquet)");
+ });
+
+ Job job;
+ try {
+ job = Job.getInstance(hadoopConf);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create Hadoop job for Parquet write", e);
+ }
+ DeltaParquetFileFormatV2 format =
+ PartitionUtils.createDeltaParquetFileFormat(
+ initialSnapshot,
+ tablePath,
+ /* optimizationsEnabled */ true,
+ /* useMetadataRowIndex */ Option.empty(),
+ /* isCDCRead */ false);
+ org.apache.spark.sql.execution.datasources.DataSourceUtils.checkFieldNames(format, dataSchema);
+ Map options = writeInfo.options().asCaseSensitiveMap();
+ scala.collection.immutable.Map scalaOpts =
+ ScalaUtils.toScalaMap(options != null ? options : Collections.emptyMap());
+ OutputWriterFactory outputWriterFactory =
+ requireNonNull(
+ format.prepareWrite(session, job, scalaOpts, dataSchema),
+ "outputWriterFactory is null");
+ SerializableConfiguration serializableHadoopConf =
+ new SerializableConfiguration(job.getConfiguration());
+
+ return new DeltaV2DataWriterFactory(
+ targetDirectory,
+ serializableHadoopConf,
+ serializedTxnState,
+ dataSchema,
+ outputWriterFactory);
+ }
+
+ private void rejectUnsupportedStreamingOptions() {
+ CaseInsensitiveStringMap options = writeInfo.options();
+ List rejected = new ArrayList<>();
+ for (String key : UNSUPPORTED_STREAMING_OPTIONS) {
+ if (options.containsKey(key)) {
+ rejected.add(key);
+ }
+ }
+ if (!rejected.isEmpty()) {
+ throw new UnsupportedOperationException(
+ "DSv2 streaming writes to Delta do not support the option(s): "
+ + rejected
+ + ". Use the V1 write path (format(\"delta\")) if you need them.");
+ }
+ }
+}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2WriteBuilder.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2WriteBuilder.java
index dd15e2bf88c..a0593cf23ee 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2WriteBuilder.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2WriteBuilder.java
@@ -31,12 +31,16 @@
import org.apache.spark.sql.types.StructType;
/**
- * WriteBuilder for DSv2 batch writes to Delta tables. Mirrors the read-side {@code
- * SparkScanBuilder} pattern: takes table-level state and Spark's {@link LogicalWriteInfo}, and
- * builds a {@link DeltaV2BatchWrite} (which implements both Write and BatchWrite).
+ * WriteBuilder for DSv2 writes to Delta tables. Mirrors the read-side {@code SparkScanBuilder}
+ * pattern: takes table-level state and Spark's {@link LogicalWriteInfo}, and builds a {@link
+ * DeltaV2Write}.
*
* Schema validation uses the shared V1 utility {@code SchemaMergingUtils.mergeSchemas} to check
* type compatibility and reject duplicate columns before the write proceeds.
+ *
+ *
Only append is supported: this builder implements no overwrite capability ({@code
+ * SupportsTruncate} / {@code SupportsOverwrite} / {@code SupportsStreamingUpdateAsAppend}), so
+ * Spark rejects Complete, Update, and overwrite writes before reaching it.
*/
public class DeltaV2WriteBuilder implements WriteBuilder {
@@ -44,6 +48,7 @@ public class DeltaV2WriteBuilder implements WriteBuilder {
private final String tablePath;
private final Configuration hadoopConf;
private final Snapshot initialSnapshot;
+ private final StructType dataSchema;
private final LogicalWriteInfo writeInfo;
/**
@@ -51,6 +56,7 @@ public class DeltaV2WriteBuilder implements WriteBuilder {
* @param tablePath filesystem path to the Delta table root
* @param hadoopConf Hadoop configuration (with merged table options)
* @param initialSnapshot Kernel snapshot loaded at table construction time
+ * @param dataSchema the table's data (non-partition) schema, from DeltaV2Table's SchemaProvider
* @param writeInfo Spark's logical write info (schema, queryId, options)
*/
public DeltaV2WriteBuilder(
@@ -58,11 +64,13 @@ public DeltaV2WriteBuilder(
String tablePath,
Configuration hadoopConf,
Snapshot initialSnapshot,
+ StructType dataSchema,
LogicalWriteInfo writeInfo) {
this.engine = requireNonNull(engine, "engine is null");
this.tablePath = requireNonNull(tablePath, "tablePath is null");
this.hadoopConf = requireNonNull(hadoopConf, "hadoopConf is null");
this.initialSnapshot = requireNonNull(initialSnapshot, "initialSnapshot is null");
+ this.dataSchema = requireNonNull(dataSchema, "dataSchema is null");
this.writeInfo = requireNonNull(writeInfo, "writeInfo is null");
}
@@ -101,6 +109,6 @@ public Write build() {
TypeWideningMode.NoTypeWidening$.MODULE$,
/* caseSensitive */ false);
- return new DeltaV2BatchWrite(engine, hadoopConf, tablePath, initialSnapshot, writeInfo);
+ return new DeltaV2Write(engine, hadoopConf, tablePath, initialSnapshot, dataSchema, writeInfo);
}
}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2WriterCommitMessage.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2WriterCommitMessage.java
index a682216cfb0..b128c5c790a 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2WriterCommitMessage.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/write/DeltaV2WriterCommitMessage.java
@@ -15,7 +15,11 @@
*/
package io.delta.spark.internal.v2.write;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.utils.CloseableIterable;
import io.delta.spark.internal.v2.utils.SerializableKernelRowWrapper;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
@@ -36,4 +40,21 @@ class DeltaV2WriterCommitMessage implements WriterCommitMessage {
List getActionRows() {
return Collections.unmodifiableList(actionRows);
}
+
+ /**
+ * Flattens the Delta log action rows from every task's commit message into the {@link
+ * CloseableIterable} that {@link io.delta.kernel.Transaction#commit} expects. Shared by the batch
+ * and streaming commits.
+ */
+ static CloseableIterable toDataActions(WriterCommitMessage[] messages) {
+ List allActionRows = new ArrayList<>();
+ for (WriterCommitMessage msg : messages) {
+ if (msg instanceof DeltaV2WriterCommitMessage) {
+ for (SerializableKernelRowWrapper wrapper : ((DeltaV2WriterCommitMessage) msg).actionRows) {
+ allActionRows.add(wrapper.getRow());
+ }
+ }
+ }
+ return CloseableIterable.inMemoryIterable(Utils.toCloseableIterator(allActionRows.iterator()));
+ }
}
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2BatchWriteTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2BatchWriteTest.java
new file mode 100644
index 00000000000..9f7ec9f9670
--- /dev/null
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2BatchWriteTest.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.write;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.delta.kernel.Snapshot;
+import io.delta.spark.internal.v2.DeltaV2TestBase;
+import io.delta.spark.internal.v2.InternalRowTestUtils;
+import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
+import java.io.File;
+import java.util.List;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/** Unit tests for {@link DeltaV2BatchWrite}; the batch counterpart of DeltaV2StreamingWriteTest. */
+public class DeltaV2BatchWriteTest extends DeltaV2TestBase {
+
+ private static final StructType TABLE_SCHEMA =
+ DataTypes.createStructType(
+ new StructField[] {
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("name", DataTypes.StringType, true)
+ });
+
+ @Test
+ public void testCreateBatchWriterFactory_returnsDeltaV2DataWriterFactory(@TempDir File tempDir) {
+ DeltaV2BatchWrite write = newWrite(createTable(tempDir, "batch_factory_type"));
+ assertTrue(
+ write.createBatchWriterFactory(WriteTestUtils.physicalWriteInfo(1))
+ instanceof DeltaV2DataWriterFactory);
+ }
+
+ @Test
+ public void testCommit_appendsData(@TempDir File tempDir) throws Exception {
+ String path = createTable(tempDir, "batch_commit");
+ DeltaV2BatchWrite write = newWrite(path);
+ WriterCommitMessage[] messages = {writeFile(write, 1, "Alice", 2, "Bob")};
+
+ write.commit(messages);
+
+ List rows = spark.read().format("delta").load(path).orderBy("id").collectAsList();
+ assertEquals(2, rows.size());
+ assertEquals("Alice", rows.get(0).getString(1));
+ assertEquals("Bob", rows.get(1).getString(1));
+ }
+
+ @Test
+ public void testAbort_doesNotCommit(@TempDir File tempDir) throws Exception {
+ String path = createTable(tempDir, "batch_abort");
+ DeltaV2BatchWrite write = newWrite(path);
+ WriterCommitMessage[] messages = {writeFile(write, 1, "Alice", 2, "Bob")};
+
+ write.abort(messages);
+
+ assertEquals(0L, spark.read().format("delta").load(path).count(), "abort must not commit data");
+ }
+
+ /** Runs the executor-side writer and returns its commit message. */
+ private WriterCommitMessage writeFile(DeltaV2BatchWrite write, Object... idNamePairs)
+ throws Exception {
+ DataWriter writer =
+ write.createBatchWriterFactory(WriteTestUtils.physicalWriteInfo(1)).createWriter(0, 0L);
+ for (int i = 0; i < idNamePairs.length; i += 2) {
+ writer.write(InternalRowTestUtils.row(idNamePairs[i], idNamePairs[i + 1]));
+ }
+ return writer.commit();
+ }
+
+ private String createTable(File tempDir, String tableName) {
+ String path = tempDir.getAbsolutePath();
+ spark.sql(
+ String.format(
+ "CREATE TABLE %s (id INT, name STRING) USING delta LOCATION '%s'", tableName, path));
+ return path;
+ }
+
+ private DeltaV2BatchWrite newWrite(String path) {
+ Snapshot snapshot =
+ new PathBasedSnapshotManager(path, spark.sessionState().newHadoopConf())
+ .loadLatestSnapshot();
+ LogicalWriteInfo info =
+ WriteTestUtils.logicalWriteInfo(TABLE_SCHEMA, CaseInsensitiveStringMap.empty());
+ DeltaV2Write write =
+ new DeltaV2Write(
+ defaultEngine,
+ spark.sessionState().newHadoopConf(),
+ path,
+ snapshot,
+ TABLE_SCHEMA,
+ info);
+ return (DeltaV2BatchWrite) write.toBatch();
+ }
+}
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2StreamingWriteTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2StreamingWriteTest.java
new file mode 100644
index 00000000000..6ba92bb8ac3
--- /dev/null
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2StreamingWriteTest.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.write;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.delta.kernel.Snapshot;
+import io.delta.spark.internal.v2.DeltaV2TestBase;
+import io.delta.spark.internal.v2.InternalRowTestUtils;
+import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
+import java.io.File;
+import java.util.List;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Unit tests for {@link DeltaV2StreamingWrite}; the write-side counterpart of
+ * SparkMicroBatchStreamTest.
+ */
+public class DeltaV2StreamingWriteTest extends DeltaV2TestBase {
+
+ private static final StructType TABLE_SCHEMA =
+ DataTypes.createStructType(
+ new StructField[] {
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("name", DataTypes.StringType, true)
+ });
+
+ @Test
+ public void testUseCommitCoordinator_isFalse(@TempDir File tempDir) {
+ DeltaV2StreamingWrite write = newWrite(createTable(tempDir, "streaming_no_coordinator"));
+ assertFalse(write.useCommitCoordinator());
+ }
+
+ @Test
+ public void testCreateStreamingWriterFactory_returnsDeltaV2DataWriterFactory(
+ @TempDir File tempDir) {
+ DeltaV2StreamingWrite write = newWrite(createTable(tempDir, "streaming_factory_type"));
+ StreamingDataWriterFactory factory =
+ write.createStreamingWriterFactory(WriteTestUtils.physicalWriteInfo(1));
+ assertTrue(factory instanceof DeltaV2DataWriterFactory);
+ }
+
+ /**
+ * Distinct epochs each append: epoch 0 and epoch 1 accumulate, and the rows are the ones written.
+ */
+ @Test
+ public void testCommit_distinctEpochsAppend(@TempDir File tempDir) throws Exception {
+ String path = createTable(tempDir, "streaming_distinct_epochs");
+ DeltaV2StreamingWrite write = newWrite(path);
+
+ write.commit(0L, new WriterCommitMessage[] {writeEpoch(write, 0L, 1, "Alice", 2, "Bob")});
+ write.commit(1L, new WriterCommitMessage[] {writeEpoch(write, 1L, 3, "Carol", 4, "Dave")});
+
+ List rows = spark.read().format("delta").load(path).orderBy("id").collectAsList();
+ assertEquals(4, rows.size(), "distinct epochs must accumulate");
+ assertEquals("Alice", rows.get(0).getString(1));
+ assertEquals("Bob", rows.get(1).getString(1));
+ assertEquals("Carol", rows.get(2).getString(1));
+ assertEquals("Dave", rows.get(3).getString(1));
+ }
+
+ /**
+ * Replaying an already-committed epoch must be an idempotent no-op: the data is committed once,
+ * and re-committing the same epoch does not duplicate it (the {@code withTransactionId} guard).
+ */
+ @Test
+ public void testCommit_isIdempotentOnEpochReplay(@TempDir File tempDir) throws Exception {
+ String path = createTable(tempDir, "streaming_idempotent_replay");
+ DeltaV2StreamingWrite write = newWrite(path);
+ WriterCommitMessage[] messages = {writeEpoch(write, 0L, 1, "Alice", 2, "Bob")};
+
+ write.commit(0L, messages);
+ long rowsAfterFirstCommit = spark.read().format("delta").load(path).count();
+
+ write.commit(0L, messages); // replay the same epoch
+ long rowsAfterReplay = spark.read().format("delta").load(path).count();
+
+ assertEquals(2L, rowsAfterFirstCommit);
+ assertEquals(
+ 2L, rowsAfterReplay, "replaying an already-committed epoch must not duplicate data");
+ }
+
+ /**
+ * Aborting an epoch commits nothing: the written files are left orphaned, not added to the table.
+ */
+ @Test
+ public void testAbort_doesNotCommit(@TempDir File tempDir) throws Exception {
+ String path = createTable(tempDir, "streaming_abort");
+ DeltaV2StreamingWrite write = newWrite(path);
+ WriterCommitMessage[] messages = {writeEpoch(write, 0L, 1, "Alice", 2, "Bob")};
+
+ write.abort(0L, messages);
+
+ assertEquals(0L, spark.read().format("delta").load(path).count(), "abort must not commit data");
+ }
+
+ /**
+ * Runs the executor-side writer for one epoch and returns its commit message. {@code idNamePairs}
+ * is a flat list of (int id, String name) values.
+ */
+ private WriterCommitMessage writeEpoch(
+ DeltaV2StreamingWrite write, long epochId, Object... idNamePairs) throws Exception {
+ DataWriter writer =
+ write
+ .createStreamingWriterFactory(WriteTestUtils.physicalWriteInfo(1))
+ .createWriter(0, 0L, epochId);
+ for (int i = 0; i < idNamePairs.length; i += 2) {
+ writer.write(InternalRowTestUtils.row(idNamePairs[i], idNamePairs[i + 1]));
+ }
+ return writer.commit();
+ }
+
+ private String createTable(File tempDir, String tableName) {
+ String path = tempDir.getAbsolutePath();
+ spark.sql(
+ String.format(
+ "CREATE TABLE %s (id INT, name STRING) USING delta LOCATION '%s'", tableName, path));
+ return path;
+ }
+
+ private DeltaV2StreamingWrite newWrite(String path) {
+ Snapshot snapshot =
+ new PathBasedSnapshotManager(path, spark.sessionState().newHadoopConf())
+ .loadLatestSnapshot();
+ LogicalWriteInfo info =
+ WriteTestUtils.logicalWriteInfo(TABLE_SCHEMA, CaseInsensitiveStringMap.empty());
+ DeltaV2Write write =
+ new DeltaV2Write(
+ defaultEngine,
+ spark.sessionState().newHadoopConf(),
+ path,
+ snapshot,
+ TABLE_SCHEMA,
+ info);
+ return (DeltaV2StreamingWrite) write.toStreaming();
+ }
+}
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2WriteBuilderTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2WriteBuilderTest.java
new file mode 100644
index 00000000000..f8a79b6435f
--- /dev/null
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2WriteBuilderTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.write;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.delta.kernel.Snapshot;
+import io.delta.spark.internal.v2.DeltaV2TestBase;
+import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
+import java.io.File;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Unit tests for {@link DeltaV2WriteBuilder}; the write-side counterpart of SparkScanBuilderTest.
+ */
+public class DeltaV2WriteBuilderTest extends DeltaV2TestBase {
+
+ private static final StructType TABLE_SCHEMA =
+ DataTypes.createStructType(
+ new StructField[] {
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("name", DataTypes.StringType, true)
+ });
+
+ @Test
+ public void testBuild_returnsWrite(@TempDir File tempDir) {
+ String path = tempDir.getAbsolutePath();
+ spark.sql(
+ String.format(
+ "CREATE TABLE build_returns_write (id INT, name STRING) USING delta LOCATION '%s'",
+ path));
+
+ Write write = newBuilder(path, TABLE_SCHEMA, CaseInsensitiveStringMap.empty()).build();
+
+ assertNotNull(write);
+ assertTrue(write instanceof DeltaV2Write);
+ }
+
+ @Test
+ public void testBuild_rejectsColumnMappedTable(@TempDir File tempDir) {
+ String path = tempDir.getAbsolutePath();
+ spark.sql(
+ String.format(
+ "CREATE TABLE build_rejects_cm (id INT, name STRING) USING delta LOCATION '%s' "
+ + "TBLPROPERTIES ('delta.columnMapping.mode' = 'name')",
+ path));
+
+ DeltaV2WriteBuilder builder = newBuilder(path, TABLE_SCHEMA, CaseInsensitiveStringMap.empty());
+
+ assertThrows(UnsupportedOperationException.class, builder::build);
+ }
+
+ @Test
+ public void testBuild_rejectsIncompatibleSchema(@TempDir File tempDir) {
+ String path = tempDir.getAbsolutePath();
+ spark.sql(
+ String.format(
+ "CREATE TABLE build_rejects_schema (id INT, name STRING) USING delta LOCATION '%s'",
+ path));
+
+ // Write schema declares `id` as STRING while the table has INT -- an incompatible type with no
+ // implicit conversion, which mergeSchemas rejects.
+ StructType incompatible =
+ DataTypes.createStructType(
+ new StructField[] {
+ DataTypes.createStructField("id", DataTypes.StringType, true),
+ DataTypes.createStructField("name", DataTypes.StringType, true)
+ });
+ DeltaV2WriteBuilder builder = newBuilder(path, incompatible, CaseInsensitiveStringMap.empty());
+
+ assertThrows(AnalysisException.class, builder::build);
+ }
+
+ private DeltaV2WriteBuilder newBuilder(
+ String path, StructType writeSchema, CaseInsensitiveStringMap options) {
+ Snapshot snapshot =
+ new PathBasedSnapshotManager(path, spark.sessionState().newHadoopConf())
+ .loadLatestSnapshot();
+ return new DeltaV2WriteBuilder(
+ defaultEngine,
+ path,
+ spark.sessionState().newHadoopConf(),
+ snapshot,
+ TABLE_SCHEMA,
+ WriteTestUtils.logicalWriteInfo(writeSchema, options));
+ }
+}
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2WriteTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2WriteTest.java
new file mode 100644
index 00000000000..db4cda20675
--- /dev/null
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2WriteTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.write;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.delta.kernel.Snapshot;
+import io.delta.spark.internal.v2.DeltaV2TestBase;
+import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
+import java.io.File;
+import java.util.Map;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/** Unit tests for {@link DeltaV2Write}; the write-side counterpart of SparkScanTest. */
+public class DeltaV2WriteTest extends DeltaV2TestBase {
+
+ private static final StructType TABLE_SCHEMA =
+ DataTypes.createStructType(
+ new StructField[] {
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("name", DataTypes.StringType, true)
+ });
+
+ /** Options that change write semantics and are rejected by the streaming append path. */
+ private static final String[] UNSUPPORTED_STREAMING_OPTIONS = {
+ "mergeSchema",
+ "overwriteSchema",
+ "replaceWhere",
+ "replaceOn",
+ "replaceUsing",
+ "partitionOverwriteMode",
+ "txnAppId",
+ "txnVersion",
+ "userMetadata"
+ };
+
+ @Test
+ public void testToBatch_returnsDeltaV2BatchWrite(@TempDir File tempDir) {
+ String path = createTable(tempDir, "write_to_batch");
+ DeltaV2Write write = newWrite(path, CaseInsensitiveStringMap.empty());
+
+ assertTrue(write.toBatch() instanceof DeltaV2BatchWrite);
+ }
+
+ @Test
+ public void testToStreaming_returnsDeltaV2StreamingWrite(@TempDir File tempDir) {
+ String path = createTable(tempDir, "write_to_streaming");
+ DeltaV2Write write = newWrite(path, CaseInsensitiveStringMap.empty());
+
+ assertTrue(write.toStreaming() instanceof DeltaV2StreamingWrite);
+ }
+
+ @Test
+ public void testToStreaming_rejectsUnsupportedOptions(@TempDir File tempDir) {
+ String path = createTable(tempDir, "write_rejects_options");
+
+ for (String option : UNSUPPORTED_STREAMING_OPTIONS) {
+ CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(Map.of(option, "x"));
+ DeltaV2Write write = newWrite(path, options);
+ assertThrows(
+ UnsupportedOperationException.class,
+ write::toStreaming,
+ "streaming write should reject option: " + option);
+ }
+ }
+
+ private String createTable(File tempDir, String tableName) {
+ String path = tempDir.getAbsolutePath();
+ spark.sql(
+ String.format(
+ "CREATE TABLE %s (id INT, name STRING) USING delta LOCATION '%s'", tableName, path));
+ return path;
+ }
+
+ private DeltaV2Write newWrite(String path, CaseInsensitiveStringMap options) {
+ Snapshot snapshot =
+ new PathBasedSnapshotManager(path, spark.sessionState().newHadoopConf())
+ .loadLatestSnapshot();
+ LogicalWriteInfo info = WriteTestUtils.logicalWriteInfo(TABLE_SCHEMA, options);
+ return new DeltaV2Write(
+ defaultEngine, spark.sessionState().newHadoopConf(), path, snapshot, TABLE_SCHEMA, info);
+ }
+}
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2WriterCommitMessageTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2WriterCommitMessageTest.java
new file mode 100644
index 00000000000..fed12e1fa69
--- /dev/null
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/DeltaV2WriterCommitMessageTest.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.write;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.spark.internal.v2.DeltaV2TestBase;
+import io.delta.spark.internal.v2.InternalRowTestUtils;
+import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
+import java.io.File;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/** Unit tests for {@link DeltaV2WriterCommitMessage}, focused on {@code toDataActions}. */
+public class DeltaV2WriterCommitMessageTest extends DeltaV2TestBase {
+
+ private static final StructType TABLE_SCHEMA =
+ DataTypes.createStructType(
+ new StructField[] {
+ DataTypes.createStructField("id", DataTypes.IntegerType, true),
+ DataTypes.createStructField("name", DataTypes.StringType, true)
+ });
+
+ @Test
+ public void testConstructor_nullActionRows_isEmpty() {
+ assertEquals(0, new DeltaV2WriterCommitMessage(null).getActionRows().size());
+ }
+
+ @Test
+ public void testToDataActions_noMessages_isEmpty() {
+ assertEquals(
+ 0, countActions(DeltaV2WriterCommitMessage.toDataActions(new WriterCommitMessage[0])));
+ }
+
+ @Test
+ public void testToDataActions_ignoresNonDeltaMessages() {
+ WriterCommitMessage other = new WriterCommitMessage() {};
+ assertEquals(
+ 0,
+ countActions(DeltaV2WriterCommitMessage.toDataActions(new WriterCommitMessage[] {other})));
+ }
+
+ @Test
+ public void testToDataActions_flattensActionsFromAllMessages(@TempDir File tempDir)
+ throws Exception {
+ String path = createTable(tempDir, "commit_msg_flatten");
+ DeltaV2DataWriterFactory factory = dataWriterFactory(path);
+
+ // Two tasks, one file (one AddFile action) each; toDataActions must combine both.
+ WriterCommitMessage m1 = writeFile(factory, 0, 1, "Alice");
+ WriterCommitMessage m2 = writeFile(factory, 1, 2, "Bob");
+
+ assertEquals(
+ 2,
+ countActions(DeltaV2WriterCommitMessage.toDataActions(new WriterCommitMessage[] {m1, m2})));
+ }
+
+ private static int countActions(CloseableIterable actions) {
+ int n = 0;
+ CloseableIterator it = actions.iterator();
+ try {
+ while (it.hasNext()) {
+ it.next();
+ n++;
+ }
+ } finally {
+ try {
+ it.close();
+ } catch (Exception ignored) {
+ // Test cleanup best-effort.
+ }
+ }
+ return n;
+ }
+
+ private WriterCommitMessage writeFile(
+ DeltaV2DataWriterFactory factory, int partitionId, Object... idNamePairs) throws Exception {
+ DataWriter writer = factory.createWriter(partitionId, 0L);
+ for (int i = 0; i < idNamePairs.length; i += 2) {
+ writer.write(InternalRowTestUtils.row(idNamePairs[i], idNamePairs[i + 1]));
+ }
+ return writer.commit();
+ }
+
+ private DeltaV2DataWriterFactory dataWriterFactory(String path) {
+ Snapshot snapshot =
+ new PathBasedSnapshotManager(path, spark.sessionState().newHadoopConf())
+ .loadLatestSnapshot();
+ DeltaV2Write write =
+ new DeltaV2Write(
+ defaultEngine,
+ spark.sessionState().newHadoopConf(),
+ path,
+ snapshot,
+ TABLE_SCHEMA,
+ WriteTestUtils.logicalWriteInfo(TABLE_SCHEMA, CaseInsensitiveStringMap.empty()));
+ return (DeltaV2DataWriterFactory)
+ write.toBatch().createBatchWriterFactory(WriteTestUtils.physicalWriteInfo(1));
+ }
+
+ private String createTable(File tempDir, String tableName) {
+ String path = tempDir.getAbsolutePath();
+ spark.sql(
+ String.format(
+ "CREATE TABLE %s (id INT, name STRING) USING delta LOCATION '%s'", tableName, path));
+ return path;
+ }
+}
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/write/WriteTestUtils.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/WriteTestUtils.java
new file mode 100644
index 00000000000..98dfadcc17c
--- /dev/null
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/write/WriteTestUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.write;
+
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/** Shared helpers for the write-side unit tests. */
+final class WriteTestUtils {
+
+ private WriteTestUtils() {}
+
+ /**
+ * Builds a {@link LogicalWriteInfo}. Spark's {@code LogicalWriteInfoImpl} is {@code private[sql]}
+ * and not reachable from this package, so we implement the interface directly.
+ */
+ static LogicalWriteInfo logicalWriteInfo(StructType schema, CaseInsensitiveStringMap options) {
+ return new LogicalWriteInfo() {
+ @Override
+ public String queryId() {
+ return "test-query-id";
+ }
+
+ @Override
+ public StructType schema() {
+ return schema;
+ }
+
+ @Override
+ public CaseInsensitiveStringMap options() {
+ return options;
+ }
+ };
+ }
+
+ static PhysicalWriteInfo physicalWriteInfo(int numPartitions) {
+ return () -> numPartitions;
+ }
+}