Skip to content
190 changes: 151 additions & 39 deletions pkg/vm/engine/tae/logtail/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ type Manager struct {
eventOnce sync.Once
nextCompactTS types.TS

orderedList []*txnWithLogtails
collectWg sync.WaitGroup
collectPool *ants.Pool
}

Expand All @@ -111,8 +109,17 @@ func NewManager(
}

const batSize = 100
mgr.orderedList = make([]*txnWithLogtails, batSize*2)
mgr.collectPool, _ = ants.NewPool(runtime.NumCPU())
// Re-panic from ants's internal recover so a panic inside a
// collect goroutine crashes the process instead of being silently
// swallowed. If we only logged and continued, a committed txn
// whose collect panicked would apply to storage but its logtail
// would never be published, breaking CN-side consistency. Fatal
// crash matches what logservicedriver does for unrecoverable WAL
// errors and is the safer failure mode here.
mgr.collectPool, _ = ants.NewPool(
runtime.NumCPU(),
ants.WithPanicHandler(func(v any) { panic(v) }),
)
mgr.logtailQueue = sm.NewSafeQueue(batSize*batSize, batSize, mgr.onTxnLogTails)

return mgr
Expand All @@ -124,54 +131,159 @@ type txnWithLogtails struct {
closeCB func()
}

