Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import org.apache.spark.sql.connector.catalog.TableCatalog
*
* <p>The catalog-driven `TableCatalog.loadChangelog` entrypoint and its supporting types
* (`Changelog`, `ChangelogInfo`, `ChangelogRange`) were introduced in Spark 4.2 via
* SPARK-56685. They do not exist in Spark 4.0/4.1, so the Auto-CDF wiring is compiled in only
* SPARK-56685. They do not exist in Spark 4.0/4.1, so the read-time CDF wiring is compiled in only
* when building against Spark 4.2 (see `scala-shims/spark-4.2/...ChangelogSupport.scala`).
*
* <p>In 4.0/4.1 builds, mixing this empty trait into `DeltaCatalog` is a no-op: there is no
* `loadChangelog` to override, and downstream Auto-CDF classes (`DeltaChangelog`, etc.) live in
* version-specific `java-shims/spark-4.2/` dirs and are not present here either.
* `loadChangelog` to override, and downstream read-time CDF classes (`DeltaChangelog`, etc.)
* live in version-specific `java-shims/spark-4.2/` dirs and are not present here either.
*/
trait ChangelogSupport extends TableCatalog
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import org.apache.spark.sql.connector.catalog.TableCatalog
*
* <p>The catalog-driven `TableCatalog.loadChangelog` entrypoint and its supporting types
* (`Changelog`, `ChangelogInfo`, `ChangelogRange`) were introduced in Spark 4.2 via
* SPARK-56685. They do not exist in Spark 4.0/4.1, so the Auto-CDF wiring is compiled in only
* SPARK-56685. They do not exist in Spark 4.0/4.1, so the read-time CDF wiring is compiled in only
* when building against Spark 4.2 (see `scala-shims/spark-4.2/...ChangelogSupport.scala`).
*
* <p>In 4.0/4.1 builds, mixing this empty trait into `DeltaCatalog` is a no-op: there is no
* `loadChangelog` to override, and downstream Auto-CDF classes (`DeltaChangelog`, etc.) live in
* version-specific `java-shims/spark-4.2/` dirs and are not present here either.
* `loadChangelog` to override, and downstream read-time CDF classes (`DeltaChangelog`, etc.)
* live in version-specific `java-shims/spark-4.2/` dirs and are not present here either.
*/
trait ChangelogSupport extends TableCatalog
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.delta.{DeltaErrors, DeltaV2Mode}
import org.apache.spark.sql.delta.sources.DeltaSQLConf

