Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ object DeltaV2SourceSuite {
"deltaSourceIgnoreDeleteError contains removeFile, version, tablePath",
"deltaSourceIgnoreChangesError contains changeInfo, version, tablePath",
"excludeRegex throws good error on bad regex pattern",
"no schema should throw an exception",
"Delta sources should verify the protocol reader version",

// ========== Misc tests ==========
"a fast writer should not starve a Delta source",
Expand Down Expand Up @@ -181,10 +183,6 @@ object DeltaV2SourceSuite {
"incremental: commit file gap between versions, failOnDataLoss=false succeeds",

// === Misc ===
// TODO(#5900): fix exception mismatch
"no schema should throw an exception",
// TODO(#5900): fix exception mismatch
"Delta sources should verify the protocol reader version",
// TODO(#5895): gracefully handle corrupt checkpoint
"start from corrupt checkpoint",

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.{ConcurrentModificationException, UUID}
import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBySpec}
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, DeltaGenerateCommand}
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.hooks.AutoCompactType
Expand Down Expand Up @@ -1389,6 +1389,30 @@ trait DeltaErrorsBase
)
}

/**
 * Raises [[schemaNotSetException]] unconditionally.
 *
 * The result type is `Nothing`, which lets Java code call this method as a plain statement in
 * order to raise the checked [[DeltaAnalysisException]].
 */
def throwSchemaNotSet(): Nothing = throw schemaNotSetException

/**
 * Builds an [[InvalidProtocolVersionException]] on behalf of Java callers. The sets of supported
 * reader/writer versions are `private[delta]`, so the exception has to be assembled in Scala.
 *
 * @param tableNameOrPath table name or path, passed through to the exception unchanged
 * @param readerRequiredVersion reader version, passed through to the exception unchanged
 * @param writerRequiredVersion writer version, passed through to the exception unchanged
 * @return the newly built exception, typed as `Throwable`
 */
def invalidProtocolVersionError(
    tableNameOrPath: String,
    readerRequiredVersion: Int,
    writerRequiredVersion: Int): Throwable = {
  // Read the supported version sets from `Action` up front, reader first, then writer.
  val supportedReaders = Action.supportedReaderVersionNumbers.toSeq
  val supportedWriters = Action.supportedWriterVersionNumbers.toSeq
  InvalidProtocolVersionException(
    tableNameOrPath,
    readerRequiredVersion,
    writerRequiredVersion,
    supportedReaders,
    supportedWriters)
}

