Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,38 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}

import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs

object HiveConnectorUtils extends Logging {

// Cached reflective constructor handle used to build `Cast` expressions
// without binding at compile time to one Spark version's signature.
// NOTE(review): only the 3-argument (Expression, DataType, Option) form is
// registered here. SPARK-40054 (Spark 3.4.0) changed the Cast constructor
// (adding an EvalMode parameter); if the running Spark version no longer
// exposes a matching 3-arg constructor, `build` would presumably fail at
// initialization — verify against every supported Spark version.
private val castCtor: DynConstructors.Ctor[Expression] =
DynConstructors.builder(classOf[Expression])
.impl(
classOf[Cast],
classOf[Expression],
classOf[DataType],
classOf[Option[_]])
.build[Expression]()

// Reflection-based constructor lookup to keep compatible
// with the Cast constructor signature change introduced by SPARK-40054: 3.4.0.
/**
 * Builds a `Cast` expression through the reflectively resolved constructor.
 *
 * @param child the expression whose value is converted
 * @param dataType the target type of the cast
 * @param timeZoneId session time zone for time-zone-aware casts
 *                   (date/timestamp); defaults to `None`
 * @return the constructed `Cast`, typed as `Expression`
 */
def castExpression(
child: Expression,
dataType: DataType,
timeZoneId: Option[String] = None): Expression = {
castCtor.newInstance(child, dataType, timeZoneId)
}

def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
Try { // SPARK-43186: 3.5.0
DynConstructors.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kyuubi.spark.connector.hive

import java.net.URI
import java.util
import java.util.Locale

Expand All @@ -26,16 +27,19 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, NoSuchPartitionsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal}
import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
Expand All @@ -46,7 +50,7 @@ case class HiveTable(
sparkSession: SparkSession,
catalogTable: CatalogTable,
hiveTableCatalog: HiveTableCatalog)
extends Table with SupportsRead with SupportsWrite with Logging {
extends Table with SupportsRead with SupportsWrite with SupportsPartitionManagement with Logging {

lazy val dataSchema: StructType = catalogTable.dataSchema

Expand Down Expand Up @@ -112,4 +116,96 @@ case class HiveTable(
override def capabilities(): util.Set[TableCapability] = {
util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC)
}

/**
 * Creates a single Hive partition identified by `ident`.
 *
 * The partition inherits the table's storage descriptor; an explicit
 * "location" entry in `properties` overrides its storage location. The full
 * property map is persisted as the partition's parameters.
 */
override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
  // A "location" property, when present, overrides the table-level location.
  val customLocation = Option(properties.get(HiveTableProperties.LOCATION)).map(new URI(_))
  val partition = CatalogTablePartition(
    toPartitionSpec(ident),
    catalogTable.storage.copy(locationUri = customLocation),
    properties.asScala.toMap)
  hiveTableCatalog.externalCatalog.createPartitions(
    catalogTable.database,
    catalogTable.identifier.table,
    Seq(partition),
    ignoreIfExists = false)
}

/**
 * Drops the partition identified by `ident`.
 *
 * @return true if the partition was dropped, false if it did not exist
 */
override def dropPartition(ident: InternalRow): Boolean = {
  try {
    hiveTableCatalog.externalCatalog.dropPartitions(
      catalogTable.database,
      catalogTable.identifier.table,
      Seq(toPartitionSpec(ident)),
      ignoreIfNotExists = false,
      purge = false,
      retainData = false)
    true
  } catch {
    // HiveExternalCatalog reports missing partitions with the plural
    // NoSuchPartitionsException; catch both forms so a missing partition is
    // reported as `false` rather than propagating the exception.
    case _: NoSuchPartitionException | _: NoSuchPartitionsException => false
  }
}

/**
 * Unsupported by this connector: partition parameters cannot be replaced
 * through the DSv2 partition-management API.
 *
 * @throws UnsupportedOperationException always
 */
override def replacePartitionMetadata(
    ident: InternalRow,
    properties: util.Map[String, String]): Unit =
  throw new UnsupportedOperationException("Replace partition metadata is not supported")

/**
 * Loads the metadata of the partition identified by `ident`: the partition's
 * metastore parameters, plus its storage location (when set) exposed under
 * the "location" key.
 */
override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
  val partition = hiveTableCatalog.externalCatalog.getPartition(
    catalogTable.database,
    catalogTable.identifier.table,
    toPartitionSpec(ident))
  val metadata = new util.HashMap[String, String](partition.parameters.asJava)
  // Surface the location through the same property key createPartition reads.
  for (uri <- partition.storage.locationUri) {
    metadata.put(HiveTableProperties.LOCATION, uri.toString)
  }
  metadata
}

/**
 * Lists identifiers of the partitions matching a partial partition spec.
 *
 * @param names partition column names bound by `ident`; empty means
 *              "list all partitions"
 * @param ident values for `names`, positionally aligned with them
 * @return one row of partition values (in `partitionSchema` order) per
 *         matching partition; Hive's default-partition marker is decoded
 *         back to null
 */
override def listPartitionIdentifiers(
    names: Array[String],
    ident: InternalRow): Array[InternalRow] = {
  val partialSpec = if (names.isEmpty) {
    None
  } else {
    // Resolve the named subset of partition columns to build a partial spec.
    val schema = StructType(names.map(partitionSchema(_)))
    Some(toPartitionSpec(ident, schema))
  }
  hiveTableCatalog.externalCatalog.listPartitions(
    catalogTable.database,
    catalogTable.identifier.table,
    partialSpec).map { part =>
    val values = partitionSchema.map { field =>
      val strValue = part.spec(field.name)
      if (strValue == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
        null
      } else {
        // Pass the session time zone so casts to time-zone-aware types
        // (date/timestamp) can be evaluated, mirroring toPartitionSpec.
        HiveConnectorUtils.castExpression(
          Literal(strValue),
          field.dataType,
          Some(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval()
      }
    }
    new GenericInternalRow(values.toArray)
  }.toArray
}

/**
 * Converts a partition identifier row into a Hive partition spec
 * (partition column name -> string value) for the given schema.
 *
 * Null partition values are encoded as Hive's default-partition marker —
 * the inverse of the decoding done in `listPartitionIdentifiers` — instead
 * of NPE-ing on `null.toString`.
 *
 * @throws IllegalArgumentException if `schema` and `ident` differ in size
 */
private def toPartitionSpec(ident: InternalRow, schema: StructType): Map[String, String] = {
  require(
    schema.size == ident.numFields,
    s"Schema size (${schema.size}) does not match numFields (${ident.numFields})")
  schema.zipWithIndex.map { case (field, index) =>
    val value = ident.get(index, field.dataType)
    val fieldValue = if (value == null) {
      // Round-trips with listPartitionIdentifiers, which maps the marker
      // back to null.
      ExternalCatalogUtils.DEFAULT_PARTITION_NAME
    } else {
      HiveConnectorUtils.castExpression(
        Literal(value, field.dataType),
        StringType,
        Some(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval().toString
    }
    field.name -> fieldValue
  }.toMap
}

/** Converts a full partition identifier row using the table's complete partition schema. */
private def toPartitionSpec(ident: InternalRow): Map[String, String] =
  toPartitionSpec(ident, partitionSchema)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.spark.connector.hive

/**
 * Reserved property keys used by the Kyuubi Hive connector when exchanging
 * table/partition metadata through DSv2 property maps.
 */
object HiveTableProperties {
// Property key carrying a partition's storage location URI.
val LOCATION = "location"
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ abstract class KyuubiHiveTest extends QueryTest with Logging {
SupportsNamespaces.PROP_LOCATION,
SupportsNamespaces.PROP_OWNER)

protected val catalogName: String = "hive"

override def beforeEach(): Unit = {
super.beforeAll()
getOrCreateSpark()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class CreateNamespaceV1Suite extends CreateNamespaceSuiteBase {

val SESSION_CATALOG_NAME: String = "spark_catalog"

override protected val catalogName: String = SESSION_CATALOG_NAME
override protected def catalogName: String = SESSION_CATALOG_NAME

override protected def catalogVersion: String = "V1"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ trait DDLCommandTestUtils extends KyuubiHiveTest {
// Name of the command as SQL statement, for instance "SHOW PARTITIONS"
protected def command: String

protected def catalogName: String = "hive"
Copy link
Copy Markdown
Contributor Author

@yabola yabola May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strangely, when I ran the tests myself, I had to change catalogName from a val to a def to make V1 use the spark_catalog name. Otherwise, it always resolved to hive.


// Overrides the `test` method, and adds a prefix to easily identify the catalog to which
// the failed test in the logs belongs.
override def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */ )(implicit
Expand Down Expand Up @@ -82,6 +84,8 @@ trait DDLCommandTestUtils extends KyuubiHiveTest {
ns: String,
tableName: String,
cat: String = catalogName)(f: String => Unit): Unit = {
// scalastyle:off println
logInfo(s"${catalogName} catalogVersion is ${catalogVersion}")
val nsCat = s"$cat.$ns"
withNamespace(nsCat) {
sql(s"CREATE NAMESPACE $nsCat")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ trait DropNamespaceSuiteBase extends DDLCommandTestUtils {

class DropNamespaceV2Suite extends DropNamespaceSuiteBase {

override protected def catalogName: String = "hive"

override protected def catalogVersion: String = "Hive V2"

override protected def commandVersion: String = V2_COMMAND_VERSION
Expand All @@ -116,7 +118,7 @@ class DropNamespaceV1Suite extends DropNamespaceSuiteBase {

val SESSION_CATALOG_NAME: String = "spark_catalog"

override protected val catalogName: String = SESSION_CATALOG_NAME
override protected def catalogName: String = SESSION_CATALOG_NAME

override protected def catalogVersion: String = "V1"

Expand Down
Loading
Loading