Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 74 additions & 61 deletions spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,35 @@ class DeltaSinkSuite

import testImplicits._

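/**
* Run a sink test against a name-based (catalog) target table. `f` receives the generated
* table name and a temporary directory to use as the streaming checkpoint location.
*/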
protected def withSinkTarget(f: (String, File) => Unit): Unit = {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned we are losing test coverage for path-based tables.

withTempDir { checkpointDir =>
// Unique table name per invocation: the target is a managed table at a deterministic
// warehouse path, so a fixed name leaks state across tests (stale data / DeltaLog cache).
val table = "test_delta_sink_" + checkpointDir.getName.replaceAll("[^A-Za-z0-9]", "")
withTable(table) {
f(table, checkpointDir)
}
}
}

test("append mode") {
failAfter(streamingTimeout) {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[Int]
val df = inputData.toDF()
val query = df.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("delta")
.start(outputDir.getCanonicalPath)
val log = DeltaLog.forTable(spark, outputDir.getCanonicalPath)
.toTable(table)
val log = DeltaLog.forTable(spark, TableIdentifier(table))
try {
inputData.addData(1)
query.processAllAvailable()

val outputDf = spark.read.format("delta").load(outputDir.getCanonicalPath)
val outputDf = spark.read.table(table)
checkDatasetUnorderly(outputDf.as[Int], 1)
assert(log.update().transactions.head == (query.id.toString -> 0L))

Expand All @@ -119,7 +133,7 @@ class DeltaSinkSuite

test("complete mode") {
failAfter(streamingTimeout) {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[Int]
val df = inputData.toDF()
val query =
Expand All @@ -128,13 +142,13 @@ class DeltaSinkSuite
.outputMode("complete")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("delta")
.start(outputDir.getCanonicalPath)
val log = DeltaLog.forTable(spark, outputDir.getCanonicalPath)
.toTable(table)
val log = DeltaLog.forTable(spark, TableIdentifier(table))
try {
inputData.addData(1)
query.processAllAvailable()

val outputDf = spark.read.format("delta").load(outputDir.getCanonicalPath)
val outputDf = spark.read.table(table)
checkDatasetUnorderly(outputDf.as[Long], 1L)
assert(log.update().transactions.head == (query.id.toString -> 0L))

Expand All @@ -158,15 +172,15 @@ class DeltaSinkSuite

test("update mode: not supported") {
failAfter(streamingTimeout) {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[Int]
val df = inputData.toDF()
val e = intercept[AnalysisException] {
df.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.outputMode("update")
.format("delta")
.start(outputDir.getCanonicalPath)
.toTable(table)
}
Seq("update", "not support").foreach { msg =>
assert(e.getMessage.toLowerCase(Locale.ROOT).contains(msg))
Expand Down Expand Up @@ -194,7 +208,7 @@ class DeltaSinkSuite
}

test("SPARK-21167: encode and decode path correctly") {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[String]
val query = inputData.toDS()
.map(s => (s, s.length))
Expand All @@ -203,7 +217,7 @@ class DeltaSinkSuite
.partitionBy("value")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("delta")
.start(outputDir.getCanonicalPath)
.toTable(table)

try {
// The output is partitioned by "value", so the value will appear in the file path.
Expand All @@ -212,7 +226,7 @@ class DeltaSinkSuite
failAfter(streamingTimeout) {
query.processAllAvailable()
}
val outputDf = spark.read.format("delta").load(outputDir.getCanonicalPath)
val outputDf = spark.read.table(table)
checkDatasetUnorderly(outputDf.as[(String, Int)], ("hello world", "hello world".length))
} finally {
query.stop()
Expand All @@ -221,7 +235,7 @@ class DeltaSinkSuite
}

test("partitioned writing and batch reading") {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[Int]
val ds = inputData.toDS()
val query =
Expand All @@ -231,15 +245,15 @@ class DeltaSinkSuite
.partitionBy("id")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("delta")
.start(outputDir.getCanonicalPath)
.toTable(table)
try {

inputData.addData(1, 2, 3)
failAfter(streamingTimeout) {
query.processAllAvailable()
}

val outputDf = spark.read.format("delta").load(outputDir.getCanonicalPath)
val outputDf = spark.read.table(table)
val expectedSchema = new StructType()
.add(StructField("id", IntegerType))
.add(StructField("value", IntegerType))
Expand Down Expand Up @@ -301,7 +315,7 @@ class DeltaSinkSuite
}

test("work with aggregation + watermark") {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[Long]
val inputDF = inputData.toDF.toDF("time")
val outputDf = inputDF
Expand All @@ -315,7 +329,7 @@ class DeltaSinkSuite
outputDf.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("delta")
.start(outputDir.getCanonicalPath)
.toTable(table)
try {
def addTimestamp(timestampInSecs: Int*): Unit = {
inputData.addData(timestampInSecs.map(_ * 1L): _*)
Expand All @@ -325,7 +339,7 @@ class DeltaSinkSuite
}

def check(expectedResult: ((Long, Long), Long)*): Unit = {
val outputDf = spark.read.format("delta").load(outputDir.getCanonicalPath)
val outputDf = spark.read.table(table)
.selectExpr(
"CAST(start as BIGINT) AS start",
"CAST(end as BIGINT) AS end",
Expand All @@ -350,7 +364,7 @@ class DeltaSinkSuite
}

test("throw exception when users are trying to write in batch with different partitioning") {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[Int]
val ds = inputData.toDS()
val query =
Expand All @@ -360,24 +374,26 @@ class DeltaSinkSuite
.partitionBy("id")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("delta")
.start(outputDir.getCanonicalPath)
.toTable(table)
try {

inputData.addData(1, 2, 3)
failAfter(streamingTimeout) {
query.processAllAvailable()
}

val e = intercept[AnalysisException] {
val e = intercept[IllegalArgumentException] {
spark.range(100)
.select('id.cast("integer"), 'id % 4 as "by4", 'id.cast("integer") * 1000 as "value")
.write
.format("delta")
.partitionBy("id", "by4")
.mode("append")
.save(outputDir.getCanonicalPath)
.saveAsTable(table)
}
assert(e.getMessage.contains("Partition columns do not match"))
assert(
e.getMessage.contains(
"The provided partitioning or clustering columns do not match the existing table's"))

} finally {
query.stop()
Expand Down Expand Up @@ -464,32 +480,31 @@ class DeltaSinkSuite
}

test("can't write out with all columns being partition columns") {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[Int]
val ds = inputData.toDS()
val query =
// Name-based: creating a table partitioned by all of its columns is rejected up front (at
// table creation), rather than surfacing as a StreamingQueryException once the stream runs.
val e = intercept[AnalysisException] {
ds.map(i => (i, i * 1000))
.toDF("id", "value")
.writeStream
.partitionBy("id", "value")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("delta")
.start(outputDir.getCanonicalPath)
val e = intercept[StreamingQueryException] {
inputData.addData(1)
query.awaitTermination(30000)
.toTable(table)
}
assert(e.cause.isInstanceOf[AnalysisException])
assert(e.getMessage.contains("Cannot use all columns for partition columns"))
}
}

test("streaming write correctly sets isBlindAppend in CommitInfo") {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>

val input = MemoryStream[Int]
val inputDataStream = input.toDF().toDF("value")

def tableData: DataFrame = spark.read.format("delta").load(outputDir.toString)
def tableData: DataFrame = spark.read.table(table)

def appendToTable(df: DataFrame): Unit = failAfter(streamingTimeout) {
var q: StreamingQuery = null
Expand All @@ -498,7 +513,7 @@ class DeltaSinkSuite
q = df.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir.toString)
.start(outputDir.toString)
.toTable(table)
q.processAllAvailable()
} finally {
if (q != null) q.stop()
Expand All @@ -507,7 +522,7 @@ class DeltaSinkSuite

var lastCheckedVersion = -1L
def isLastCommitBlindAppend: Boolean = {
val log = DeltaLog.forTable(spark, outputDir.toString)
val log = DeltaLog.forTable(spark, TableIdentifier(table))
val lastVersion = log.update().version
assert(lastVersion > lastCheckedVersion, "no new commit was made")
lastCheckedVersion = lastVersion
Expand Down Expand Up @@ -538,48 +553,46 @@ class DeltaSinkSuite
.add("i", IntegerType, nullable = false))
.add("c", IntegerType, nullable = false)

withTempDir { base =>
val sourceDir = new File(base, "source").getCanonicalPath
val tableDir = new File(base, "output").getCanonicalPath
val chkDir = new File(base, "checkpoint").getCanonicalPath

FileUtils.write(new File(sourceDir, "a.json"), jsonRec)
withSinkTarget { (table, checkpointDir) =>
withTempDir { sourceDir =>
FileUtils.write(new File(sourceDir, "a.json"), jsonRec)

val q = spark.readStream
.format("json")
.schema(schema)
.load(sourceDir)
.withColumn("file", input_file_name()) // Not sure why needs this to reproduce
.writeStream
.format("delta")
.trigger(org.apache.spark.sql.streaming.Trigger.Once)
.option("checkpointLocation", chkDir)
.start(tableDir)
val q = spark.readStream
.format("json")
.schema(schema)
.load(sourceDir.getCanonicalPath)
.withColumn("file", input_file_name()) // Not sure why needs this to reproduce
.writeStream
.format("delta")
.trigger(org.apache.spark.sql.streaming.Trigger.Once)
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.toTable(table)

q.awaitTermination()
q.awaitTermination()

checkAnswer(
spark.read.format("delta").load(tableDir).drop("file"),
Seq(Row("ss", Row("ss", null), null)))
checkAnswer(
spark.read.table(table).drop("file"),
Seq(Row("ss", Row("ss", null), null)))
}
}
}

test("history includes user-defined metadata for DataFrame.writeStream API") {
failAfter(streamingTimeout) {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[Int]
val df = inputData.toDF()
val query = df.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.option("userMetadata", "testMeta!")
.format("delta")
.start(outputDir.getCanonicalPath)
val log = DeltaLog.forTable(spark, outputDir.getCanonicalPath)
.toTable(table)
val log = DeltaLog.forTable(spark, TableIdentifier(table))

inputData.addData(1)
query.processAllAvailable()

val lastCommitInfo = io.delta.tables.DeltaTable.forPath(spark, outputDir.getCanonicalPath)
val lastCommitInfo = io.delta.tables.DeltaTable.forName(spark, table)
.history(1).as[DeltaHistory].head

assert(lastCommitInfo.userMetadata === Some("testMeta!"))
Expand Down Expand Up @@ -625,7 +638,7 @@ class DeltaSinkSuite

test("DeltaSink rejects DataFrame with UDT containing NullType") {
failAfter(streamingTimeout) {
withTempDirs { (outputDir, checkpointDir) =>
withSinkTarget { (table, checkpointDir) =>
val inputData = MemoryStream[Int]
val ds = inputData.toDS()
val dsWriter =
Expand All @@ -636,7 +649,7 @@ class DeltaSinkSuite
.format("delta")

val wrapperException = intercept[StreamingQueryException] {
val q = dsWriter.start(outputDir.getCanonicalPath)
val q = dsWriter.toTable(table)
inputData.addData(42)
q.processAllAvailable()
}
Expand Down
Loading