/**
* Mixed into a [[TableCatalog]] implementation to add Auto-CDF support. Provides the
* Mixed into a [[TableCatalog]] implementation to add read-time CDF support. Provides the
* catalog-driven `TableCatalog.loadChangelog` entrypoint introduced by SPARK-56685.
*
* <p>This trait extends [[TableCatalog]] as a dependency marker: every concrete catalog that
Expand Down Expand Up @@ -57,7 +57,7 @@ trait ChangelogSupport extends TableCatalog {
val routeChangelogToV2 = new DeltaV2Mode(spark.sessionState.conf).shouldRouteChangelogToV2()
val sparkTable = loadTable(ident) match {
case st: DeltaV2Table => st
// Auto-CDF is V2-only. Re-resolve to V2.
// Read-time CDF is V2-only. Re-resolve to V2.
case v1: DeltaTableV2 if routeChangelogToV2 =>
asV2ChangelogTable(ident, v1)
case other =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private void withHistoryTable(String suffix, ThrowingConsumer body) throws Excep
spark.sql(String.format("INSERT INTO %s VALUES (3, 'Charlie')", tableName));
spark.sql(String.format("INSERT INTO %s VALUES (4, 'Dave')", tableName));
spark.sql(String.format("INSERT INTO %s VALUES (5, 'Eve')", tableName));
// Auto-CDF requires the V2 connector at read time. Writes above run in the
// Read-time CDF requires the V2 connector at read time. Writes above run in the
// session-default mode (AUTO → V1 connector for INSERT). The CHANGES read in
// the test body needs STRICT to ensure loadTable returns a V2 DeltaV2Table.
withSQLConf(
Expand All @@ -94,7 +94,7 @@ private interface ThrowingConsumer {
/**
* Returns the commit timestamp of {@code version}. Resolves the snapshot directly through the
* kernel snapshot manager, so it works irrespective of the catalog mode flipped on by the test
* body (which keeps the catalog in STRICT for the Auto-CDF read path).
* body (which keeps the catalog in STRICT for the read path of read-time CDF).
*/
private java.sql.Timestamp commitTimestamp(String tablePath, long version) {
DeltaSnapshotManager snapshotManager =
Expand Down Expand Up @@ -845,11 +845,118 @@ public void testChangelogRejectsRowTrackingDisabledMidRange() throws Exception {
}

/**
* A CHANGES read across a range where the table schema evolves mid-range must be rejected with
* {@code DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE}.
* Row tracking is toggled off and back on entirely BEFORE the requested range. The read range
* itself stays row-tracking-enabled, so the out-of-range toggle must not fail the read.
*/
@Test
public void testChangelogRejectsSchemaChangeMidRange() throws Exception {
public void testChangelogAllowsRowTrackingToggleBeforeRange() throws Exception {
String tableName = "dsv2_cdc_catalog_rt_toggle_before_" + System.nanoTime();
String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;

withTable(
tablePath,
() ->
withTable(
new String[] {tableName},
() -> {
// v0: CREATE with row tracking enabled.
spark.sql(
String.format(
"CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' "
+ "TBLPROPERTIES "
+ "('delta.enableDeletionVectors'='false', "
+ "'delta.enableRowTracking'='true')",
tableName, tablePath));
// v1: INSERT (row tracking on).
spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
// v2: disable row tracking, v3: re-enable it -- both before the read range.
spark.sql(
String.format(
"ALTER TABLE %s SET TBLPROPERTIES ('delta.enableRowTracking'='false')",
tableName));
spark.sql(
String.format(
"ALTER TABLE %s SET TBLPROPERTIES ('delta.enableRowTracking'='true')",
tableName));
// v4, v5: INSERTs inside the row-tracking-enabled read range.
spark.sql(String.format("INSERT INTO %s VALUES (2, 'Bob')", tableName));
spark.sql(String.format("INSERT INTO %s VALUES (3, 'Charlie')", tableName));

withSQLConf(
"spark.databricks.delta.v2.enableMode",
"STRICT",
() -> {
List<Row> rows =
spark
.sql(
String.format(
"SELECT id, _change_type FROM %s "
+ "CHANGES FROM VERSION 4 TO VERSION 5",
tableName))
.collectAsList();
assertEquals(
2, rows.size(), "Expected the two in-range inserts to be returned");
});
}));
}

/**
* Row tracking is disabled at a commit AFTER the requested range. The read range stays
* row-tracking-enabled, so the later toggle must not fail the read.
*/
@Test
public void testChangelogAllowsRowTrackingDisabledAfterRange() throws Exception {
String tableName = "dsv2_cdc_catalog_rt_disabled_after_" + System.nanoTime();
String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;

withTable(
tablePath,
() ->
withTable(
new String[] {tableName},
() -> {
// v0: CREATE with row tracking enabled.
spark.sql(
String.format(
"CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' "
+ "TBLPROPERTIES "
+ "('delta.enableDeletionVectors'='false', "
+ "'delta.enableRowTracking'='true')",
tableName, tablePath));
// v1, v2: INSERTs inside the row-tracking-enabled read range.
spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
spark.sql(String.format("INSERT INTO %s VALUES (2, 'Bob')", tableName));
// v3: disable row tracking after the range.
spark.sql(
String.format(
"ALTER TABLE %s SET TBLPROPERTIES ('delta.enableRowTracking'='false')",
tableName));

withSQLConf(
"spark.databricks.delta.v2.enableMode",
"STRICT",
() -> {
List<Row> rows =
spark
.sql(
String.format(
"SELECT id, _change_type FROM %s "
+ "CHANGES FROM VERSION 1 TO VERSION 2",
tableName))
.collectAsList();
assertEquals(
2, rows.size(), "Expected the two in-range inserts to be returned");
});
}));
}

/**
* A CHANGES read across an additive mid-range schema change (adding a nullable column) is read
* compatible and must succeed. Rows from before the change are read with the end schema, leaving
* the added column null.
*/
@Test
public void testChangelogAllowsAdditiveSchemaChangeMidRange() throws Exception {
String tableName = "dsv2_cdc_catalog_schema_change_" + System.nanoTime();
String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;

Expand All @@ -867,10 +974,58 @@ public void testChangelogRejectsSchemaChangeMidRange() throws Exception {
+ "'delta.enableRowTracking'='true')",
tableName, tablePath));
spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
// Schema change mid-range: add a column.
// Additive schema change mid-range: add a nullable column.
spark.sql(String.format("ALTER TABLE %s ADD COLUMN extra STRING", tableName));
spark.sql(String.format("INSERT INTO %s VALUES (2, 'Bob', 'x')", tableName));

withSQLConf(
"spark.databricks.delta.v2.enableMode",
"STRICT",
() -> {
List<Row> rows =
spark
.sql(
String.format(
"SELECT id, _change_type FROM %s "
+ "CHANGES FROM VERSION 1 TO VERSION 3",
tableName))
.collectAsList();
assertEquals(
2, rows.size(), "Additive schema change should not fail the read");
});
}));
}

