[flink][spark] supports adding blob columns through ALTER TABLE statements#7921
[flink][spark] supports adding blob columns through ALTER TABLE statements#7921steFaiz wants to merge 8 commits into
ALTER TABLE statements#7921Conversation
|
PR requires users to first set the bloom field and then add the column. This is a two-step operation, the order cannot be reversed. It is recommended to provide clearer operation instructions in the error message. |
|
Lines 1134-1138 of FlinkCatalog.java (the original blobTypeFields method) are missing the blobExternalStoreField, while the newly added Spark side blobTypeFields in PR and the blobTypeFields in the alterTable path both contain the blobExternalStoreField. This leads to inconsistency in the determination of blob fields between Flink's table creation path (from CatalogTable) and the Alter TABLE path. The original method should be fixed to make it consistent. |
|
Thanks for working on this. I think this still has a blocking issue: the newly added engine scenarios do not pass locally. I ran the added Flink/Spark tests on the PR head (fe99faa) with Java 8: mvn -pl paimon-flink/paimon-flink-common -DskipITs=false -Dcheckstyle.skip -Drat.skip=true -Dspotless.check.skip=true -Dtest=SchemaChangeITCase#testAlterAddBlobColumn test
mvn -pl paimon-spark/paimon-spark-ut -DskipITs=false -Dcheckstyle.skip -Drat.skip=true -Dspotless.check.skip=true -Dtest=SparkSchemaEvolutionITCase#testAlterAddBlobColumn testBoth fail with the same validation error: So the intended workflow in the tests: ALTER TABLE ... SET ('blob-field' = 'picture');
ALTER TABLE ... ADD picture BYTES/BINARY;still ends up committing a normal binary column while the table option says that Please fix this before merge. The engine-side ALTER ADD COLUMN path needs to resolve the new column type using the effective table options after the previous/current option changes, not only the options carried by the ADD COLUMN request. In particular, Flink's After the fix, please make sure the added Flink and Spark tests pass for both |
@JingsongLi Thanks for pointing this out. This inconsistency did exist in the original blobTypeFields method, but it has been fixed in the latest commit: Flink now includes blobExternalStorageField() in blobTypeFields, so the create- |
Thanks for your remind! I've add some hint in the error message when users want to convert an existing field to Blob. |
|
@leaves12138 Thanks for your review! For the second issue:
Flink do not need to do this, because the |
JingsongLi
left a comment
There was a problem hiding this comment.
Review
Overall the approach is sound — making blob-field / blob-descriptor-field mutable and having Flink/Spark catalogs resolve BYTES/BINARY → BLOB type conversion at ALTER time is clean.
Issues
1. Spark resolveDataType applies .copy(isNullable) twice.
// resolveDataType():
return new BlobType().copy(add.isNullable()); // first copy
// caller in toSchemaChange():
resolveDataType(add, newOptions).copy(add.isNullable()) // second copyThe Flink side doesn't have this problem because resolveDataType there doesn't call .copy() internally. The Spark resolveDataType should return new BlobType() without .copy(), leaving the caller to apply nullability — matching the toPaimonType(add.dataType()) branch.
2. checkAlterBlobFieldOption checks against oldOptions — not accumulated newOptions.
checkAlterBlobFieldOption(oldTableSchema, oldOptions, setOption.key(), setOption.value());If a single commitChanges() call contains multiple SetOption changes to the same blob key (e.g., first set blob-field=a,b, then set blob-field=a), the second check still compares against the original oldOptions, not the state after the first SET. This means the "do not remove existing blob fields" check could be bypassed in a single batch:
schemaManager.commitChanges(
SchemaChange.setOption("blob-field", "a,b"), // adds b
SchemaChange.setOption("blob-field", "a") // removes b — check compares against original, not "a,b"
);Should check against newOptions instead. The only reason to use oldOptions here is to not pick up the option before it's put into the map, but the put happens on the next line anyway.
3. blob-view-field and blob-external-storage-field are still checked in isBlobFieldOption() and getBlobFields() but marked as immutable.
The PR states these are still immutable, but checkAlterBlobFieldOption and the RemoveOption guard both check all four blob option keys. Since blob-view-field and blob-external-storage-field are @Immutable, the existing checkAlterTableOption already blocks mutations. The overlap is harmless but confusing — a user trying to SET blob-view-field gets two error messages from different checks. Consider either: (a) skip the blob-specific check for immutable keys, or (b) remove @Immutable from those too and handle them uniformly.
4. No validation that the SET happens before ADD in SchemaManager.generateTableSchema.
The PR documentation says "set blob-field before adding the column." But generateTableSchema processes changes in list order. If a caller passes [AddColumn("picture", BYTES), SetOption("blob-field", "picture")], the AddColumn will create a BYTES field (not BLOB), then the SetOption will fail with "Cannot configure existing field." The Flink/Spark catalogs enforce the correct order in their UI flow, but the core API doesn't guard against this — it's just documentation. This is acceptable but worth noting.
5. FlinkCatalog.alterTable passes newTable.getOptions() to blobTypeFields() — relying on Flink framework behavior.
The newTable CatalogTable provided by Flink already merges the SET changes. This works for Flink's standard flow (option diff against old/new table). But the toSchemaChange method now receives blobTypeFields computed from the new table options. If the new table has blob-field=picture and the AddColumn for picture is in the changes list, resolveDataType correctly converts it to BLOB. Good.
However, the option diff logic earlier (lines 831-841) already adds SetOption("blob-field", "picture") to changes. Then later, toSchemaChange on the AddColumn converts it to BLOB. When SchemaManager processes these, the SetOption runs first (it's added before schemaChanges), then AddColumn(BLOB) runs. This works because checkAlterBlobFieldOption passes (field doesn't exist) and the column gets added as BLOB type. The ordering is implicitly correct because option diffs are added before column changes — but it's fragile. A comment explaining this dependency would help.
Minor
testAlterBlobFieldOptionCanRemoveNonExistingField— good edge case coverage- The Spark test
testAlterAddBlobColumnWithCombinedTableChangesproperly validates the combined-changes path via the catalog API - Doc section is clear about the required ordering
|
@JingsongLi
|
Purpose
This is the part0 of #7881
Currently, we can't add new blob columns through Flink/Spark sql. In this PR, I slightly changes the restriction of altering
blob-fieldsconfiguration, as below:blob-fieldandblob-descriptor-fieldare mutable nowThen during altering tables, new bytes fields with blob-fields configured will be converted to blob fields, as below:
Tests
See Unit Tests and ITCases