[Spark] Add asynchronous Auto Compaction (PR 1: core async machinery)#7090

Open
mwc360 wants to merge 3 commits into delta-io:master from mwc360:mcole_async_auto_compact

mwc360 commented Jun 24, 2026

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Introduces an opt-in mode that runs Auto Compaction on a JVM-wide daemon thread pool instead of inline on the writer thread, eliminating the write-latency impact of AC for streaming and high-throughput workloads. Default behavior is unchanged.

Components:

  • AsyncAutoCompactService: singleton with a bounded ThreadPoolExecutor (size = autoCompact.async.parallelism, queue = autoCompact.async.maxQueueSize) and a per-table inflight counter. Submitting a CommittedTransaction is fire-and-forget; the worker re-reads deltaLog.update() and re-runs AutoCompact.compactIfNecessary against the fresh snapshot, so a peer that already compacted causes the worker to skip cleanly.

    • Sets/clears SparkSession.{active,default} on the worker thread so downstream code paths that consult SparkSession.getActiveSession (e.g. OptimizeExecutor, DeltaUDF) see the right session.
    • Wraps execution in NonFatal try/catch and records a telemetry event; writer threads are never observably affected by async failures.
    • Backpressure: queue-full submissions are recorded as delta.autoCompaction.async.dropped and silently declined.
  • New session-only confs (no table property: async execution is a writer-side mechanism, so no per-table setting is needed):

    • spark.databricks.delta.autoCompact.async.enabled (default false)
    • spark.databricks.delta.autoCompact.async.parallelism (default 2)
    • spark.databricks.delta.autoCompact.async.maxQueueSize (default 64)
    • spark.databricks.delta.autoCompact.async.backpressure (default drop; more policies to be added in a later PR)
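
The bounded pool and the drop policy described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class name DropOnFullPool, its methods, and the thread name are hypothetical, the counters stand in for the submitted/dropped telemetry events, and catching Exception only approximates Scala's NonFatal.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the async service's pool; names are assumptions. */
final class DropOnFullPool {
    private final ThreadPoolExecutor executor;
    private final AtomicLong submitted = new AtomicLong(); // stands in for async.submitted
    private final AtomicLong dropped = new AtomicLong();   // stands in for async.dropped

    DropOnFullPool(int parallelism, int maxQueueSize) {
        executor = new ThreadPoolExecutor(
            parallelism, parallelism, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(maxQueueSize),          // bounded queue
            r -> {
                Thread t = new Thread(r, "async-auto-compact-sketch");
                t.setDaemon(true);                           // never blocks JVM exit
                return t;
            },
            new ThreadPoolExecutor.AbortPolicy());           // reject when the queue is full
    }

    /** Fire-and-forget: returns false instead of throwing when the queue is full. */
    boolean trySubmit(Runnable task) {
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } catch (Exception e) {
                    // Swallowed here; a real service would record an error event.
                }
            });
            submitted.incrementAndGet();
            return true;
        } catch (RejectedExecutionException e) {
            dropped.incrementAndGet();                       // declined; caller is unaffected
            return false;
        }
    }

    long submittedCount() { return submitted.get(); }
    long droppedCount() { return dropped.get(); }
    void shutdownNow() { executor.shutdownNow(); }
}
```

With parallelism 1 and a queue of size 1, a third submission made while the first task is still running is declined and counted as dropped, and the submitting thread never sees an exception.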

Telemetry: delta.autoCompaction.async.{submitted,dropped,error,completed}.

Correctness: AutoCompactPartitionReserve still serializes overlapping AC runs because eligibility + reservation + optimize all execute on the same worker thread. OPTIMIZE conflict-retry (OptimizeExecutor.commitAndRetry) covers cross-JVM races as it does today. No new conflict class is introduced.

Shutdown drain and additional backpressure modes (block, fallback-inline) are intentionally reserved for follow-up PRs.

Resolves PR 1 of #7089

How was this patch tested?

Tests (AutoCompactAsyncSuite, 5 tests, all passing):

  • async disabled by default: writer thread runs AC inline
  • async enabled: AC commit lands after writer returns and worker drains
  • async enabled: submission telemetry event is recorded
  • async re-evaluates eligibility on a fresh snapshot
  • async queue full: writer succeeds and dropped event is recorded

Perf benchmark:
Config: 50 sequential writes, 8 small files per write, sync MIN_NUM_FILES=24, async MIN_NUM_FILES=0, async parallelism=2.

| Metric | Sync (inline AC) | Async (off writer thread) | Improvement |
| --- | --- | --- | --- |
| Writer total | 31 230 ms | 20 588 ms | 1.52× faster |
| Mean latency | 625 ms | 412 ms | 1.52× |
| p50 | 549 ms | 388 ms | 1.41× |
| p95 | 1 192 ms | 549 ms | 2.17× better tail |
| p99 | 1 345 ms | 580 ms | 2.32× better tail |
| Max | 1 345 ms | 580 ms | 2.32× |
| Min | 367 ms | 336 ms | |
| Async drain (post-loop) | | 40 ms | |
| End-to-end (incl. drain) | 31 230 ms | 20 628 ms | 1.51× |
| AC OPTIMIZE commits (auto=true) | 17 | 17 | identical |
| Writes with AC visible (vΔ≥2) | 17/50 | 17/50 | identical |

Does this PR introduce any user-facing changes?

AutoCompactBase.run now branches on AsyncAutoCompactService.isEnabled. The inline path is byte-identical to today when the flag is off. The async mode is disabled by default and users can opt in. It may make sense to make it the default mode in a future Delta major version.

@mwc360 mwc360 force-pushed the mcole_async_auto_compact branch from 374e1a0 to e63bd3f Compare June 24, 2026 22:01
mwc360 and others added 3 commits June 24, 2026 18:36
Introduces an opt-in mode that runs Auto Compaction on a JVM-wide daemon
thread pool instead of inline on the writer thread, eliminating the
write-latency impact of AC for streaming and high-throughput workloads.
Default behavior is unchanged.

Components:

* AsyncAutoCompactService: singleton with a bounded ThreadPoolExecutor
  (size = autoCompact.async.parallelism, queue = autoCompact.async.maxQueueSize)
  and a per-table inflight counter. Submitting a CommittedTransaction is
  fire-and-forget; the worker re-reads deltaLog.update() and re-runs
  AutoCompact.compactIfNecessary against the fresh snapshot, so a peer
  that already compacted causes the worker to skip cleanly.
  - Sets/clears SparkSession.{active,default} on the worker thread so
    downstream code paths that consult SparkSession.getActiveSession
    (e.g. OptimizeExecutor, DeltaUDF) see the right session.
  - Wraps execution in NonFatal try/catch and records a telemetry event;
    writer threads are never observably affected by async failures.
  - Backpressure: queue-full submissions are recorded as
    delta.autoCompaction.async.dropped and silently declined.

* AutoCompactBase.run: branches on AsyncAutoCompactService.isEnabled.
  Inline path is byte-identical to today when the flag is off.

* Per-table dedup: at most one running + one queued task per tableId.
  Redundant submissions are recorded as
  delta.autoCompaction.async.coalesced.

* New configs (DeltaSQLConf):
  - autoCompact.async.enabled (default false; opt-in)
  - autoCompact.async.parallelism (default 2)
  - autoCompact.async.maxQueueSize (default 64)
  - autoCompact.async.backpressure ("drop", reserved for future policies)

* Telemetry events: delta.autoCompaction.async.{submitted, coalesced,
  dropped, completed, error}.
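
The per-table dedup rule above (at most one running plus one queued task per tableId) can be approximated with a capped per-table counter. This is a sketch under stated assumptions: PerTableCoalescer and its method names are hypothetical, and a single counter capped at 2 does not distinguish "running" from "queued" the way the real service may.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of per-table coalescing; not the PR's implementation. */
final class PerTableCoalescer {
    // One running + one queued task per table (assumed to be modelled as a cap of 2).
    private static final int MAX_PENDING_PER_TABLE = 2;
    private final ConcurrentHashMap<String, Integer> pending = new ConcurrentHashMap<>();

    /** Returns true if the submission is admitted, false if it is coalesced away. */
    boolean tryAdmit(String tableId) {
        boolean[] admitted = {false};
        // compute() runs atomically per key, so check-and-increment cannot race.
        pending.compute(tableId, (id, n) -> {
            int current = (n == null) ? 0 : n;
            if (current >= MAX_PENDING_PER_TABLE) {
                return n; // redundant submission; would be recorded as "coalesced"
            }
            admitted[0] = true;
            return current + 1;
        });
        return admitted[0];
    }

    /** Called by the worker when a task for the table finishes, skips, or fails. */
    void onTaskFinished(String tableId) {
        // Returning null removes the entry, so idle tables do not accumulate.
        pending.computeIfPresent(tableId, (id, n) -> n > 1 ? n - 1 : null);
    }

    int pendingFor(String tableId) {
        return pending.getOrDefault(tableId, 0);
    }
}
```

A third back-to-back submission for the same table is coalesced, while submissions for a different table are admitted independently.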

Tests: AutoCompactAsyncSuite (6 tests) covers default-off behaviour,
opt-in commit-after-write-returns, eligibility re-evaluation on a fresh
snapshot, queue-full backpressure, and per-table coalescing under load.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Async Auto Compaction introduced a possible race against in-flight user
DML (MERGE/DELETE/UPDATE/OVERWRITE): an AC OPTIMIZE that wins the commit
race causes the user's DML to throw ConcurrentModificationException (CME)
after potentially minutes of Spark work. Delta retries are cheap at the
metadata level, but MERGE's retry harness only handles
CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND, not CME, so users would see the
failure.

This change inverts the conflict: async AC yields to in-flight DML
rather than the other way around. AC is opportunistic background work;
user writes are the critical workload, so deferring AC by one cycle is
strictly preferable to surfacing an avoidable failure to a user query
that has already done expensive shuffles and DV writes.

Components:

* InflightDMLRegistry: JVM-wide singleton mapping tableId -> active DML
  counter. Supports a single acquire-listener slot per table (single is
  sufficient because the async-AC submission path's per-table dedup
  guarantees <= 1 async AC task per table at a time).

* Three coordination points in AsyncAutoCompactService:
   1. Dequeue-time check: cheap. If a DML is already active when the
      worker dequeues, skip silently and let the DML's own post-commit
      hook resubmit AC when it releases.
   2. Mid-flight cancellation: worker tags its Spark jobs with a job
      group (set with interruptOnCancel = true) and registers an
      acquire-listener. On DML acquire the listener calls
      sparkContext.cancelJobGroup(jobGroupId), which surfaces as a
      SparkException on the driver and is caught.
   3. Pre-commit gate: a check inside OptimizeExecutor.commitAndRetry
      (guarded by a ThreadLocal `asyncWorkerThread` flag so manual
      OPTIMIZE is unaffected) throws AsyncAutoCompactCancelledException
      before the first commit attempt if DML went active.

  A re-check after listener install catches the narrow race where DML
  acquires between the dequeue check and the listener installation.

* DML hook points: MergeIntoCommandBase.run, DeleteCommand.run,
  UpdateCommand.run, and WriteIntoDelta.run (overwrite only -- APPEND
  skips because appends never conflict with AC) wrap their bodies in
  InflightDMLRegistry.acquire/release try/finally.

* Telemetry + observability:
   - WARN logs on every yield path with MDC(METADATA_ID).
   - Three telemetry events:
     delta.autoCompaction.async.yieldedToDML.{atDequeue, midFlight,
     preCommit}.
   - Test-visible per-table per-kind yield counters.
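
The registry primitives and the "re-check after listener install" step can be sketched as below. Everything here is an assumption made for illustration: the class and method names are hypothetical, the listener is treated as one-shot, and a single lock replaces whatever concurrency scheme the real InflightDMLRegistry uses.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a reference-counted DML registry with one listener slot per table. */
final class InflightDmlRegistrySketch {
    private static final class Entry {
        int activeDml;            // number of DML commands currently inside acquire/release
        Runnable acquireListener; // single slot; enough if there is <= 1 async AC task per table
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Called at the start of a DML command; fires the listener (if any) outside the lock. */
    void acquire(String tableId) {
        Runnable toFire;
        synchronized (this) {
            Entry e = entries.computeIfAbsent(tableId, k -> new Entry());
            e.activeDml++;
            toFire = e.acquireListener;
            e.acquireListener = null; // one-shot (assumption)
        }
        if (toFire != null) {
            try {
                toFire.run(); // e.g. cancel the async AC job group
            } catch (RuntimeException ex) {
                // A failing listener must not break the user's DML.
            }
        }
    }

    /** Called in the DML command's finally block. */
    synchronized void release(String tableId) {
        Entry e = entries.get(tableId);
        if (e == null || e.activeDml == 0) {
            return;
        }
        e.activeDml--;
        if (e.activeDml == 0 && e.acquireListener == null) {
            entries.remove(tableId);
        }
    }

    synchronized boolean isDmlActive(String tableId) {
        Entry e = entries.get(tableId);
        return e != null && e.activeDml > 0;
    }

    /**
     * Installs the listener only if no DML is active. Checking and installing under one
     * lock closes the window between the dequeue check and listener installation.
     */
    synchronized boolean installListenerUnlessActive(String tableId, Runnable listener) {
        Entry e = entries.computeIfAbsent(tableId, k -> new Entry());
        if (e.activeDml > 0) {
            return false; // caller should yield instead of starting AC
        }
        e.acquireListener = listener;
        return true;
    }
}
```

In this sketch a worker would call installListenerUnlessActive before starting compaction and treat a false return as an at-dequeue yield. DML commands would wrap their bodies in acquire and release inside try/finally.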

Tests: AutoCompactDMLYieldSuite covers registry primitives (reference
counting, listener safety), end-to-end dequeue yield, end-to-end
pre-commit yield, manual OPTIMIZE bypassing the yield gate, MERGE
acquire/release lifecycle, and composition with the per-table async-AC
dedup.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…L yield

Streaming MERGE/UPDATE/DELETE workloads commit small batches in tight
loops. With async Auto Compaction enabled, every commit submits an AC
task to the async pool; before that task can finish, the next DML batch
arrives and the in-flight AC yields to it (at dequeue, mid-flight, or
at pre-commit). The result is wasted Spark cycles: AC is submitted,
scheduled, partially executed, and then aborted -- without making any
progress on the table.

This change adds a sticky per-table fallback: when an async AC task
yields to DML for any reason, the table is marked as "inline-fallback"
in a JVM-wide ConcurrentHashMap. Future writes to that table run AC
inline on the writer's own commit path, so the work completes (no
future writer exists yet to cancel it) and no Spark cycles are wasted
on the async-submit-then-yield loop.

Design choices:
 - Sticky forever (until JVM restart). On a streaming pipeline where
   the first yield happened, the next yield is overwhelmingly likely;
   recovery heuristics add complexity for no real benefit.
 - One yield is enough to engage the fallback. All three yield kinds
   (atDequeue, midFlight, preCommit) count.
 - Opt-out via the new conf
   spark.databricks.delta.autoCompact.async.fallbackToInline.enabled
   (default true). When false, behavior is identical to the prior
   async path -- the table keeps re-submitting and re-yielding.
 - One-shot telemetry: delta.autoCompaction.async.fallbackEngaged
   fires exactly once per table via putIfAbsent, so log volume is
   bounded regardless of write rate.
 - Per-table state: yielding table A does not put table B in inline
   mode.
 - Manual OPTIMIZE is unaffected (it does not consult the async path).
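
The sticky, per-table fallback with one-shot telemetry can be illustrated with putIfAbsent, as the design notes above describe. This is a hedged sketch: InlineFallbackSketch and its method names are hypothetical, and the counter stands in for the fallbackEngaged telemetry event.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the sticky inline fallback; not the PR's code. */
final class InlineFallbackSketch {
    private final ConcurrentHashMap<String, Boolean> inlineFallback = new ConcurrentHashMap<>();
    private final AtomicInteger fallbackEngagedEvents = new AtomicInteger();
    private final boolean fallbackEnabled; // mirrors async.fallbackToInline.enabled

    InlineFallbackSketch(boolean fallbackEnabled) {
        this.fallbackEnabled = fallbackEnabled;
    }

    /** Called whenever an async AC task yields to DML (at dequeue, mid-flight, or pre-commit). */
    void onYieldToDml(String tableId) {
        if (!fallbackEnabled) {
            return;
        }
        // putIfAbsent returns null only for the first caller, so the event fires once per table.
        if (inlineFallback.putIfAbsent(tableId, Boolean.TRUE) == null) {
            fallbackEngagedEvents.incrementAndGet();
        }
    }

    /** Writers consult this to decide between the inline and async AC paths. */
    boolean shouldRunInline(String tableId) {
        return fallbackEnabled && inlineFallback.containsKey(tableId);
    }

    int engagedEventCount() {
        return fallbackEngagedEvents.get();
    }
}
```

Because the map is keyed by table and entries are never removed, a yield on one table leaves other tables on the async path, and the marker lasts for the life of the JVM.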

Tests (AutoCompactInlineFallbackSuite):
 - After a single dequeue yield, the next AC eval on the same table
   runs inline (no async.submitted event; AC commit lands on the
   writer's own commit version).
 - fallbackEngaged telemetry fires exactly once per table.
 - Per-table isolation.
 - With the conf disabled, the next AC still submits async and
   yields.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@mwc360 mwc360 force-pushed the mcole_async_auto_compact branch from acce1cf to 13e4be6 Compare June 25, 2026 00:44