def specifySchemaAtReadTimeException: Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_UNSUPPORTED_SCHEMA_DURING_READ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.spark.internal.v2.read.MetadataEvolutionHandler;
Expand All @@ -44,6 +45,7 @@
import java.util.Set;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
Expand All @@ -62,6 +64,7 @@
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.delta.DeltaErrors;
import org.apache.spark.sql.delta.DeltaTableUtils;
import org.apache.spark.sql.delta.RowCommitVersion$;
import org.apache.spark.sql.delta.RowId$;
Expand Down Expand Up @@ -199,7 +202,18 @@ private DeltaV2Table(
this.kernelEngine = DefaultEngine.create(this.hadoopConf);
this.snapshotManager = SnapshotManagerFactory.create(tablePath, kernelEngine, catalogTable);
// Load the initial snapshot through the manager
this.initialSnapshot = snapshotManager.loadLatestSnapshot();
Snapshot loadedSnapshot;
try {
loadedSnapshot = snapshotManager.loadLatestSnapshot();
} catch (TableNotFoundException e) {
// The _delta_log directory exists but holds no commits: the location is an initialized but
// empty Delta table.
if (deltaLogDirExists()) {
DeltaErrors.throwSchemaNotSet();
}
throw e;
}
this.initialSnapshot = loadedSnapshot;

this.isCDCRead = CDCReader.isCDCRead(new CaseInsensitiveStringMap(this.options));

Expand All @@ -226,6 +240,17 @@ private DeltaV2Table(
new SchemaProvider(SparkSession.active(), rawSchema, partitionColumnNames);
}

/** Returns whether the table's {@code _delta_log} directory exists. */
private boolean deltaLogDirExists() {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's kernel's behavior when there is no log file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kernel throws TableNotFoundException

// Best-effort probe: resolve the file system for <tablePath>/_delta_log and ask whether it exists.
try {
Path logPath = new Path(tablePath, "_delta_log");
FileSystem fs = logPath.getFileSystem(hadoopConf);
return fs.exists(logPath);
} catch (Exception ioe) {
// Any failure while probing is reported as "does not exist" instead of being propagated.
return false;
}
}

/**
* Helper method to decode URI path handling URL-encoded characters correctly. E.g., converts
* "spark%25dir%25prefix" to "spark%dir%prefix"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.UnsupportedProtocolVersionException;
import io.delta.kernel.exceptions.UnsupportedTableFeatureException;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
Expand Down Expand Up @@ -499,6 +500,16 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
if (interruptCause.isPresent()) {
throw new UncheckedIOException(interruptCause.get());
}
// Translate Kernel's protocol-version error into Delta's InvalidProtocolVersionException.
Optional<UnsupportedProtocolVersionException> protocolCause = findUnsupportedProtocolCause(e);
if (protocolCause.isPresent()) {
UnsupportedProtocolVersionException p = protocolCause.get();
boolean isReader =
p.getVersionType() == UnsupportedProtocolVersionException.ProtocolVersionType.READER;
throw (RuntimeException)
DeltaErrors.invalidProtocolVersionError(
p.getTablePath(), isReader ? p.getVersion() : 0, isReader ? 0 : p.getVersion());

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loses part of the table protocol when translating to Delta's InvalidProtocolVersionException. DSv1 builds that exception with both tableProtocol.minReaderVersion and tableProtocol.minWriterVersion; here a reader-side Kernel error reports writerRequired=0 in the DELTA_INVALID_PROTOCOL_VERSION message, which is not a valid table requirement and is still not DSv1-equivalent. Can we preserve the full Protocol when doing the translation, or use a factory/error shape that does not invent the missing side?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, UnsupportedProtocolVersionException now carries the full protocol.

}
throw e;
}
}
Expand Down Expand Up @@ -747,6 +758,22 @@ static Optional<ClosedByInterruptException> findClosedByInterruptCause(Throwable
return Optional.empty();
}

/**
 * Walks the cause chain of the given throwable looking for a Kernel {@link
 * UnsupportedProtocolVersionException}.
 *
 * <p>The walk stops at the first match, at the end of the chain, or as soon as a throwable is
 * encountered a second time, so a cyclic cause chain (for example {@code a -> b -> a}) cannot
 * make this method loop forever.
 *
 * @param t the throwable whose cause chain is searched, starting with {@code t} itself; may be
 *     {@code null}
 * @return the first {@link UnsupportedProtocolVersionException} found, or {@link
 *     Optional#empty()} if the chain contains none
 */
static Optional<UnsupportedProtocolVersionException> findUnsupportedProtocolCause(Throwable t) {
  // Identity-based set: throwables that override equals()/hashCode() must not confuse the
  // cycle detection. Fully qualified names keep this block independent of the import list.
  java.util.Set<Throwable> visited =
      java.util.Collections.newSetFromMap(new java.util.IdentityHashMap<>());
  for (Throwable current = t;
      current != null && visited.add(current);
      current = current.getCause()) {
    if (current instanceof UnsupportedProtocolVersionException) {
      return Optional.of((UnsupportedProtocolVersionException) current);
    }
  }
  return Optional.empty();
}

/**
 * Tells whether {@code path} contains a match of the configured {@code excludeRegex}. When no
 * regex is configured, no path is excluded.
 */
private boolean isExcludedPath(String path) {
  if (!excludeRegex.isPresent()) {
    return false;
  }
  return excludeRegex.get().findFirstIn(path).isDefined();
}
Expand Down