func (mgr *Manager) onTxnLogTails(items ...any) {
for i, item := range items {
txn := item.(txnif.AsyncTxn)
if txn.IsReplay() {
// orderedCollectAndPublish collects logtails for n items in parallel via submit,
// then publishes them strictly in index order (0, 1, 2, ...).
//
// - skip(i) returning true means item i is excluded (no collect, no publish).
// - collect(i) is invoked concurrently in a goroutine scheduled by submit.
// Returning nil means the item was collected but should not be published
// (e.g. the txn rolled back).
// - publish(v) is invoked serially by the caller's goroutine, for each
// collect result that is not nil, in ascending index order.
//
// The helper preserves PrepareTS ordering required by generateLogtailWithTxn
// (mgr.previousSaveTS invariant) while allowing later slots' collection to
// proceed in parallel with earlier slots' publish.
func orderedCollectAndPublish(
n int,
skip func(i int) bool,
submit func(fn func()),
collect func(i int) *txnWithLogtails,
publish func(v *txnWithLogtails),
) {
readyCh := make([]chan *txnWithLogtails, n)
for i := 0; i < n; i++ {
if skip(i) {
readyCh[i] = nil
continue
}
ch := make(chan *txnWithLogtails, 1)
readyCh[i] = ch
idx := i
submit(func() {
// The deferred send guarantees the publisher is never stuck
// on <-ch, even if collect() panics: Go runs deferred funcs
// during panic unwinding, so `ch <- v` executes with v still
// at its zero value (nil), and the publisher's nil-skip path
// drops that slot cleanly. The panic then continues to
// propagate out of this goroutine into ants's top-level
// recover, which logs the stack. Cleanup of per-txn state
// (closeCB, DoneEvent) must be handled by collect itself via
// its own defers — see onTxnLogTails below.
var v *txnWithLogtails
defer func() { ch <- v }()
v = collect(idx)
})
}

mgr.collectWg.Add(1)

mgr.collectPool.Submit(func() {
defer func() {
mgr.collectWg.Done()
}()

txn.GetStore().WaitEvent(txnif.WalPreparing)

builder := NewTxnLogtailRespBuilder(mgr.rt)
entries, closeCB := builder.CollectLogtail(txn)
for _, ch := range readyCh {
if ch == nil {
continue
}
if v := <-ch; v != nil {
publish(v)
}
}
}

// txnLogtailCollector abstracts how collectOneTxn invokes the real logtail
// builder (NewTxnLogtailRespBuilder + CollectLogtail in onTxnLogTails), so a
// stub can be injected in tests without standing up a full TAE runtime.
//
// It returns the collected table logtails for txn plus a release callback
// for the underlying batches; the caller either runs the callback itself
// (skip/rollback paths) or hands it to the publish path — presumably the
// publisher releases it after use; confirm in generateLogtailWithTxn.
type txnLogtailCollector func(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func())

// collectOneTxn runs the per-slot logic used by onTxnLogTails:
// - WaitEvent(WalPreparing) — make sure WAL marshal is done first
// - collect(txn) — materialize the logtail batches
// - DoneEvent(TailCollecting) — balance OnEndPrepareWAL's matching
// AddEvent so the owning txn's WaitWalAndTail can proceed. MUST
// fire before GetTxnState(true): apply runs WaitWalAndTail then
// DoneApply which flips the state, so if we wait on the state
// first the commit goroutine is blocked behind our event and we
// deadlock.
// - GetTxnState — only committed txns get published; rollback
// returns nil and the deferred closer releases batches
// - closeCB (deferred unless we hand it to publish) — release
// batches whenever we're not publishing
//
// A doneTail flag plus a deferred fallback guarantees DoneEvent still
// fires if collect(txn) panics before we reach the inline call. closeCB
// is only defined after collect succeeds, so its deferred release is
// registered afterwards. The panic then propagates out of this function
// into the collect goroutine, where the pool's PanicHandler re-panics
// to terminate the process so a committed-but-unpublished tail can
// never leak to subscribers.
func collectOneTxn(
txn txnif.AsyncTxn,
collect txnLogtailCollector,
) *txnWithLogtails {
txn.GetStore().WaitEvent(txnif.WalPreparing)

doneTail := false
defer func() {
if !doneTail {
txn.GetStore().DoneEvent(txnif.TailCollecting)
}
}()

txnTail := &txnWithLogtails{
txn: txn,
tails: entries,
closeCB: closeCB,
}
entries, closeCB := collect(txn)

mgr.orderedList[i] = txnTail
// Unblock apply's WaitWalAndTail before waiting on the txn state.
// Waiting first would deadlock: apply holds the commit state flip
// behind WaitWalAndTail, which waits on TailCollecting.
txn.GetStore().DoneEvent(txnif.TailCollecting)
doneTail = true

state := txn.GetTxnState(true)
if state != txnif.TxnStateCommitted {
if state != txnif.TxnStateRollbacked {
panic(fmt.Sprintf("wrong state %v", state))
}
return
}
})
runCloseCB := true
defer func() {
if runCloseCB {
closeCB()
}
}()

// A rolled-back txn must not be published as logtail:
// CollectLogtail walks the txn store without filtering by final
// state, so the batches captured above reflect pre-cleanup
// mutations that subscribers must never see. Release via the
// deferred closer and skip publish by returning nil.
state := txn.GetTxnState(true)
if state != txnif.TxnStateCommitted {
if state != txnif.TxnStateRollbacked {
panic(fmt.Sprintf("wrong state %v", state))
}
return nil
}

mgr.collectWg.Wait()
for i := range len(items) {
if mgr.orderedList[i] != nil {
mgr.generateLogtailWithTxn(mgr.orderedList[i])
mgr.orderedList[i] = nil
}
// Committed: hand closeCB over to the publish path.
runCloseCB = false
return &txnWithLogtails{
txn: txn,
tails: entries,
closeCB: closeCB,
}
}

// onTxnLogTails is the logtailQueue callback: it fans logtail collection
// for a batch of txns out to collectPool and publishes the results in
// slot order. A slow txn only blocks the publisher up to its slot, not
// the collection of later slots nor the publishing of earlier
// already-ready slots. generateLogtailWithTxn is still called in
// PrepareTS order. Replay txns are skipped entirely.
func (mgr *Manager) onTxnLogTails(items ...any) {
	txnAt := func(i int) txnif.AsyncTxn {
		return items[i].(txnif.AsyncTxn)
	}
	buildTail := func(txn txnif.AsyncTxn) (*[]logtail.TableLogtail, func()) {
		return NewTxnLogtailRespBuilder(mgr.rt).CollectLogtail(txn)
	}
	orderedCollectAndPublish(
		len(items),
		func(i int) bool { return txnAt(i).IsReplay() },
		func(task func()) { _ = mgr.collectPool.Submit(task) },
		func(i int) *txnWithLogtails { return collectOneTxn(txnAt(i), buildTail) },
		mgr.generateLogtailWithTxn,
	)
}

func (mgr *Manager) Stop() {
mgr.logtailQueue.Stop()
mgr.collectPool.Release()
Expand Down
Loading
Loading