diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala
index df3098777c9..e5d10031482 100644
--- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala
+++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala
@@ -160,7 +160,9 @@ object DeltaV2SourceSuite {
"streaming read preserves percent-literal string partition value",
"initial snapshot: checkpoint resume produces all rows without duplicates",
"initial snapshot: Trigger.AvailableNow processes all data and terminates",
- "initial snapshot: checkpoint resume after new commits produces all rows"
+ "initial snapshot: checkpoint resume after new commits produces all rows",
+ "no schema should throw an exception",
+ "Delta sources should verify the protocol reader version"
)
val FailingTests: Set[String] = Set(
@@ -181,10 +183,6 @@ object DeltaV2SourceSuite {
"incremental: commit file gap between versions, failOnDataLoss=false succeeds",
// === Misc ===
- // TODO(#5900): fix exception mismatch
- "no schema should throw an exception",
- // TODO(#5900): fix exception mismatch
- "Delta sources should verify the protocol reader version",
// TODO(#5895): gracefully handle corrupt checkpoint
"start from corrupt checkpoint",
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java
index 8a1e904951d..b1308fcb7ad 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java
@@ -23,8 +23,11 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.rowtracking.RowTracking;
+import io.delta.spark.internal.v2.exception.KernelExceptionConverter;
+import io.delta.spark.internal.v2.exception.Operation;
import io.delta.spark.internal.v2.read.MetadataEvolutionHandler;
import io.delta.spark.internal.v2.read.SparkScanBuilder;
import io.delta.spark.internal.v2.read.cdc.CDCSchemaContext;
@@ -197,8 +200,14 @@ private DeltaV2Table(
SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options));
this.kernelEngine = DefaultEngine.create(this.hadoopConf);
this.snapshotManager = SnapshotManagerFactory.create(tablePath, kernelEngine, catalogTable);
- // Load the initial snapshot through the manager
- this.initialSnapshot = snapshotManager.loadLatestSnapshot();
+ Snapshot loadedSnapshot;
+ try {
+ loadedSnapshot = snapshotManager.loadLatestSnapshot();
+ } catch (KernelException e) {
+ // Surface the same exceptions the DSv1 connector raises (delta-io/delta#5900).
+ throw KernelExceptionConverter.translateAndThrow(e, tablePath, Operation.TABLE_RESOLUTION);
+ }
+ this.initialSnapshot = loadedSnapshot;
this.isCDCRead = CDCReader.isCDCRead(new CaseInsensitiveStringMap(this.options));
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/KernelExceptionConverter.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/KernelExceptionConverter.java
new file mode 100644
index 00000000000..d1dc6a3fb11
--- /dev/null
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/KernelExceptionConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.exception;
+
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.exceptions.TableNotFoundException;
+import io.delta.kernel.exceptions.UnsupportedProtocolVersionException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.spark.sql.delta.DeltaErrors;
+import org.apache.spark.sql.delta.InvalidProtocolVersionException;
+import org.apache.spark.sql.delta.actions.Action$;
+
+/**
+ * Translates Delta Kernel exceptions into the Spark/Delta exceptions.
+ *
+ *
Mappings live in a class-to-handler registry, lookup walks the exception's class hierarchy and
+ * kernel exceptions without a registered handler are returned unchanged. To add a mapping, register
+ * one more handler below.
+ */
+public final class KernelExceptionConverter {
+
+ private KernelExceptionConverter() {}
+
+ @FunctionalInterface
+ private interface Handler {
+ Throwable translate(KernelException e, String tableNameOrPath, Operation op);
+ }
+
+ private static final Map, Handler> HANDLERS = new HashMap<>();
+
+ static {
+ HANDLERS.put(TableNotFoundException.class, KernelExceptionConverter::translateTableNotFound);
+ HANDLERS.put(
+ UnsupportedProtocolVersionException.class,
+ KernelExceptionConverter::translateUnsupportedProtocolVersion);
+ }
+
+ /** Returns the Spark/Delta equivalent of {@code e}, or {@code e} unchanged. */
+ public static Throwable convert(KernelException e, String tableNameOrPath, Operation op) {
+ for (Class> c = e.getClass();
+ c != null && KernelException.class.isAssignableFrom(c);
+ c = c.getSuperclass()) {
+ Handler handler = HANDLERS.get(c);
+ if (handler != null) {
+ return handler.translate(e, tableNameOrPath, op);
+ }
+ }
+ return e;
+ }
+
+ /** Converts {@code e} and throws the result. */
+ public static RuntimeException translateAndThrow(
+ KernelException e, String tableNameOrPath, Operation op) {
+ throw sneakyThrow(convert(e, tableNameOrPath, op));
+ }
+
+ private static Throwable translateTableNotFound(
+ KernelException e, String tableNameOrPath, Operation op) {
+ return op == Operation.TABLE_RESOLUTION
+ ? DeltaErrors.schemaNotSetException()
+ : DeltaErrors.pathNotExistsException(tableNameOrPath);
+ }
+
+ private static Throwable translateUnsupportedProtocolVersion(
+ KernelException e, String tableNameOrPath, Operation op) {
+ UnsupportedProtocolVersionException source = (UnsupportedProtocolVersionException) e;
+ boolean isReader =
+ source.getVersionType() == UnsupportedProtocolVersionException.ProtocolVersionType.READER;
+ return new InvalidProtocolVersionException(
+ tableNameOrPath,
+ isReader ? source.getVersion() : 0,
+ isReader ? 0 : source.getVersion(),
+ Action$.MODULE$.supportedReaderVersionNumbers().toList(),
+ Action$.MODULE$.supportedWriterVersionNumbers().toList());
+ }
+
+ @SuppressWarnings("unchecked")
+ private static E sneakyThrow(Throwable t) throws E {
+ throw (E) t;
+ }
+}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/Operation.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/Operation.java
new file mode 100644
index 00000000000..cdf7b61401d
--- /dev/null
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/Operation.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.exception;
+
+/**
+ * The connector operation during which a Kernel exception was raised. Lets {@link
+ * KernelExceptionConverter} map the same Kernel exception to different Spark/Delta exceptions per
+ * operation.
+ */
+public enum Operation {
+ /** Resolving a table reference. */
+ TABLE_RESOLUTION,
+ /** Streaming micro-batch execution. */
+ STREAMING_MICROBATCH
+}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
index 3d019dec030..cfbcba5a7da 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
@@ -28,6 +28,7 @@
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.exceptions.UnsupportedTableFeatureException;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
@@ -44,6 +45,8 @@
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter;
+import io.delta.spark.internal.v2.exception.KernelExceptionConverter;
+import io.delta.spark.internal.v2.exception.Operation;
import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
import io.delta.spark.internal.v2.utils.PartitionUtils;
import io.delta.spark.internal.v2.utils.ScalaUtils;
@@ -499,6 +502,10 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
if (interruptCause.isPresent()) {
throw new UncheckedIOException(interruptCause.get());
}
+ if (e instanceof KernelException) {
+ throw KernelExceptionConverter.translateAndThrow(
+ (KernelException) e, tablePath, Operation.STREAMING_MICROBATCH);
+ }
throw e;
}
}
@@ -664,6 +671,10 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) {
if (interruptCause.isPresent()) {
throw new UncheckedIOException(interruptCause.get());
}
+ if (e instanceof KernelException) {
+ throw KernelExceptionConverter.translateAndThrow(
+ (KernelException) e, tablePath, Operation.STREAMING_MICROBATCH);
+ }
throw e;
}
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/exception/KernelExceptionConverterTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/exception/KernelExceptionConverterTest.java
new file mode 100644
index 00000000000..a034fb04d4c
--- /dev/null
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/exception/KernelExceptionConverterTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.exception;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.exceptions.TableNotFoundException;
+import io.delta.kernel.exceptions.UnsupportedProtocolVersionException;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.delta.InvalidProtocolVersionException;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link KernelExceptionConverter}. */
+public class KernelExceptionConverterTest {
+
+ private static final String PATH = "/tmp/some/table";
+
+ @Test
+ public void tableNotFound_tableResolution_mapsToSchemaNotSet() {
+ Throwable translated =
+ KernelExceptionConverter.convert(
+ new TableNotFoundException(PATH), PATH, Operation.TABLE_RESOLUTION);
+
+ AnalysisException ae = assertInstanceOf(AnalysisException.class, translated);
+ assertTrue(
+ ae.getMessage().contains("Table schema is not set"),
+ "expected 'Table schema is not set', got: " + ae.getMessage());
+ assertTrue(
+ ae.getMessage().contains("CREATE TABLE"),
+ "expected 'CREATE TABLE', got: " + ae.getMessage());
+ }
+
+ @Test
+ public void tableNotFound_microBatch_mapsToPathDoesNotExist() {
+ Throwable translated =
+ KernelExceptionConverter.convert(
+ new TableNotFoundException(PATH), PATH, Operation.STREAMING_MICROBATCH);
+
+ AnalysisException ae = assertInstanceOf(AnalysisException.class, translated);
+ assertTrue(
+ ae.getMessage().contains(PATH) && ae.getMessage().contains("doesn't exist"),
+ "expected a path-does-not-exist message, got: " + ae.getMessage());
+ }
+
+ @Test
+ public void unsupportedReaderProtocol_mapsToInvalidProtocolVersion() {
+ int badVersion = Integer.MAX_VALUE;
+ Throwable translated =
+ KernelExceptionConverter.convert(
+ new UnsupportedProtocolVersionException(
+ PATH, badVersion, UnsupportedProtocolVersionException.ProtocolVersionType.READER),
+ PATH,
+ Operation.STREAMING_MICROBATCH);
+
+ InvalidProtocolVersionException ipve =
+ assertInstanceOf(InvalidProtocolVersionException.class, translated);
+ assertEquals(PATH, ipve.tableNameOrPath());
+ assertEquals(badVersion, ipve.readerRequiredVersion());
+ assertTrue(
+ ipve.supportedReaderVersions().nonEmpty(), "expected non-empty supported reader versions");
+ }
+
+ @Test
+ public void unregisteredKernelException_returnedUnchanged() {
+ KernelException original = new KernelException("some unmapped kernel failure");
+ Throwable translated =
+ KernelExceptionConverter.convert(original, PATH, Operation.TABLE_RESOLUTION);
+
+ assertSame(original, translated, "unmapped Kernel exceptions must pass through unchanged");
+ }
+
+ @Test
+ public void translateAndThrow_throwsTranslatedException() {
+ assertThrows(
+ InvalidProtocolVersionException.class,
+ () ->
+ KernelExceptionConverter.translateAndThrow(
+ new UnsupportedProtocolVersionException(
+ PATH,
+ Integer.MAX_VALUE,
+ UnsupportedProtocolVersionException.ProtocolVersionType.WRITER),
+ PATH,
+ Operation.STREAMING_MICROBATCH));
+ }
+}