[Spark] Clean up the read-time CDF changelog implementation#7075
Open
SanJSp wants to merge 14 commits into
Replaces the "Auto-CDF" wording with "read-time CDF" across changelog comments, docs and config descriptions. Comment/string text only, no code identifiers changed. Reviewer comment: delta-io#6794 (comment)
The schema field holds the ending-version schema used as the read schema for the changelog range. Renames it to endDataSchema to disambiguate from per-commit schemas. Reviewer comment: delta-io#6794 (comment)
The changelog read path compared per-commit and start schemas to the end schema with strict equality. This rejects additive, read-compatible changes (new columns, relaxed nullability) and trips on internal field metadata. Replaces both checks with a SchemaUtils.isReadCompatible helper in the v2 schema utils that delegates to Delta's read-compatibility check (forbidTightenNullability = true, matching CDCReaderBase). Reviewer comment: delta-io#6794 (comment)
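To make the difference from strict equality concrete, here is a minimal sketch of a read-compatibility check over a toy flat schema. It is not Delta's SchemaUtils: the ReadCompatSketch class and its Field type are hypothetical, names are compared case-sensitively, and nested types and field metadata are left out. It only models the rules described above: extra columns and relaxed nullability are accepted, while dropped columns, type changes, and tightened nullability are rejected.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a read-compatibility check. This is not Delta's SchemaUtils. */
final class ReadCompatSketch {

  /** Hypothetical flat field: name, type name, nullability. */
  static final class Field {
    final String name;
    final String type;
    final boolean nullable;

    Field(String name, String type, boolean nullable) {
      this.name = name;
      this.type = type;
      this.nullable = nullable;
    }
  }

  /**
   * Returns true if data written with {@code existing} can be read with {@code read}.
   * Fields that appear only in the read schema are accepted.
   */
  static boolean isReadCompatible(List<Field> existing, List<Field> read) {
    Map<String, Field> readByName = new HashMap<>();
    for (Field f : read) {
      readByName.put(f.name, f);
    }
    for (Field old : existing) {
      Field now = readByName.get(old.name);
      if (now == null) return false; // column dropped from the read schema
      if (!now.type.equals(old.type)) return false; // type changed
      if (old.nullable && !now.nullable) return false; // nullability tightened
    }
    return true;
  }

  public static void main(String[] args) {
    List<Field> commit = List.of(new Field("id", "long", false), new Field("v", "string", true));
    List<Field> withNewColumn =
        List.of(commit.get(0), commit.get(1), new Field("extra", "int", true));
    List<Field> withoutV = List.of(commit.get(0));
    System.out.println(isReadCompatible(commit, withNewColumn)); // true
    System.out.println(isReadCompatible(commit, withoutV)); // false
  }
}
```

In this model the per-commit (or start) schema plays the role of `existing` and the end schema plays the role of `read`.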
A Metadata action fully replaces the prior table configuration: all configs are listed explicitly. The changelog loop treated an absent row-tracking key as "inherit the prior value" and only failed on an explicit non-"true" value, so a Metadata commit that drops row tracking could slip through. Treats an absent key as the table default (disabled), and reads the config key from DeltaConfigs.ROW_TRACKING_ENABLED instead of a string literal. The in-range check now correctly rejects row tracking being disabled within the range; the boundary case is already caught earlier in DeltaChangelogScanBuilder. Reviewer comments: delta-io#6794 (comment) delta-io#6794 (comment) delta-io#6794 (comment)
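The two readings of an absent key can be contrasted with a small sketch. It models each in-range Metadata action as a plain configuration map. The RowTrackingRangeSketch class and both method names are hypothetical, and the real loop works on Delta's action types rather than maps.

```java
import java.util.List;
import java.util.Map;

/** Toy model of the in-range row-tracking check. Hypothetical names throughout. */
final class RowTrackingRangeSketch {
  static final String KEY = "delta.enableRowTracking";

  /** Old reading: an absent key inherits the prior value; only an explicit non-"true" fails. */
  static boolean disabledInRangeInheriting(List<Map<String, String>> metadataInRange) {
    for (Map<String, String> conf : metadataInRange) {
      String raw = conf.get(KEY);
      if (raw != null && !raw.equalsIgnoreCase("true")) return true;
    }
    return false;
  }

  /** New reading: each Metadata action replaces the configuration, so absent means disabled. */
  static boolean disabledInRangeReplacing(List<Map<String, String>> metadataInRange) {
    for (Map<String, String> conf : metadataInRange) {
      if (!Boolean.parseBoolean(conf.getOrDefault(KEY, "false"))) return true;
    }
    return false;
  }

  public static void main(String[] args) {
    List<Map<String, String>> range =
        List.of(
            Map.of(KEY, "true"),
            // Assumed example: a Metadata commit that lists other configs but omits the key.
            Map.of("delta.appendOnly", "false"));
    System.out.println(disabledInRangeInheriting(range)); // false: the drop slips through
    System.out.println(disabledInRangeReplacing(range)); // true: the drop is detected
  }
}
```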
The changelog read path wrapped unexpected failures while processing commit actions and planning input partitions in bare RuntimeExceptions, discarding the user-facing error class machinery. Adds a DELTA_CHANGELOG_READ_FAILED error class and a throwChangelogReadFailed helper: a cause that already carries a Spark error class is rethrown unchanged (preserving e.g. DELTA_CHANGELOG_ROW_TRACKING_DISABLED_IN_RANGE), anything else is wrapped in DELTA_CHANGELOG_READ_FAILED instead of a RuntimeException. Reviewer comments: https://github.com/delta-io/delta/pull/6830/files#r3301759577 delta-io#6886 (comment)
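The rethrow-or-wrap rule can be sketched without Spark on the classpath. In the sketch below, HasErrorClass is a hypothetical stand-in for Spark's error-class interface, ClassifiedException is a hypothetical exception type, and changelogReadFailed returns the exception for the caller to throw. The helper in this PR may be shaped differently.

```java
/** Toy model of rethrow-or-wrap error handling. Hypothetical names throughout. */
final class ChangelogErrorSketch {

  /** Hypothetical stand-in for an exception interface that exposes an error class. */
  interface HasErrorClass {
    String getErrorClass();
  }

  /** Hypothetical runtime exception that carries an error class. */
  static final class ClassifiedException extends RuntimeException implements HasErrorClass {
    private final String errorClass;

    ClassifiedException(String errorClass, String message, Throwable cause) {
      super(message, cause);
      this.errorClass = errorClass;
    }

    @Override
    public String getErrorClass() {
      return errorClass;
    }
  }

  /**
   * Returns the exception to throw for a failure in the read path. A runtime cause that
   * already carries an error class is returned unchanged; anything else is wrapped.
   */
  static RuntimeException changelogReadFailed(Throwable cause) {
    if (cause instanceof HasErrorClass && cause instanceof RuntimeException) {
      return (RuntimeException) cause;
    }
    return new ClassifiedException(
        "DELTA_CHANGELOG_READ_FAILED", "Changelog read failed: " + cause.getMessage(), cause);
  }

  public static void main(String[] args) {
    RuntimeException wrapped = changelogReadFailed(new IllegalStateException("boom"));
    System.out.println(((HasErrorClass) wrapped).getErrorClass());
  }
}
```

A call site in this sketch would read `throw changelogReadFailed(e);` inside a catch block, so the compiler still sees the branch as terminating.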
Adds a class-level comment describing how the changelog batch turns a commit range into CDC input partitions and where the CDC tail columns are appended.
Reflows comments to satisfy javafmtCheck (google-java-format) after the wording and isReadCompatible changes above.
The suite already rejects row tracking disabled within the range; these tests add the positive cases: toggling row tracking off (and back on) entirely before the range, and disabling it after the range, must not fail a read whose range stays row-tracking-enabled throughout.
Reads the in-range row-tracking flag via TableConfig.ROW_TRACKING_ENABLED.fromMetadata(md) (kernel Metadata is already in scope), which returns a Boolean with parsing and the table default baked in, instead of fetching the raw config string and parsing it by hand. Matches the existing pattern in ProtocolMetadataAdapterV2 and drops the DeltaConfigs Scala interop. Also extracts the duplicated start/per-commit schema check into a private requireReadCompatible helper.
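The benefit of a typed accessor over hand-parsing can be illustrated with a small stand-in. TypedConfigSketch and its nested TypedConfig class are hypothetical and read from a plain map. They are not the kernel's TableConfig, and the only point is that the key, the default, and the parser live in one place.

```java
import java.util.Map;
import java.util.function.Function;

/** Toy model of a typed table-config accessor. This is not the kernel's TableConfig. */
final class TypedConfigSketch {

  /** Hypothetical typed config: key, default, and parser bundled together. */
  static final class TypedConfig<T> {
    private final String key;
    private final String defaultValue;
    private final Function<String, T> parser;

    TypedConfig(String key, String defaultValue, Function<String, T> parser) {
      this.key = key;
      this.defaultValue = defaultValue;
      this.parser = parser;
    }

    /** Parses the configured value, falling back to the default when the key is absent. */
    T fromConfiguration(Map<String, String> configuration) {
      return parser.apply(configuration.getOrDefault(key, defaultValue));
    }
  }

  static final TypedConfig<Boolean> ROW_TRACKING_ENABLED =
      new TypedConfig<>("delta.enableRowTracking", "false", Boolean::parseBoolean);

  public static void main(String[] args) {
    System.out.println(ROW_TRACKING_ENABLED.fromConfiguration(Map.of())); // false
    System.out.println(
        ROW_TRACKING_ENABLED.fromConfiguration(Map.of("delta.enableRowTracking", "true"))); // true
  }
}
```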
Converting the schema check to isReadCompatible left the rejection branch (DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE) untested. Adds an integration test for a non-read-compatible mid-range change (DROP COLUMN) and direct unit tests for SchemaUtils.isReadCompatible (additive / relaxed-nullability accepted, dropped column / tightened nullability rejected). Also fixes a doubled "read-time CDF read path" comment.
Locks in the two branches of the throwChangelogReadFailed helper: a SparkThrowable cause is rethrown unchanged (its user-facing error class preserved), any other cause is wrapped in DELTA_CHANGELOG_READ_FAILED.
39ec98d to 6bfdb4a
Which Delta project/connector is this regarding?
Description
Follow-up cleanup for the read-time CDF (changelog) reader added in #6794 / #6830 / #6886, addressing reviewer feedback. The changelog V2 reader stays gated behind spark.databricks.delta.changelogV2.enabled and is master-only (unreleased).
Changes, each in its own commit:
- Replaces the strict schema-equality checks in DeltaChangelogBatch with SchemaUtils.isReadCompatible, so additive, read-compatible changes (new columns, relaxed nullability) are accepted and internal field metadata is ignored. The check is wrapped in a small isReadCompatible helper on the v2 SchemaUtils (forbidTightenNullability = true, matching CDCReaderBase). (comment)
- Renames the dataSchema field to endDataSchema. (comment)
- A Metadata action fully replaces the prior table configuration, so an absent delta.enableRowTracking key now means the table default (disabled) rather than an inherited value; the key is read via DeltaConfigs.ROW_TRACKING_ENABLED instead of a string literal. This makes the in-range row-tracking-disabled check correct; the boundary case is already caught earlier in DeltaChangelogScanBuilder. (supersedes, config helper, in-range check)
- Replaces bare RuntimeException wrapping in the changelog read path with a DELTA_CHANGELOG_READ_FAILED error class. Causes that already carry a Spark error class are rethrown unchanged so their user-facing class is preserved. (comment, comment)
- Adds a class-level comment describing the DeltaChangelogBatch read flow.
How was this patch tested?
Does this PR introduce any user-facing changes?
No