From 8625c284d8c40c5f7e8ef60424d37c6b555e9421 Mon Sep 17 00:00:00 2001 From: Sotirios Kougiouris Date: Fri, 12 Jun 2026 17:52:34 +0000 Subject: [PATCH] [Spark][kernel] Translate Kernel exceptions to DSv1 Spark/Delta exceptions Signed-off-by: Sotirios Kougiouris --- .../sql/delta/test/DeltaV2SourceSuite.scala | 8 +- .../internal/v2/catalog/DeltaV2Table.java | 13 ++- .../exception/KernelExceptionConverter.java | 95 ++++++++++++++++ .../internal/v2/exception/Operation.java | 28 +++++ .../v2/read/SparkMicroBatchStream.java | 11 ++ .../KernelExceptionConverterTest.java | 103 ++++++++++++++++++ 6 files changed, 251 insertions(+), 7 deletions(-) create mode 100644 spark/v2/src/main/java/io/delta/spark/internal/v2/exception/KernelExceptionConverter.java create mode 100644 spark/v2/src/main/java/io/delta/spark/internal/v2/exception/Operation.java create mode 100644 spark/v2/src/test/java/io/delta/spark/internal/v2/exception/KernelExceptionConverterTest.java diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala index df3098777c9..e5d10031482 100644 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala @@ -160,7 +160,9 @@ object DeltaV2SourceSuite { "streaming read preserves percent-literal string partition value", "initial snapshot: checkpoint resume produces all rows without duplicates", "initial snapshot: Trigger.AvailableNow processes all data and terminates", - "initial snapshot: checkpoint resume after new commits produces all rows" + "initial snapshot: checkpoint resume after new commits produces all rows", + "no schema should throw an exception", + "Delta sources should verify the protocol reader version" ) val FailingTests: Set[String] = Set( @@ -181,10 +183,6 @@ object DeltaV2SourceSuite { "incremental: 
commit file gap between versions, failOnDataLoss=false succeeds", // === Misc === - // TODO(#5900): fix exception mismatch - "no schema should throw an exception", - // TODO(#5900): fix exception mismatch - "Delta sources should verify the protocol reader version", // TODO(#5895): gracefully handle corrupt checkpoint "start from corrupt checkpoint", diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java index 8a1e904951d..b1308fcb7ad 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java @@ -23,8 +23,11 @@ import io.delta.kernel.Snapshot; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.rowtracking.RowTracking; +import io.delta.spark.internal.v2.exception.KernelExceptionConverter; +import io.delta.spark.internal.v2.exception.Operation; import io.delta.spark.internal.v2.read.MetadataEvolutionHandler; import io.delta.spark.internal.v2.read.SparkScanBuilder; import io.delta.spark.internal.v2.read.cdc.CDCSchemaContext; @@ -197,8 +200,14 @@ private DeltaV2Table( SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options)); this.kernelEngine = DefaultEngine.create(this.hadoopConf); this.snapshotManager = SnapshotManagerFactory.create(tablePath, kernelEngine, catalogTable); - // Load the initial snapshot through the manager - this.initialSnapshot = snapshotManager.loadLatestSnapshot(); + Snapshot loadedSnapshot; + try { + loadedSnapshot = snapshotManager.loadLatestSnapshot(); + } catch (KernelException e) { + // Surface the same exceptions the DSv1 connector raises (delta-io/delta#5900). 
+ throw KernelExceptionConverter.translateAndThrow(e, tablePath, Operation.TABLE_RESOLUTION); + } + this.initialSnapshot = loadedSnapshot; this.isCDCRead = CDCReader.isCDCRead(new CaseInsensitiveStringMap(this.options));
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/KernelExceptionConverter.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/KernelExceptionConverter.java
new file mode 100644
index 00000000000..d1dc6a3fb11
--- /dev/null
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/KernelExceptionConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.exception;
+
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.exceptions.TableNotFoundException;
+import io.delta.kernel.exceptions.UnsupportedProtocolVersionException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.spark.sql.delta.DeltaErrors;
+import org.apache.spark.sql.delta.InvalidProtocolVersionException;
+import org.apache.spark.sql.delta.actions.Action$;
+
+/**
+ * Translates Delta Kernel exceptions into the Spark/Delta exceptions.
+ *
+ * <p>Mappings live in a class-to-handler registry. Lookup walks the exception's class hierarchy,
+ * and Kernel exceptions without a registered handler are returned unchanged. To add a mapping,
+ * register one more handler below.
+ */
+public final class KernelExceptionConverter {
+
+  private KernelExceptionConverter() {}
+
+  @FunctionalInterface
+  private interface Handler {
+    Throwable translate(KernelException e, String tableNameOrPath, Operation op);
+  }
+
+  private static final Map<Class<?>, Handler> HANDLERS = new HashMap<>();
+
+  static {
+    HANDLERS.put(TableNotFoundException.class, KernelExceptionConverter::translateTableNotFound);
+    HANDLERS.put(
+        UnsupportedProtocolVersionException.class,
+        KernelExceptionConverter::translateUnsupportedProtocolVersion);
+  }
+
+  /** Returns the Spark/Delta equivalent of {@code e}, or {@code e} itself if it has no mapping. */
+  public static Throwable convert(KernelException e, String tableNameOrPath, Operation op) {
+    for (Class<?> c = e.getClass();
+        c != null && KernelException.class.isAssignableFrom(c);
+        c = c.getSuperclass()) {
+      Handler handler = HANDLERS.get(c); // exact class first, then each superclass
+      if (handler != null) {
+        return handler.translate(e, tableNameOrPath, op);
+      }
+    }
+    return e;
+  }
+
+  /** Converts {@code e} and always throws; the return type lets callers write {@code throw}. */
+  public static RuntimeException translateAndThrow(
+      KernelException e, String tableNameOrPath, Operation op) {
+    throw sneakyThrow(convert(e, tableNameOrPath, op));
+  }
+
+  private static Throwable translateTableNotFound(
+      KernelException e, String tableNameOrPath, Operation op) {
+    return op == Operation.TABLE_RESOLUTION
+        ? DeltaErrors.schemaNotSetException()
+        : DeltaErrors.pathNotExistsException(tableNameOrPath);
+  }
+
+  private static Throwable translateUnsupportedProtocolVersion(
+      KernelException e, String tableNameOrPath, Operation op) {
+    UnsupportedProtocolVersionException source = (UnsupportedProtocolVersionException) e;
+    boolean isReader =
+        source.getVersionType() == UnsupportedProtocolVersionException.ProtocolVersionType.READER;
+    return new InvalidProtocolVersionException(
+        tableNameOrPath,
+        isReader ? source.getVersion() : 0,
+        isReader ? 0 : source.getVersion(),
+        Action$.MODULE$.supportedReaderVersionNumbers().toList(),
+        Action$.MODULE$.supportedWriterVersionNumbers().toList());
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <E extends Throwable> E sneakyThrow(Throwable t) throws E {
+    throw (E) t; // the cast is erased, so t is rethrown as-is even when it is a checked exception
+  }
+}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/Operation.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/Operation.java
new file mode 100644
index 00000000000..cdf7b61401d
--- /dev/null
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/exception/Operation.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.exception;
+
+/**
+ * The connector operation during which a Kernel exception was raised.
Lets {@link
+ * KernelExceptionConverter} pick a different Spark/Delta exception for the same Kernel exception
+ * depending on the operation.
+ */
+public enum Operation {
+  /** The exception surfaced while resolving a table reference. */
+  TABLE_RESOLUTION,
+  /** The exception surfaced while executing a streaming micro-batch. */
+  STREAMING_MICROBATCH
+}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
index 3d019dec030..cfbcba5a7da 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
@@ -28,6 +28,7 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.exceptions.UnsupportedTableFeatureException; import io.delta.kernel.internal.DeltaHistoryManager; import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
@@ -44,6 +45,8 @@ import io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.utils.CloseableIterator; import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter; +import io.delta.spark.internal.v2.exception.KernelExceptionConverter; +import io.delta.spark.internal.v2.exception.Operation; import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager; import io.delta.spark.internal.v2.utils.PartitionUtils; import io.delta.spark.internal.v2.utils.ScalaUtils;
@@ -499,6 +502,10 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) { if (interruptCause.isPresent()) { throw new UncheckedIOException(interruptCause.get()); } + if (e instanceof KernelException) { + throw KernelExceptionConverter.translateAndThrow( + (KernelException) e, tablePath, Operation.STREAMING_MICROBATCH); + } throw e; } }
@@ -664,6 +671,10 @@ public InputPartition[] planInputPartitions(Offset start, Offset end)
{ if (interruptCause.isPresent()) { throw new UncheckedIOException(interruptCause.get()); } + if (e instanceof KernelException) { + throw KernelExceptionConverter.translateAndThrow( + (KernelException) e, tablePath, Operation.STREAMING_MICROBATCH); + } throw e; } diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/exception/KernelExceptionConverterTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/exception/KernelExceptionConverterTest.java new file mode 100644 index 00000000000..a034fb04d4c --- /dev/null +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/exception/KernelExceptionConverterTest.java @@ -0,0 +1,103 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.delta.spark.internal.v2.exception;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.exceptions.TableNotFoundException;
+import io.delta.kernel.exceptions.UnsupportedProtocolVersionException;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.delta.InvalidProtocolVersionException;
+import org.junit.jupiter.api.Test;
+
+/** Exercises each mapping registered in {@link KernelExceptionConverter} plus the pass-through. */
+public class KernelExceptionConverterTest {
+
+  private static final String PATH = "/tmp/some/table";
+
+  @Test
+  public void tableNotFound_tableResolution_mapsToSchemaNotSet() {
+    TableNotFoundException missing = new TableNotFoundException(PATH);
+    Throwable converted =
+        KernelExceptionConverter.convert(missing, PATH, Operation.TABLE_RESOLUTION);
+
+    String message = assertInstanceOf(AnalysisException.class, converted).getMessage();
+    assertTrue(
+        message.contains("Table schema is not set"),
+        "expected 'Table schema is not set', got: " + message);
+    assertTrue(message.contains("CREATE TABLE"), "expected 'CREATE TABLE', got: " + message);
+  }
+
+  @Test
+  public void tableNotFound_microBatch_mapsToPathDoesNotExist() {
+    TableNotFoundException missing = new TableNotFoundException(PATH);
+    Throwable converted =
+        KernelExceptionConverter.convert(missing, PATH, Operation.STREAMING_MICROBATCH);
+
+    String message = assertInstanceOf(AnalysisException.class, converted).getMessage();
+    boolean describesMissingPath = message.contains(PATH) && message.contains("doesn't exist");
+    assertTrue(describesMissingPath, "expected a path-does-not-exist message, got: " + message);
+  }
+
+  @Test
+  public void unsupportedReaderProtocol_mapsToInvalidProtocolVersion() {
+    int futureVersion = Integer.MAX_VALUE;
+    UnsupportedProtocolVersionException kernelError =
+        new UnsupportedProtocolVersionException(
+            PATH, futureVersion, UnsupportedProtocolVersionException.ProtocolVersionType.READER);
+
+    InvalidProtocolVersionException invalid =
+        assertInstanceOf(
+            InvalidProtocolVersionException.class,
+            KernelExceptionConverter.convert(kernelError, PATH, Operation.STREAMING_MICROBATCH));
+    assertEquals(PATH, invalid.tableNameOrPath());
+    assertEquals(futureVersion, invalid.readerRequiredVersion());
+    assertTrue(
+        invalid.supportedReaderVersions().nonEmpty(),
+        "expected non-empty supported reader versions");
+  }
+
+  @Test
+  public void unregisteredKernelException_returnedUnchanged() {
+    // A plain KernelException has no registered handler, so convert() must hand it back as-is.
+    KernelException unmapped = new KernelException("some unmapped kernel failure");
+
+    assertSame(
+        unmapped,
+        KernelExceptionConverter.convert(unmapped, PATH, Operation.TABLE_RESOLUTION),
+        "unmapped Kernel exceptions must pass through unchanged");
+  }
+
+  @Test
+  public void translateAndThrow_throwsTranslatedException() {
+    UnsupportedProtocolVersionException kernelError =
+        new UnsupportedProtocolVersionException(
+            PATH,
+            Integer.MAX_VALUE,
+            UnsupportedProtocolVersionException.ProtocolVersionType.WRITER);
+
+    assertThrows(
+        InvalidProtocolVersionException.class,
+        () ->
+            KernelExceptionConverter.translateAndThrow(
+                kernelError, PATH, Operation.STREAMING_MICROBATCH));
+  }
+}