Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.spark.internal.v2.read;

import io.delta.kernel.data.MapValue;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import java.util.Objects;
import java.util.Optional;

/** Narrow descriptor for a Delta file selected by a DSv2 scan. */
public final class DeltaScanFile {

private final String path;
private final MapValue partitionValues;
private final long size;
private final Optional<Long> baseRowId;
private final Optional<Long> defaultRowCommitVersion;
private final Optional<DeletionVectorDescriptor> deletionVector;

DeltaScanFile(AddFile addFile) {
this.path = Objects.requireNonNull(addFile, "addFile is null").getPath();
this.partitionValues = addFile.getPartitionValues();
this.size = addFile.getSize();
this.baseRowId = addFile.getBaseRowId();
this.defaultRowCommitVersion = addFile.getDefaultRowCommitVersion();
this.deletionVector = addFile.getDeletionVector();
}

public String getPath() {
return path;
}

public MapValue getPartitionValues() {
return partitionValues;
}

public long getSize() {
return size;
}

public Optional<Long> getBaseRowId() {
return baseRowId;
}

public Optional<Long> getDefaultRowCommitVersion() {
return defaultRowCommitVersion;
}

public Optional<DeletionVectorDescriptor> getDeletionVector() {
return deletionVector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntim
private final DeltaOptions deltaOptions;
private final ZoneId zoneId;

// Planned input files and stats
// Planned input files and the corresponding selected AddFile actions.
private List<PartitionedFile> partitionedFiles = new ArrayList<>();
// Per-file row counts, parallel to partitionedFiles. Populated only while rowCountKnown is
// true; cleared if any AddFile lacks numRecords. Retained so totalRows can be recomputed
// after runtime partition filtering prunes files, instead of invalidating the count.
private List<Long> perFileRowCounts = new ArrayList<>();
private List<DeltaScanFile> selectedFiles = new ArrayList<>();
private long totalBytes = 0L;
private long totalRows = 0L;
// true iff every AddFile in the scan had numRecords in its stats JSON.
Expand Down Expand Up @@ -466,6 +467,7 @@ private void planScanFiles() {

totalBytes += addFile.getSize();
partitionedFiles.add(partitionedFile);
selectedFiles.add(new DeltaScanFile(addFile));

if (rowCountKnown) {
Optional<Long> numRecords = addFile.getNumRecords();
Expand Down Expand Up @@ -509,6 +511,7 @@ private synchronized void ensurePlanned(List<RuntimePredicate> runtimePredicates
}

List<PartitionedFile> runtimeFilteredPartitionedFiles = new ArrayList<>();
List<DeltaScanFile> runtimeFilteredFiles = new ArrayList<>();
// Parallel to runtimeFilteredPartitionedFiles; only used when rowCountKnown is true.
List<Long> filteredRowCounts = rowCountKnown ? new ArrayList<>() : null;
long newTotalRows = 0L;
Expand All @@ -520,6 +523,7 @@ private synchronized void ensurePlanned(List<RuntimePredicate> runtimePredicates
.allMatch(predicate -> predicate.evaluator.eval(partitionValues));
if (allMatch) {
runtimeFilteredPartitionedFiles.add(pf);
runtimeFilteredFiles.add(this.selectedFiles.get(i));
if (rowCountKnown) {
long rc = this.perFileRowCounts.get(i);
filteredRowCounts.add(rc);
Expand All @@ -532,6 +536,7 @@ private synchronized void ensurePlanned(List<RuntimePredicate> runtimePredicates
// is filtered out
if (runtimeFilteredPartitionedFiles.size() < this.partitionedFiles.size()) {
this.partitionedFiles = runtimeFilteredPartitionedFiles;
this.selectedFiles = runtimeFilteredFiles;
this.totalBytes =
runtimeFilteredPartitionedFiles.stream().mapToLong(PartitionedFile::fileSize).sum();
this.estimatedSizeInBytes = computeEstimatedSizeWithColumnProjection(this.totalBytes);
Expand All @@ -555,6 +560,19 @@ public StructType getDataSchema() {
return dataSchema;
}

/**
* Returns the Delta files selected by this scan after pushdown and runtime filtering.
*
* <p>The returned descriptors preserve only the metadata needed by row-level ReplaceData commits
* to construct matching RemoveFile actions.
*
* @apiNote Internal API for the DSv2 DML write path (see {@code DeltaReplaceDataBatchWrite}).
*/
public List<DeltaScanFile> getSelectedFiles() {
ensurePlanned();
return Collections.unmodifiableList(selectedFiles);
}

public StructType getPartitionSchema() {
return partitionSchema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import io.delta.kernel.Snapshot;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.InternalUtils;
import io.delta.spark.internal.v2.read.ColumnReorderReadFunction;
import io.delta.spark.internal.v2.read.DeltaParquetFileFormatV2;
import io.delta.spark.internal.v2.read.SparkReaderFactory;
Expand All @@ -32,9 +34,13 @@
import io.delta.spark.internal.v2.read.deletionvector.DeletionVectorSchemaContext;
import io.delta.spark.internal.v2.read.metadata.MetadataStructReadFunction;
import io.delta.spark.internal.v2.read.metadata.MetadataStructSchemaContext;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -61,6 +67,7 @@
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
Expand Down Expand Up @@ -185,6 +192,114 @@ public static InternalRow getPartitionRow(
return new GenericInternalRow(values);
}

/**
* Convert an AddFile partition-value map keyed by physical column names into a logical-name map
* using the provided partition schema.
*/
public static Map<String, String> getLogicalPartitionValueStrings(
MapValue partitionValues, StructType partitionSchema) {
Map<String, String> rawPartitionValues = toJavaStringStringMap(partitionValues);
Map<String, String> logicalPartitionValues = new LinkedHashMap<>();
for (StructField field : partitionSchema.fields()) {
String physicalName = DeltaColumnMapping.getPhysicalName(field);
logicalPartitionValues.put(field.name(), rawPartitionValues.get(physicalName));
}
return logicalPartitionValues;
}

/**
* Convert a Kernel {@link MapValue} with string keys and string values into a Java map. Inlined
* here because the {@code VectorUtils} dropin wrapper no longer exposes {@code toJavaMap}; the
* iteration is trivial against {@link MapValue}'s {@code getKeys()/getValues()} column-vector
* API.
*/
private static Map<String, String> toJavaStringStringMap(MapValue mapValue) {
Map<String, String> result = new LinkedHashMap<>(mapValue.getSize());
for (int i = 0; i < mapValue.getSize(); i++) {
String value = mapValue.getValues().isNullAt(i) ? null : mapValue.getValues().getString(i);
result.put(mapValue.getKeys().getString(i), value);
}
return result;
}

/** Build a Kernel write-context partition map keyed by logical partition-column names. */
public static Map<String, Literal> buildKernelPartitionLiteralMap(
Map<String, String> logicalPartitionValueStrings, StructType partitionSchema, ZoneId zoneId) {
Map<String, Literal> partitionLiteralMap = new LinkedHashMap<>();
for (StructField field : partitionSchema.fields()) {
String logicalName = field.name();
String rawValue = logicalPartitionValueStrings.get(logicalName);
if (rawValue == null) {
partitionLiteralMap.put(
logicalName,
Literal.ofNull(SchemaUtils.convertSparkDataTypeToKernelDataType(field.dataType())));
continue;
}
Object typedValue =
PartitioningUtils.castPartValueToDesiredType(field.dataType(), rawValue, zoneId);
partitionLiteralMap.put(
logicalName, convertPartitionValueToKernelLiteral(typedValue, field.dataType()));
}
return partitionLiteralMap;
}

private static Literal convertPartitionValueToKernelLiteral(
Object value, org.apache.spark.sql.types.DataType sparkDataType) {
if (value == null) {
throw new IllegalArgumentException(
"Partition literal conversion does not accept null values");
}
if (value instanceof Boolean) {
return Literal.ofBoolean((Boolean) value);
}
if (value instanceof Byte) {
return Literal.ofByte((Byte) value);
}
if (value instanceof Short) {
return Literal.ofShort((Short) value);
}
if (value instanceof Integer) {
return Literal.ofInt((Integer) value);
}
if (value instanceof Long) {
return Literal.ofLong((Long) value);
}
if (value instanceof Float) {
return Literal.ofFloat((Float) value);
}
if (value instanceof Double) {
return Literal.ofDouble((Double) value);
}
if (value instanceof BigDecimal) {
BigDecimal decimal = (BigDecimal) value;
DecimalType decimalType = (DecimalType) sparkDataType;
return Literal.ofDecimal(decimal, decimalType.precision(), decimalType.scale());
}
if (value instanceof org.apache.spark.sql.types.Decimal) {
org.apache.spark.sql.types.Decimal decimal = (org.apache.spark.sql.types.Decimal) value;
DecimalType decimalType = (DecimalType) sparkDataType;
return Literal.ofDecimal(
decimal.toJavaBigDecimal(), decimalType.precision(), decimalType.scale());
}
if (value instanceof UTF8String) {
return Literal.ofString(value.toString());
}
if (value instanceof String) {
return Literal.ofString((String) value);
}
if (value instanceof byte[]) {
return Literal.ofBinary((byte[]) value);
}
if (value instanceof Date) {
return Literal.ofDate(InternalUtils.daysSinceEpoch((Date) value));
}
if (value instanceof Timestamp) {
return Literal.ofTimestamp(InternalUtils.microsSinceEpoch((Timestamp) value));
}
throw new IllegalArgumentException(
"Unsupported partition literal value type: " + value.getClass().getName());
}

/**
* Build a PartitionedFile from an AddFile with the given partition schema and table path.
*
Expand Down
Loading
Loading