Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,26 @@ public enum ProtocolVersionType {
}

private final String tablePath;
private final int version;
private final int minReaderVersion;
private final int minWriterVersion;
private final ProtocolVersionType versionType;

public UnsupportedProtocolVersionException(
String tablePath, int version, ProtocolVersionType versionType) {
String tablePath,
int minReaderVersion,
int minWriterVersion,
ProtocolVersionType versionType) {
super(
String.format(
"Unsupported Delta protocol %s version: table `%s` requires %s version %s "
+ "which is unsupported by this version of Delta Kernel.",
versionType.name().toLowerCase(),
tablePath,
versionType.name().toLowerCase(),
version));
versionType == ProtocolVersionType.READER ? minReaderVersion : minWriterVersion));
this.tablePath = tablePath;
this.version = version;
this.minReaderVersion = minReaderVersion;
this.minWriterVersion = minWriterVersion;
this.versionType = versionType;
}

Expand All @@ -57,12 +62,22 @@ public String getTablePath() {
return tablePath;
}

/** @return the table's required minimum reader protocol version */
public int getMinReaderVersion() {
return minReaderVersion;
}

/** @return the table's required minimum writer protocol version */
public int getMinWriterVersion() {
return minWriterVersion;
}

/** @return the unsupported protocol version */
public int getVersion() {
return version;
return versionType == ProtocolVersionType.READER ? minReaderVersion : minWriterVersion;
}

