Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
829 changes: 829 additions & 0 deletions pkg/sql/colexec/external/hive_partition.go

Large diffs are not rendered by default.

825 changes: 825 additions & 0 deletions pkg/sql/colexec/external/hive_partition_coverage_test.go

Large diffs are not rendered by default.

430 changes: 430 additions & 0 deletions pkg/sql/colexec/external/hive_partition_fill.go

Large diffs are not rendered by default.

1,757 changes: 1,757 additions & 0 deletions pkg/sql/colexec/external/hive_partition_test.go

Large diffs are not rendered by default.

79 changes: 62 additions & 17 deletions pkg/sql/colexec/external/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,37 +46,35 @@ var maxParquetBatchCnt int64 = 100000

func newParquetHandler(param *ExternalParam) (*ParquetHandler, error) {
h := ParquetHandler{
batchCnt: maxParquetBatchCnt,
batchCnt: maxParquetBatchCnt,
filepathColIndex: -1, // sentinel: not projected
}
err := h.openFile(param)
if err != nil {
return nil, err
}

// Empty file handling (0 rows): only check column count, skip column name and type checks.
// This aligns with DuckDB behavior for empty parquet files.
if h.file.NumRows() == 0 {
// Check if @vars are used in column list (LOAD DATA ... (col1, @v, col2))
// Parquet doesn't support @vars, report explicit error
if param.Extern.ExternType == int32(plan.ExternType_LOAD) &&
param.ColumnListLen > int32(len(param.Attrs)) {
return nil, moerr.NewNYI(param.Ctx, "parquet load with @variables in column list")
}

// Only check column count, not column names or types
// Column count must match exactly (align with DuckDB behavior)
parquetColCnt := len(h.file.Root().Columns())
tableColCnt := getParquetExpectedColCnt(param)
if parquetColCnt != tableColCnt {
return nil, moerr.NewInvalidInputf(param.Ctx,
"column count mismatch: parquet file has %d columns, but table has %d columns",
parquetColCnt, tableColCnt)
// Skip column count check in Hive mode: partition-only projections have
// 0 expected physical columns while the empty file still has schema columns.
if !param.Extern.HivePartitioning {
parquetColCnt := len(h.file.Root().Columns())
tableColCnt := getParquetExpectedColCnt(param)
if parquetColCnt != tableColCnt {
return nil, moerr.NewInvalidInputf(param.Ctx,
"column count mismatch: parquet file has %d columns, but table has %d columns",
parquetColCnt, tableColCnt)
}
}
// Return nil to indicate empty file, no data to load
// Caller treats (nil, nil) as "empty file, advance to next".
return nil, nil
}

// Non-empty file: use original logic (check column names and types)
err = h.prepare(param)
if err != nil {
return nil, err
Expand Down Expand Up @@ -191,6 +189,18 @@ func (h *ParquetHandler) prepare(param *ExternalParam) error {
continue
}

// Skip virtual columns: they are not in Parquet schema.
if param.isHivePartitionCol(attr.ColName) {
h.partitionColIndices = append(h.partitionColIndices, int(attr.ColIndex))
continue
}
if catalog.ContainExternalHidenCol(attr.ColName) {
h.filepathColIndex = int(attr.ColIndex)
continue
}

h.hasPhysicalCol = true

// Use case-insensitive column lookup (fix for issue #15621)
col, err := h.findColumnIgnoreCase(param.Ctx, attr.ColName)
if err != nil {
Expand Down Expand Up @@ -234,6 +244,10 @@ func (h *ParquetHandler) prepare(param *ExternalParam) error {
h.pages[attr.ColIndex] = col.Pages()
}

if !h.hasPhysicalCol && (len(h.partitionColIndices) > 0 || h.filepathColIndex >= 0) {
h.rowCountOnly = true
}

// init row reader if has nested columns
if h.hasNestedCols {
h.rowReader = parquet.NewReader(h.file)
Expand Down Expand Up @@ -1790,12 +1804,39 @@ func bigIntToTwosComplementBytes(ctx context.Context, bi *big.Int, size int) ([]
}

// getData dispatches one batch read to the appropriate strategy:
// row-count-only when no physical parquet columns are projected,
// row-based reading when nested columns are present, and the
// page-based path otherwise.
func (h *ParquetHandler) getData(bat *batch.Batch, param *ExternalParam, proc *process.Process) error {
	switch {
	case h.rowCountOnly:
		return h.getDataRowCountOnly(bat)
	case h.hasNestedCols:
		return h.getDataByRow(bat, param, proc)
	default:
		return h.getDataByPage(bat, param, proc)
	}
}

// getDataRowCountOnly advances the reader without decoding any column
// data; only the row count is stamped on the batch. It serves rows one
// row group at a time, capped at h.batchCnt per call. Rows of the current
// group that do not fit in one batch are carried over in
// h.rowCountRemaining and drained on subsequent calls.
func (h *ParquetHandler) getDataRowCountOnly(bat *batch.Batch) error {
	limit := int(h.batchCnt)

	var served int
	if h.rowCountRemaining > 0 {
		// Keep draining the row group opened on a previous call.
		served = min(h.rowCountRemaining, limit)
		h.rowCountRemaining -= served
	} else {
		groups := h.file.RowGroups()
		if h.currentRowGroup >= len(groups) {
			// Every row group has been consumed: emit an empty batch.
			bat.SetRowCount(0)
			return nil
		}
		total := int(groups[h.currentRowGroup].NumRows())
		h.currentRowGroup++
		served = min(total, limit)
		h.rowCountRemaining = total - served
	}

	h.offset += int64(served)
	bat.SetRowCount(served)
	return nil
}

func (h *ParquetHandler) getDataByPage(bat *batch.Batch, param *ExternalParam, proc *process.Process) error {
length := 0
finish := false
Expand Down Expand Up @@ -1991,9 +2032,13 @@ func parseStringToDecimal128(s string, precision, scale int32) (types.Decimal128
func getParquetExpectedColCnt(param *ExternalParam) int {
cnt := 0
for _, attr := range param.Attrs {
if !catalog.ContainExternalHidenCol(attr.ColName) {
cnt++
if catalog.ContainExternalHidenCol(attr.ColName) {
continue
}
if param.isHivePartitionCol(attr.ColName) {
continue
}
cnt++
}
return cnt
}
15 changes: 14 additions & 1 deletion pkg/sql/colexec/external/reader_parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
)

// ParquetReader handles Parquet format files.
// Phase 1: thin wrapper around existing ParquetHandler logic.
type ParquetReader struct {
param *ExternalParam
h *ParquetHandler
Expand All @@ -35,6 +34,9 @@ func NewParquetReader(param *ExternalParam, proc *process.Process) *ParquetReade

func (r *ParquetReader) Open(param *ExternalParam, proc *process.Process) (fileEmpty bool, err error) {
r.param = param
if err := param.refreshPartitionValues(); err != nil {
return false, err
}
r.h, err = newParquetHandler(param)
if err != nil {
return false, err
Expand Down Expand Up @@ -64,6 +66,17 @@ func (r *ParquetReader) ReadBatch(
return false, err
}

// Virtual column fill is independent of rowCountOnly: both physical-col
// branches (getDataByPage / getDataByRow) and rowCountOnly need to stamp
// the hive partition values and __mo_filepath into their vectors whenever
// those columns are projected. rowCountOnly in prepare() only gates the
// getData dispatch (no mapper reads), not the virtual-column fill.
if buf.RowCount() > 0 && (r.h.filepathColIndex >= 0 || len(r.h.partitionColIndices) > 0) {
if err := r.h.fillVirtualColumns(buf, r.param, proc); err != nil {
return false, err
}
}

// Check if file is finished: getData sets offset and checks NumRows
if r.h.file != nil && r.h.offset >= r.h.file.NumRows() {
return true, nil
Expand Down
13 changes: 11 additions & 2 deletions pkg/sql/colexec/external/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ type ExParamConst struct {
}

type ExParam struct {
Fileparam *ExFileparam
Filter *FilterParam
Fileparam *ExFileparam
Filter *FilterParam
currentPartValues map[string]string
}

type ExFileparam struct {
Expand Down Expand Up @@ -285,6 +286,14 @@ type ParquetHandler struct {
// for nested types support
hasNestedCols bool
rowReader *parquet.Reader

// virtual column support (hive partitions + __mo_filepath)
partitionColIndices []int
filepathColIndex int // -1 = not projected
hasPhysicalCol bool
rowCountOnly bool
currentRowGroup int
rowCountRemaining int
}

type columnMapper struct {
Expand Down
73 changes: 73 additions & 0 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,11 @@ func (c *Compile) getReadWriteParallelFlag(param *tree.ExternParam, fileList []s
}

func (c *Compile) getExternalFileListAndSize(node *plan.Node, param *tree.ExternParam) (fileList []string, fileSize []int64, err error) {
// Hive partition tables use recursive list-and-filter discovery, not ReadDir.
// ReadDir requires glob patterns in filepath; Hive base paths are opaque directories.
if param.HivePartitioning {
return c.getHivePartitionFileList(node, param)
}
switch node.ExternScan.Type {
case int32(plan.ExternType_EXTERNAL_TB):
t := time.Now()
Expand Down Expand Up @@ -1637,6 +1642,68 @@ func (c *Compile) getExternalFileListAndSize(node *plan.Node, param *tree.Extern
return fileList, fileSize, nil
}

// getHivePartitionFileList discovers the data files of a hive-partitioned
// external table by recursively listing the base path and pruning
// directories against the partition-column predicates. The node's filters
// are split three ways: partition filters drive pruning here, __mo_filepath
// filters are applied to the discovered file list, and the remaining row
// filters (plus any leftovers from path filtering) are written back to
// node.FilterList for per-row evaluation downstream.
func (c *Compile) getHivePartitionFileList(node *plan.Node, param *tree.ExternParam) ([]string, []int64, error) {
	partCols := toLowerSet(param.HivePartitionCols)
	partExprs, pathExprs, rowExprs := external.ClassifyFilters(
		node.TableDef, node.FilterList, partCols)

	predicates := external.ExtractPartitionPredicatesFromExprs(node.TableDef, partExprs, partCols)

	discovered, err := external.DiscoverHivePartitions(
		c.proc.Ctx, external.NewListDirFunc(param), param.Filepath,
		param.HivePartitionCols, param.HivePartitionColTypes, predicates)
	if err != nil {
		return nil, nil, err
	}

	names := make([]string, len(discovered.Files))
	sizes := make([]int64, len(discovered.Files))
	for i, f := range discovered.Files {
		names[i] = f.FilePath
		sizes[i] = f.FileSize
	}

	if len(pathExprs) > 0 {
		var leftover []*plan.Expr
		names, sizes, leftover, err = runFilePathFilters(c.proc.Ctx, c.proc, node.TableDef, pathExprs, names, sizes)
		if err != nil {
			return nil, nil, err
		}
		// Path filters that could not be fully decided from the file path
		// fall back to row-level evaluation.
		rowExprs = append(rowExprs, leftover...)
	}

	node.FilterList = rowExprs
	return names, sizes, nil
}

// runFilePathFilters evaluates __mo_filepath-based filter expressions
// against a candidate file list via external.FilterFileList. It returns
// the surviving files and sizes plus whatever expressions FilterFileList
// left unconsumed — leftovers the caller must still evaluate per row.
func runFilePathFilters(
	ctx context.Context,
	proc *process.Process,
	tableDef *plan.TableDef,
	fpFilters []*plan.Expr,
	fileList []string,
	fileSize []int64,
) ([]string, []int64, []*plan.Expr, error) {
	// FilterFileList operates on a *plan.Node, so wrap the table definition
	// and the path filters in a throwaway node.
	scratch := &plan.Node{TableDef: tableDef, FilterList: fpFilters}
	keptFiles, keptSizes, err := external.FilterFileList(ctx, scratch, proc, fileList, fileSize)
	if err != nil {
		return nil, nil, nil, err
	}
	return keptFiles, keptSizes, scratch.FilterList, nil
}

// toLowerSet builds a case-insensitive membership set from cols: each
// column name is lowercased and mapped to true.
func toLowerSet(cols []string) map[string]bool {
	set := make(map[string]bool, len(cols))
	for _, name := range cols {
		set[strings.ToLower(name)] = true
	}
	return set
}

func (c *Compile) compileExternScan(node *plan.Node) ([]*Scope, error) {
if c.isPrepare {
return nil, cantCompileForPrepareErr
Expand All @@ -1663,6 +1730,12 @@ func (c *Compile) compileExternScan(node *plan.Node) ([]*Scope, error) {
return c.compileExternValueScan(node, param, strictSqlMode)
}

// Hive partition tables must not enter parallel read paths — the parallel loop
// mutates param.Filepath per file, which breaks ExtractPartitionValues' base path.
if param.HivePartitioning {
param.Parallel = false
}

fileList, fileSize, err := c.getExternalFileListAndSize(node, param)
if err != nil {
return nil, err
Expand Down
Loading
Loading