diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/UnsupportedProtocolVersionException.java b/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/UnsupportedProtocolVersionException.java index 9579543afa3..fec1cd35dc7 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/UnsupportedProtocolVersionException.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/UnsupportedProtocolVersionException.java @@ -34,11 +34,15 @@ public enum ProtocolVersionType { } private final String tablePath; - private final int version; + private final int minReaderVersion; + private final int minWriterVersion; private final ProtocolVersionType versionType; public UnsupportedProtocolVersionException( - String tablePath, int version, ProtocolVersionType versionType) { + String tablePath, + int minReaderVersion, + int minWriterVersion, + ProtocolVersionType versionType) { super( String.format( "Unsupported Delta protocol %s version: table `%s` requires %s version %s " @@ -46,9 +50,10 @@ public UnsupportedProtocolVersionException( versionType.name().toLowerCase(), tablePath, versionType.name().toLowerCase(), - version)); + versionType == ProtocolVersionType.READER ? minReaderVersion : minWriterVersion)); this.tablePath = tablePath; - this.version = version; + this.minReaderVersion = minReaderVersion; + this.minWriterVersion = minWriterVersion; this.versionType = versionType; } @@ -57,12 +62,22 @@ public String getTablePath() { return tablePath; } + /** @return the table's required minimum reader protocol version */ + public int getMinReaderVersion() { + return minReaderVersion; + } + + /** @return the table's required minimum writer protocol version */ + public int getMinWriterVersion() { + return minWriterVersion; + } + /** @return the unsupported protocol version */ public int getVersion() { - return version; + return versionType == ProtocolVersionType.READER ? minReaderVersion : minWriterVersion; } - /** @return the type of protocol version (READER or WRITER) */ + /** @return the type of protocol version that is unsupported */ public ProtocolVersionType getVersionType() { return versionType; } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 2ffa57b7e92..03aea798352 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -159,18 +159,20 @@ public static KernelException resolvedEndVersionAfterMaxCatalogVersion( /* ------------------------ PROTOCOL EXCEPTIONS ----------------------------- */ public static UnsupportedProtocolVersionException unsupportedReaderProtocol( - String tablePath, int tableReaderVersion) { + String tablePath, int minReaderVersion, int minWriterVersion) { return new UnsupportedProtocolVersionException( tablePath, - tableReaderVersion, + minReaderVersion, + minWriterVersion, UnsupportedProtocolVersionException.ProtocolVersionType.READER); } public static UnsupportedProtocolVersionException unsupportedWriterProtocol( - String tablePath, int tableWriterVersion) { + String tablePath, int minReaderVersion, int minWriterVersion) { return new UnsupportedProtocolVersionException( tablePath, - tableWriterVersion, + minReaderVersion, + minWriterVersion, UnsupportedProtocolVersionException.ProtocolVersionType.WRITER); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index 7e87f6b9080..0f9ba4c6ddd 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -774,7 +774,8 @@ public static Tuple2, Optional> extractFeatureProper /** Utility method to check if the table with given protocol is readable by the Kernel. */ public static void validateKernelCanReadTheTable(Protocol protocol, String tablePath) { if (protocol.getMinReaderVersion() > TABLE_FEATURES_MIN_READER_VERSION) { - throw DeltaErrors.unsupportedReaderProtocol(tablePath, protocol.getMinReaderVersion()); + throw DeltaErrors.unsupportedReaderProtocol( + tablePath, protocol.getMinReaderVersion(), protocol.getMinWriterVersion()); } Set unsupportedFeatures = @@ -798,7 +799,8 @@ public static void validateKernelCanWriteToTable( validateKernelCanReadTheTable(protocol, tablePath); if (protocol.getMinWriterVersion() > TABLE_FEATURES_MIN_WRITER_VERSION) { - throw unsupportedWriterProtocol(tablePath, protocol.getMinWriterVersion()); + throw unsupportedWriterProtocol( + tablePath, protocol.getMinReaderVersion(), protocol.getMinWriterVersion()); } Set unsupportedFeatures = diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/exceptions/ExceptionSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/exceptions/ExceptionSuite.scala index d7e9a096756..706e26b999e 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/exceptions/ExceptionSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/exceptions/ExceptionSuite.scala @@ -58,12 +58,15 @@ class ExceptionSuite extends AnyFunSuite { test("UnsupportedProtocolVersionException - reader version") { val tablePath = "/path/to/table" - val version = 3 + val minReaderVersion = 3 + val minWriterVersion = 2 - val ex = DeltaErrors.unsupportedReaderProtocol(tablePath, version) + val ex = DeltaErrors.unsupportedReaderProtocol(tablePath, minReaderVersion, minWriterVersion) assert(ex.getTablePath == tablePath) - assert(ex.getVersion == version) + assert(ex.getMinReaderVersion == minReaderVersion) + assert(ex.getMinWriterVersion == minWriterVersion) + assert(ex.getVersion == minReaderVersion) assert(ex.getVersionType == ProtocolVersionType.READER) assert(ex.getMessage.contains("reader")) assert(ex.getMessage.contains("version 3")) @@ -71,12 +74,15 @@ class ExceptionSuite extends AnyFunSuite { test("UnsupportedProtocolVersionException - writer version") { val tablePath = "/path/to/table" - val version = 7 + val minReaderVersion = 1 + val minWriterVersion = 7 - val ex = DeltaErrors.unsupportedWriterProtocol(tablePath, version) + val ex = DeltaErrors.unsupportedWriterProtocol(tablePath, minReaderVersion, minWriterVersion) assert(ex.getTablePath == tablePath) - assert(ex.getVersion == version) + assert(ex.getMinReaderVersion == minReaderVersion) + assert(ex.getMinWriterVersion == minWriterVersion) + assert(ex.getVersion == minWriterVersion) assert(ex.getVersionType == ProtocolVersionType.WRITER) assert(ex.getMessage.contains("writer")) assert(ex.getMessage.contains("version 7")) diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala index df3098777c9..5f07edfcc13 100644 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala @@ -145,6 +145,8 @@ object DeltaV2SourceSuite { "deltaSourceIgnoreDeleteError contains removeFile, version, tablePath", "deltaSourceIgnoreChangesError contains changeInfo, version, tablePath", "excludeRegex throws good error on bad regex pattern", + "no schema should throw an exception", + "Delta sources should verify the protocol reader version", // ========== Misc tests ========== "a fast writer should not starve a Delta source", @@ -181,10 +183,6 @@ object DeltaV2SourceSuite { "incremental: commit file gap between versions, failOnDataLoss=false succeeds", // === Misc === - // TODO(#5900): fix exception mismatch - "no schema should throw an exception", - // TODO(#5900): fix exception mismatch - "Delta sources should verify the protocol reader version", // TODO(#5895): gracefully handle corrupt checkpoint "start from corrupt checkpoint", diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 28ed3cb6cb4..bada96fced5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -24,7 +24,7 @@ import java.util.{ConcurrentModificationException, UUID} import scala.collection.JavaConverters._ import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBySpec} -import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol, TableFeatureProtocolUtils} +import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol, TableFeatureProtocolUtils} import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, DeltaGenerateCommand} import org.apache.spark.sql.delta.constraints.Constraints import org.apache.spark.sql.delta.hooks.AutoCompactType @@ -1389,6 +1389,30 @@ trait DeltaErrorsBase ) } + /** + * Throws [[schemaNotSetException]]. Returns `Nothing` so Java callers can invoke it as a + * statement to raise the checked [[DeltaAnalysisException]]. + */ + def throwSchemaNotSet(): Nothing = { + throw schemaNotSetException + } + + /** + * Java-friendly factory for [[InvalidProtocolVersionException]]. The supported reader/writer + * version sets are `private[delta]` so this must be built in Scala. + */ + def invalidProtocolVersionError( + tableNameOrPath: String, + readerRequiredVersion: Int, + writerRequiredVersion: Int): Throwable = { + InvalidProtocolVersionException( + tableNameOrPath, + readerRequiredVersion, + writerRequiredVersion, + Action.supportedReaderVersionNumbers.toSeq, + Action.supportedWriterVersionNumbers.toSeq) + } + def specifySchemaAtReadTimeException: Throwable = { new DeltaAnalysisException( errorClass = "DELTA_UNSUPPORTED_SCHEMA_DURING_READ", diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java index e7bc4a6cdcd..2b1cb617347 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java @@ -23,6 +23,7 @@ import io.delta.kernel.Snapshot; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.rowtracking.RowTracking; import io.delta.spark.internal.v2.read.MetadataEvolutionHandler; @@ -44,6 +45,7 @@ import java.util.Set; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.catalog.CatalogTable; @@ -62,6 +64,7 @@ import org.apache.spark.sql.connector.read.Statistics; import org.apache.spark.sql.connector.write.LogicalWriteInfo; import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.delta.DeltaErrors; import org.apache.spark.sql.delta.DeltaTableUtils; import org.apache.spark.sql.delta.RowCommitVersion$; import org.apache.spark.sql.delta.RowId$; @@ -199,7 +202,18 @@ private DeltaV2Table( this.kernelEngine = DefaultEngine.create(this.hadoopConf); this.snapshotManager = SnapshotManagerFactory.create(tablePath, kernelEngine, catalogTable); // Load the initial snapshot through the manager - this.initialSnapshot = snapshotManager.loadLatestSnapshot(); + Snapshot loadedSnapshot; + try { + loadedSnapshot = snapshotManager.loadLatestSnapshot(); + } catch (TableNotFoundException e) { + // The _delta_log directory exists but holds no commits: the location is an initialized but + // empty Delta table. + if (deltaLogDirExists()) { + DeltaErrors.throwSchemaNotSet(); + } + throw e; + } + this.initialSnapshot = loadedSnapshot; this.isCDCRead = CDCReader.isCDCRead(new CaseInsensitiveStringMap(this.options)); @@ -226,6 +240,17 @@ private DeltaV2Table( new SchemaProvider(SparkSession.active(), rawSchema, partitionColumnNames); } + /** Returns whether the table's {@code _delta_log} directory exists. */ + private boolean deltaLogDirExists() { + try { + Path logPath = new Path(tablePath, "_delta_log"); + FileSystem fs = logPath.getFileSystem(hadoopConf); + return fs.exists(logPath); + } catch (Exception ioe) { + return false; + } + } + /** * Helper method to decode URI path handling URL-encoded characters correctly. E.g., converts * "spark%25dir%25prefix" to "spark%dir%prefix" diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java index 3d019dec030..415de599bf6 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java @@ -28,6 +28,7 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.UnsupportedProtocolVersionException; import io.delta.kernel.exceptions.UnsupportedTableFeatureException; import io.delta.kernel.internal.DeltaHistoryManager; import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction; @@ -499,6 +500,14 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) { if (interruptCause.isPresent()) { throw new UncheckedIOException(interruptCause.get()); } + // Translate Kernel's protocol-version error into Delta's InvalidProtocolVersionException. + Optional protocolCause = findUnsupportedProtocolCause(e); + if (protocolCause.isPresent()) { + UnsupportedProtocolVersionException p = protocolCause.get(); + throw (RuntimeException) + DeltaErrors.invalidProtocolVersionError( + p.getTablePath(), p.getMinReaderVersion(), p.getMinWriterVersion()); + } throw e; } } @@ -747,6 +756,22 @@ static Optional findClosedByInterruptCause(Throwable return Optional.empty(); } + /** + * Walks the cause chain of the given throwable looking for a Kernel {@link + * UnsupportedProtocolVersionException}. + */ + static Optional findUnsupportedProtocolCause(Throwable t) { + for (Throwable current = t; current != null; current = current.getCause()) { + if (current instanceof UnsupportedProtocolVersionException) { + return Optional.of((UnsupportedProtocolVersionException) current); + } + if (current.getCause() == current) { + break; + } + } + return Optional.empty(); + } + private boolean isExcludedPath(String path) { return excludeRegex.isPresent() && excludeRegex.get().findFirstIn(path).isDefined(); }