/** @return the type of protocol version (READER or WRITER) */
/** @return the type of protocol version that is unsupported */
public ProtocolVersionType getVersionType() {
return versionType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,20 @@ public static KernelException resolvedEndVersionAfterMaxCatalogVersion(

/* ------------------------ PROTOCOL EXCEPTIONS ----------------------------- */
public static UnsupportedProtocolVersionException unsupportedReaderProtocol(
String tablePath, int tableReaderVersion) {
String tablePath, int minReaderVersion, int minWriterVersion) {
return new UnsupportedProtocolVersionException(
tablePath,
tableReaderVersion,
minReaderVersion,
minWriterVersion,
UnsupportedProtocolVersionException.ProtocolVersionType.READER);
}

public static UnsupportedProtocolVersionException unsupportedWriterProtocol(
String tablePath, int tableWriterVersion) {
String tablePath, int minReaderVersion, int minWriterVersion) {
return new UnsupportedProtocolVersionException(
tablePath,
tableWriterVersion,
minReaderVersion,
minWriterVersion,
UnsupportedProtocolVersionException.ProtocolVersionType.WRITER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,8 @@ public static Tuple2<Set<TableFeature>, Optional<Metadata>> extractFeatureProper
/** Utility method to check if the table with given protocol is readable by the Kernel. */
public static void validateKernelCanReadTheTable(Protocol protocol, String tablePath) {
if (protocol.getMinReaderVersion() > TABLE_FEATURES_MIN_READER_VERSION) {
throw DeltaErrors.unsupportedReaderProtocol(tablePath, protocol.getMinReaderVersion());
throw DeltaErrors.unsupportedReaderProtocol(
tablePath, protocol.getMinReaderVersion(), protocol.getMinWriterVersion());
}

Set<TableFeature> unsupportedFeatures =
Expand All @@ -798,7 +799,8 @@ public static void validateKernelCanWriteToTable(
validateKernelCanReadTheTable(protocol, tablePath);

if (protocol.getMinWriterVersion() > TABLE_FEATURES_MIN_WRITER_VERSION) {
throw unsupportedWriterProtocol(tablePath, protocol.getMinWriterVersion());
throw unsupportedWriterProtocol(
tablePath, protocol.getMinReaderVersion(), protocol.getMinWriterVersion());
}

Set<TableFeature> unsupportedFeatures =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,31 @@ class ExceptionSuite extends AnyFunSuite {

test("UnsupportedProtocolVersionException - reader version") {
val tablePath = "/path/to/table"
val version = 3
val minReaderVersion = 3
val minWriterVersion = 2

val ex = DeltaErrors.unsupportedReaderProtocol(tablePath, version)
val ex = DeltaErrors.unsupportedReaderProtocol(tablePath, minReaderVersion, minWriterVersion)

assert(ex.getTablePath == tablePath)
assert(ex.getVersion == version)
assert(ex.getMinReaderVersion == minReaderVersion)
assert(ex.getMinWriterVersion == minWriterVersion)
assert(ex.getVersion == minReaderVersion)
assert(ex.getVersionType == ProtocolVersionType.READER)
assert(ex.getMessage.contains("reader"))
assert(ex.getMessage.contains("version 3"))
}

test("UnsupportedProtocolVersionException - writer version") {
val tablePath = "/path/to/table"
val version = 7
val minReaderVersion = 1
val minWriterVersion = 7

val ex = DeltaErrors.unsupportedWriterProtocol(tablePath, version)
val ex = DeltaErrors.unsupportedWriterProtocol(tablePath, minReaderVersion, minWriterVersion)

assert(ex.getTablePath == tablePath)
assert(ex.getVersion == version)
assert(ex.getMinReaderVersion == minReaderVersion)
assert(ex.getMinWriterVersion == minWriterVersion)
assert(ex.getVersion == minWriterVersion)
assert(ex.getVersionType == ProtocolVersionType.WRITER)
assert(ex.getMessage.contains("writer"))
assert(ex.getMessage.contains("version 7"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ object DeltaV2SourceSuite {
"deltaSourceIgnoreDeleteError contains removeFile, version, tablePath",
"deltaSourceIgnoreChangesError contains changeInfo, version, tablePath",
"excludeRegex throws good error on bad regex pattern",
"no schema should throw an exception",
"Delta sources should verify the protocol reader version",

// ========== Misc tests ==========
"a fast writer should not starve a Delta source",
Expand Down Expand Up @@ -181,10 +183,6 @@ object DeltaV2SourceSuite {
"incremental: commit file gap between versions, failOnDataLoss=false succeeds",

// === Misc ===
// TODO(#5900): fix exception mismatch
"no schema should throw an exception",
// TODO(#5900): fix exception mismatch
"Delta sources should verify the protocol reader version",
// TODO(#5895): gracefully handle corrupt checkpoint
"start from corrupt checkpoint",

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.{ConcurrentModificationException, UUID}
import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBySpec}
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, DeltaGenerateCommand}
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.hooks.AutoCompactType
Expand Down Expand Up @@ -1389,6 +1389,30 @@ trait DeltaErrorsBase
)
}

/**
* Throws [[schemaNotSetException]]. Returns `Nothing` so Java callers can invoke it as a
* statement to raise the checked [[DeltaAnalysisException]].
*/
def throwSchemaNotSet(): Nothing = {
throw schemaNotSetException
}

/**
* Java-friendly factory for [[InvalidProtocolVersionException]]. The supported reader/writer
* version sets are `private[delta]` so this must be built in Scala.
*/
def invalidProtocolVersionError(
tableNameOrPath: String,
readerRequiredVersion: Int,
writerRequiredVersion: Int): Throwable = {
InvalidProtocolVersionException(
tableNameOrPath,
readerRequiredVersion,
writerRequiredVersion,
Action.supportedReaderVersionNumbers.toSeq,
Action.supportedWriterVersionNumbers.toSeq)
}

def specifySchemaAtReadTimeException: Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_UNSUPPORTED_SCHEMA_DURING_READ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.spark.internal.v2.read.MetadataEvolutionHandler;
Expand All @@ -44,6 +45,7 @@
import java.util.Set;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
Expand All @@ -62,6 +64,7 @@
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.delta.DeltaErrors;
import org.apache.spark.sql.delta.DeltaTableUtils;
import org.apache.spark.sql.delta.RowCommitVersion$;
import org.apache.spark.sql.delta.RowId$;
Expand Down Expand Up @@ -199,7 +202,18 @@ private DeltaV2Table(
this.kernelEngine = DefaultEngine.create(this.hadoopConf);
this.snapshotManager = SnapshotManagerFactory.create(tablePath, kernelEngine, catalogTable);
// Load the initial snapshot through the manager
this.initialSnapshot = snapshotManager.loadLatestSnapshot();
Snapshot loadedSnapshot;
try {
loadedSnapshot = snapshotManager.loadLatestSnapshot();
} catch (TableNotFoundException e) {
// The _delta_log directory exists but holds no commits: the location is an initialized but
// empty Delta table.
if (deltaLogDirExists()) {
DeltaErrors.throwSchemaNotSet();
}
throw e;
}
this.initialSnapshot = loadedSnapshot;

this.isCDCRead = CDCReader.isCDCRead(new CaseInsensitiveStringMap(this.options));

Expand All @@ -226,6 +240,17 @@ private DeltaV2Table(
new SchemaProvider(SparkSession.active(), rawSchema, partitionColumnNames);
}

/** Returns whether the table's {@code _delta_log} directory exists. */
private boolean deltaLogDirExists() {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's kernel's behavior when there is no log file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kernel throws TableNotFoundException

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My major concern is this one more addition io call.

Can we add an api to TableNotFoundException in kernel TableNotFoundException.reason() to provide a finer grain reason and use this to tell if this is schema not set or others.

Also, maybe split following into two pr

// TODO(#5900): fix exception mismatch
"no schema should throw an exception",
// TODO(#5900): fix exception mismatch
"Delta sources should verify the protocol reader version",

Delta sources should verify the protocol reader version -- one look good to me but "no schema should throw an exception" one need more discussion on how to handle it

try {
Path logPath = new Path(tablePath, "_delta_log");
FileSystem fs = logPath.getFileSystem(hadoopConf);
return fs.exists(logPath);
} catch (Exception ioe) {
return false;
}
}

/**
* Helper method to decode URI path handling URL-encoded characters correctly. E.g., converts
* "spark%25dir%25prefix" to "spark%dir%prefix"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.UnsupportedProtocolVersionException;
import io.delta.kernel.exceptions.UnsupportedTableFeatureException;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
Expand Down Expand Up @@ -499,6 +500,14 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
if (interruptCause.isPresent()) {
throw new UncheckedIOException(interruptCause.get());
}
// Translate Kernel's protocol-version error into Delta's InvalidProtocolVersionException.
Optional<UnsupportedProtocolVersionException> protocolCause = findUnsupportedProtocolCause(e);
if (protocolCause.isPresent()) {
UnsupportedProtocolVersionException p = protocolCause.get();
throw (RuntimeException)
DeltaErrors.invalidProtocolVersionError(
p.getTablePath(), p.getMinReaderVersion(), p.getMinWriterVersion());
}
throw e;
}
}
Expand Down Expand Up @@ -747,6 +756,22 @@ static Optional<ClosedByInterruptException> findClosedByInterruptCause(Throwable
return Optional.empty();
}

/**
* Walks the cause chain of the given throwable looking for a Kernel {@link
* UnsupportedProtocolVersionException}.
*/
static Optional<UnsupportedProtocolVersionException> findUnsupportedProtocolCause(Throwable t) {
for (Throwable current = t; current != null; current = current.getCause()) {
if (current instanceof UnsupportedProtocolVersionException) {
return Optional.of((UnsupportedProtocolVersionException) current);
}
if (current.getCause() == current) {
break;
}
}
return Optional.empty();
}

private boolean isExcludedPath(String path) {
return excludeRegex.isPresent() && excludeRegex.get().findFirstIn(path).isDefined();
}
Expand Down