/**
* A non-read-compatible mid-range schema change (dropping a column) must be rejected with {@code
* DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE}: the end schema can no longer read the data written
* before the drop.
*/
@Test
public void testChangelogRejectsIncompatibleSchemaChangeMidRange() throws Exception {
String tableName = "dsv2_cdc_catalog_drop_col_" + System.nanoTime();
String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;

withTable(
tablePath,
() ->
withTable(
new String[] {tableName},
() -> {
// Column mapping is required to drop a column.
spark.sql(
String.format(
"CREATE TABLE %s (id BIGINT, name STRING, extra STRING) USING delta "
+ "LOCATION '%s' TBLPROPERTIES "
+ "('delta.enableDeletionVectors'='false', "
+ "'delta.enableRowTracking'='true', "
+ "'delta.columnMapping.mode'='name')",
tableName, tablePath));
spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice', 'x')", tableName));
// Non-additive schema change mid-range: drop a column.
spark.sql(String.format("ALTER TABLE %s DROP COLUMN extra", tableName));
spark.sql(String.format("INSERT INTO %s VALUES (2, 'Bob')", tableName));

withSQLConf(
"spark.databricks.delta.v2.enableMode",
"STRICT",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.junit.jupiter.api.Test;

/**
* Integration tests for Auto-CDF / DSv2 changelog with Deletion Vectors (DVs) that are NOT
* Integration tests for read-time CDF / DSv2 changelog with Deletion Vectors (DVs) that are NOT
* covered by parameterized DV-on/off variants of
* {@link DeltaChangelogDirectBatchExecutionTest}. The parameterized tests already cover the
* basic single-file DELETE and UPDATE; this file holds scenarios that need DV-specific table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* <li>The hybrid {@code DeltaCatalog} (spark-unified) as {@code spark_catalog}, so {@code
* TableCatalog.loadChangelog} routes into the {@code ChangelogSupport} trait.
* <li>{@code spark.databricks.delta.changelogV2.enabled = true}, the SQLConf gate behind which
* Auto-CDF is hidden in production.
* Read-time CDF is hidden in production.
* </ul>
*
* <p>Keeping these two settings out of {@code DeltaV2TestBase} means the rest of the V2 test suites
Expand Down
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,12 @@
],
"sqlState" : "22003"
},
"DELTA_CHANGELOG_READ_FAILED" : {
"message" : [
"Failed to read the changelog while <context>."
],
"sqlState" : "XXKDS"
},
"DELTA_CHANGELOG_REQUIRES_ROW_TRACKING" : {
"message" : [
"Change data capture via CHANGES on `<tableName>` requires row tracking.",
Expand Down
26 changes: 21 additions & 5 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ trait DeltaErrorsBase
}

/**
* Auto-CDF batch read rejected because the source table does not have row tracking enabled.
* Read-time CDF batch read rejected because the source table does not have row tracking enabled.
* Row tracking is required for the V2 changelog reader to identify rows across commits.
*
* Returns `Nothing` so Scala callers can use this in expression position (e.g. as a `match`
Expand All @@ -638,7 +638,7 @@ trait DeltaErrorsBase
}

/**
* Auto-CDF batch read rejected because the user requested an unbounded changelog range.
* Read-time CDF batch read rejected because the user requested an unbounded changelog range.
* Batch CHANGES queries require explicit start and end bounds.
*
* Returns `Nothing` so Scala callers can use this in expression position (e.g. as a `match`
Expand All @@ -651,7 +651,7 @@ trait DeltaErrorsBase
}

/**
* Auto-CDF batch read rejected because the table resolved by the catalog is not a V2
* Read-time CDF batch read rejected because the table resolved by the catalog is not a V2
* [[io.delta.spark.internal.v2.catalog.DeltaV2Table]]. The V2 connector is the only path that
* implements the catalog-driven CHANGES surface. V1 Delta tables (`DeltaTableV2`) continue to
* use the legacy CDF path that does not go through `TableCatalog.loadChangelog`. Use
Expand All @@ -667,7 +667,7 @@ trait DeltaErrorsBase
}

/**
* Auto-CDF batch read rejected because the table schema differs at some commit within the
* Read-time CDF batch read rejected because the table schema differs at some commit within the
* requested range. The connector requires the schema to be stable across the read range so
* that downstream batch CDC post-processing sees a single schema.
*/
Expand All @@ -678,7 +678,7 @@ trait DeltaErrorsBase
}

/**
* Auto-CDF batch read rejected because row tracking was disabled at some commit within the
* Read-time CDF batch read rejected because row tracking was disabled at some commit within the
* requested range (the `delta.enableRowTracking` table property was set to `false`).
*/
def throwChangelogRowTrackingDisabledInRange(version: Long): Nothing = {
Expand All @@ -687,6 +687,22 @@ trait DeltaErrorsBase
messageParameters = Array(version.toString))
}

/**
* Read-time CDF batch read failed while reading the changelog (e.g. an IO error while iterating
* commit actions or planning input partitions). A cause that already carries a Spark error class
* is rethrown unchanged so its user-facing class is preserved. Anything else is wrapped in a
* Delta error class rather than a bare RuntimeException. `context` names the phase, e.g.
* "processing commit actions".
*/
def throwChangelogReadFailed(context: String, cause: Throwable): Nothing = cause match {
case t: org.apache.spark.SparkThrowable => throw t.asInstanceOf[Throwable]
case t =>
throw new DeltaIllegalStateException(
errorClass = "DELTA_CHANGELOG_READ_FAILED",
messageParameters = Array(context),
cause = t)
}

def setTransactionVersionConflict(appId: String, version1: Long, version2: Long): Throwable = {
new IllegalArgumentException(
s"Two SetTransaction actions within the same transaction have the same appId ${appId} but " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2446,7 +2446,7 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
.doc(
"""When enabled, the V2 connector's hybrid DeltaCatalog answers
|CHANGES FROM ... batch queries (TableCatalog.loadChangelog) using the
|kernel-based Auto-CDF reader stack. When disabled, the catalog falls back to the
|kernel-based read-time CDF reader stack. When disabled, the catalog falls back to the
|default behavior (UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE).""".stripMargin)
.booleanConf
.createWithDefault(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2957,6 +2957,25 @@ trait DeltaErrorsSuiteBase
}
}

test("throwChangelogReadFailed preserves SparkThrowable cause and wraps others") {
// A cause that already carries a Spark error class is rethrown unchanged.
val sparkThrowableCause = new DeltaAnalysisException(
errorClass = "DELTA_CHANGELOG_UNBOUNDED_RANGE",
messageParameters = Array.empty[String])
val passed = intercept[DeltaAnalysisException] {
DeltaErrors.throwChangelogReadFailed("processing commit actions", sparkThrowableCause)
}
assert(passed eq sparkThrowableCause)

// Any other cause is wrapped in DELTA_CHANGELOG_READ_FAILED.
val wrapped = intercept[DeltaIllegalStateException] {
DeltaErrors.throwChangelogReadFailed(
"planning input partitions", new RuntimeException("boom"))
}
checkError(wrapped, "DELTA_CHANGELOG_READ_FAILED", "XXKDS",
Map("context" -> "planning input partitions"))
}

private def setCustomContext(session: SparkSession, context: SparkContext): Unit = {
val scField = session.getClass.getDeclaredField("sparkContext")
scField.setAccessible(true)
Expand Down
Loading
Loading