diff --git a/server/leaderboard_scheduler.go b/server/leaderboard_scheduler.go index 42ae21199..9019805b6 100644 --- a/server/leaderboard_scheduler.go +++ b/server/leaderboard_scheduler.go @@ -17,7 +17,7 @@ package server import ( "context" "database/sql" - "sync" + "errors" "time" "github.com/heroiclabs/nakama-common/api" @@ -42,7 +42,6 @@ type LeaderboardScheduler interface { } type LocalLeaderboardScheduler struct { - sync.Mutex logger *zap.Logger db *sql.DB config Config @@ -53,16 +52,11 @@ type LocalLeaderboardScheduler struct { fnTournamentReset RuntimeTournamentResetFunction fnTournamentEnd RuntimeTournamentEndFunction - endActiveTimer *time.Timer - expiryTimer *time.Timer - lastEnd int64 - lastExpiry int64 - scheduledEndActive int64 - scheduledExpiry int64 - started bool - queue chan *LeaderboardSchedulerCallback active *atomic.Uint32 + queue chan *LeaderboardSchedulerCallback + + updateCh chan struct{} ctx context.Context ctxCancelFn context.CancelFunc @@ -77,15 +71,9 @@ func NewLocalLeaderboardScheduler(logger *zap.Logger, db *sql.DB, config Config, cache: cache, rankCache: rankCache, - // endActiveTimer only initialized when needed. - // expiryTimer only initialized when needed. - // lastEnd only initialized when needed. - // lastExpiry only initialized when needed. - scheduledEndActive: -1, - scheduledExpiry: -1, - - queue: make(chan *LeaderboardSchedulerCallback, config.GetLeaderboard().CallbackQueueSize), - active: atomic.NewUint32(1), + queue: make(chan *LeaderboardSchedulerCallback, config.GetLeaderboard().CallbackQueueSize), + active: atomic.NewUint32(1), + updateCh: make(chan struct{}, 1), ctx: ctx, ctxCancelFn: ctxCancelFn, @@ -113,89 +101,152 @@ func (ls *LocalLeaderboardScheduler) Start(runtime *Runtime) { ls.logger.Info("Leaderboard scheduler start") ls.started = true - // Capture callback references, if any are registered. ls.fnLeaderboardReset = runtime.LeaderboardReset() ls.fnTournamentReset = runtime.TournamentReset() ls.fnTournamentEnd = runtime.TournamentEnd() - // Start the required number of callback workers. for i := 0; i < ls.config.GetLeaderboard().CallbackQueueWorkers; i++ { go ls.invokeCallback() } + go ls.scheduleLoop() + ls.Update() } func (ls *LocalLeaderboardScheduler) Pause() { ls.logger.Info("Leaderboard scheduler pause") - if !ls.active.CompareAndSwap(1, 0) { - // Already paused. return } - - ls.Lock() - if ls.endActiveTimer != nil { - ls.endActiveTimer.Stop() - ls.endActiveTimer = nil - } - if ls.expiryTimer != nil { - ls.expiryTimer.Stop() - ls.endActiveTimer = nil + // Wake the scheduling loop so it sees the paused state immediately. + select { + case ls.updateCh <- struct{}{}: + default: } - ls.scheduledEndActive = -1 - ls.scheduledExpiry = -1 - ls.Unlock() } func (ls *LocalLeaderboardScheduler) Resume() { ls.logger.Info("Leaderboard scheduler resume") - if !ls.active.CompareAndSwap(0, 1) { - // Already active. return } - ls.Update() } func (ls *LocalLeaderboardScheduler) Stop() { - ls.Lock() ls.ctxCancelFn() - if ls.endActiveTimer != nil { - ls.endActiveTimer.Stop() - ls.endActiveTimer = nil - } - if ls.expiryTimer != nil { - ls.expiryTimer.Stop() - ls.endActiveTimer = nil - } - ls.scheduledEndActive = -1 - ls.scheduledExpiry = -1 - ls.Unlock() } +// Update signals the scheduling loop to recompute its next wake time. func (ls *LocalLeaderboardScheduler) Update() { if !ls.started { - // In case the update is called during runtime VM init, skip setting timers until ready. return } - - if ls.active.Load() != 1 { - // Not active. - return + select { + case ls.updateCh <- struct{}{}: + default: } +} - now := time.Now().UTC() - nowUnix := now.Unix() +func (ls *LocalLeaderboardScheduler) scheduleLoop() { + var lastFireUnix int64 + for { + // While paused, block until a signal arrives. + if ls.active.Load() == 0 { + select { + case <-ls.ctx.Done(): + return + case <-ls.updateCh: + } + continue + } + + now := time.Now().UTC() + endActiveTs, expiryTs, endActiveIds, expiryIds := ls.computeNext(now) + + // Filter out any events already processed in a previous iteration to + // guard against re-queuing hooks within the same second as the last fire. + if endActiveTs > 0 && endActiveTs <= lastFireUnix { + endActiveTs = -1 + endActiveIds = nil + } + if expiryTs > 0 && expiryTs <= lastFireUnix { + expiryTs = -1 + expiryIds = nil + } + + // Pick the earlier of the two deadlines as the next wake time. + wakeTs := int64(-1) + if endActiveTs > 0 { + wakeTs = endActiveTs + } + if expiryTs > 0 && (wakeTs < 0 || expiryTs < wakeTs) { + wakeTs = expiryTs + } + + ls.logger.Info("Leaderboard scheduler update", + zap.Int64("end_active_ts", endActiveTs), + zap.Int("end_active_count", len(endActiveIds)), + zap.Int64("expiry_ts", expiryTs), + zap.Int("expiry_count", len(expiryIds)), + ) + + if wakeTs < 0 { + // Nothing to schedule, block until Update() signals or context is cancelled. + select { + case <-ls.ctx.Done(): + return + case <-ls.updateCh: + continue + } + } + + delay := time.Unix(wakeTs, 0).UTC().Sub(now) + if delay < 0 { + delay = 0 + } + + timer := time.NewTimer(delay) + select { + case <-ls.ctx.Done(): + timer.Stop() + return + + case <-ls.updateCh: + // Update() called while waiting for next hook execution, recompute schedules. + select { + case t := <-timer.C: + // Timer fired at the same time as an Update() call, process hooks. + lastFireUnix = t.Unix() + ls.processHooks(t, endActiveTs, expiryTs, endActiveIds, expiryIds) + default: + // Update() was called, stop timer and recalculate new wake time. + timer.Stop() + } + case t := <-timer.C: + lastFireUnix = t.Unix() + ls.processHooks(t, endActiveTs, expiryTs, endActiveIds, expiryIds) + } + } +} - earliestEndActive := int64(-1) - earliestExpiry := int64(-1) +func (ls *LocalLeaderboardScheduler) processHooks(ts time.Time, endActiveTs, expiryTs int64, endActiveIds, expiryIds []string) { + fireUnix := ts.Unix() + if endActiveTs > 0 && endActiveTs <= fireUnix { + ls.processEndActive(time.Unix(endActiveTs, 0).UTC(), endActiveIds) + } + if expiryTs > 0 && expiryTs <= fireUnix { + ls.rankCache.TrimExpired(expiryTs) + ls.processExpiry(time.Unix(expiryTs, 0).UTC(), expiryIds) + } +} - endActiveLeaderboardIds := make([]string, 0, 1) - expiryLeaderboardIds := make([]string, 0, 1) +func (ls *LocalLeaderboardScheduler) computeNext(now time.Time) (endActiveTs, expiryTs int64, endActiveIds []string, expiryIds []string) { + endActiveTs = -1 + expiryTs = -1 + nowUnix := now.Unix() - // Grab the set of known leaderboards in batches, and process them looking for expiry and end active times. var cursor *LeaderboardAllCursor for { var leaderboards []*Leaderboard @@ -203,7 +254,6 @@ func (ls *LocalLeaderboardScheduler) Update() { for _, l := range leaderboards { if l.IsTournament() { - // Tournament. _, endActive, expiry := calculateTournamentDeadlines(l.StartTime, l.EndTime, int64(l.Duration), l.ResetSchedule, now) if l.EndTime > 0 && l.EndTime < nowUnix { @@ -211,36 +261,32 @@ func (ls *LocalLeaderboardScheduler) Update() { continue } - // Check tournament end. if endActive > 0 && nowUnix < endActive { - if earliestEndActive == -1 || endActive < earliestEndActive { - earliestEndActive = endActive - endActiveLeaderboardIds = []string{l.Id} - } else if endActive == earliestEndActive { - endActiveLeaderboardIds = append(endActiveLeaderboardIds, l.Id) + if endActiveTs < 0 || endActive < endActiveTs { + endActiveTs = endActive + endActiveIds = []string{l.Id} + } else if endActive == endActiveTs { + endActiveIds = append(endActiveIds, l.Id) } } - // Check tournament expiry. if expiry > 0 { - if earliestExpiry == -1 || expiry < earliestExpiry { - earliestExpiry = expiry - expiryLeaderboardIds = []string{l.Id} - } else if expiry == earliestExpiry { - expiryLeaderboardIds = append(expiryLeaderboardIds, l.Id) + if expiryTs < 0 || expiry < expiryTs { + expiryTs = expiry + expiryIds = []string{l.Id} + } else if expiry == expiryTs { + expiryIds = append(expiryIds, l.Id) } } } else { - // Leaderboard. + // Leaderboards don't end, only check for expiry. if l.ResetSchedule != nil { - // Leaderboards don't end, only check for expiry. expiry := l.ResetSchedule.Next(now).UTC().Unix() - - if earliestExpiry == -1 || expiry < earliestExpiry { - earliestExpiry = expiry - expiryLeaderboardIds = []string{l.Id} - } else if expiry == earliestExpiry { - expiryLeaderboardIds = append(expiryLeaderboardIds, l.Id) + if expiryTs < 0 || expiry < expiryTs { + expiryTs = expiry + expiryIds = []string{l.Id} + } else if expiry == expiryTs { + expiryIds = append(expiryIds, l.Id) } } } @@ -250,133 +296,40 @@ func (ls *LocalLeaderboardScheduler) Update() { break } } - - endActiveDuration := time.Duration(-1) - if earliestEndActive > -1 { - endActiveDuration = time.Unix(earliestEndActive, 0).UTC().Sub(now) - } - - expiryDuration := time.Duration(-1) - if earliestExpiry > -1 { - expiryDuration = time.Unix(earliestExpiry, 0).UTC().Sub(now) - } - - // Replace IDs earmarked for end and expiry, and restart timers as needed. - // Only replace a timer when its target cycle changes — if the computed target is the - // same Unix second as the currently scheduled timer, leave it running. This prevents - // queueEndActiveElapse's Update() call (which fires while the expiry timer for the - // same second is still pending) from inadvertently cancelling that expiry timer. - ls.Lock() - if endActiveDuration > -1 { - if earliestEndActive != ls.scheduledEndActive { - if ls.endActiveTimer != nil { - ls.endActiveTimer.Stop() - ls.endActiveTimer = nil - } - ls.scheduledEndActive = earliestEndActive - ls.logger.Debug("Setting timer to run end active function", zap.Duration("end_active", endActiveDuration), zap.Strings("ids", endActiveLeaderboardIds)) - ls.endActiveTimer = time.AfterFunc(endActiveDuration, func() { - ls.queueEndActiveElapse(time.Unix(earliestEndActive, 0).UTC(), endActiveLeaderboardIds) - }) - } - } else { - if ls.endActiveTimer != nil { - ls.endActiveTimer.Stop() - ls.endActiveTimer = nil - } - ls.scheduledEndActive = -1 - } - if expiryDuration > -1 { - if earliestExpiry != ls.scheduledExpiry { - if ls.expiryTimer != nil { - ls.expiryTimer.Stop() - ls.expiryTimer = nil - } - ls.scheduledExpiry = earliestExpiry - ls.logger.Debug("Setting timer to run expiry function", zap.Duration("expiry", expiryDuration), zap.Strings("ids", expiryLeaderboardIds)) - ls.expiryTimer = time.AfterFunc(expiryDuration, func() { - ls.queueExpiryElapse(time.Unix(earliestExpiry, 0).UTC(), expiryLeaderboardIds) - }) - } - } else { - if ls.expiryTimer != nil { - ls.expiryTimer.Stop() - ls.expiryTimer = nil - } - ls.scheduledExpiry = -1 - } - ls.Unlock() - - ls.logger.Info("Leaderboard scheduler update", zap.Duration("end_active", endActiveDuration), zap.Int("end_active_count", len(endActiveLeaderboardIds)), zap.Duration("expiry", expiryDuration), zap.Int("expiry_count", len(expiryLeaderboardIds))) + return } -func (ls *LocalLeaderboardScheduler) queueEndActiveElapse(t time.Time, ids []string) { - if ls.active.Load() != 1 { - // Not active. - return - } - - ts := t.Unix() - tMinusOne := time.Unix(ts-1, 0).UTC() // Subtract 1s so that the calculated deadline is for the current cycle, not the next. - - // Immediately schedule the next invocation to avoid any gaps caused by time spent processing below. - ls.Update() - - // Skip processing if there is no tournament end callback registered. +func (ls *LocalLeaderboardScheduler) processEndActive(t time.Time, ids []string) { if ls.fnTournamentEnd == nil { return } - ls.Lock() - if ls.lastEnd != 0 && ls.lastEnd >= ts { - // Avoid running duplicate or delayed scheduling. - ls.Unlock() - return - } - ls.lastEnd = ts - ls.Unlock() + ts := t.Unix() + tMinusOne := time.Unix(ts-1, 0).UTC() ls.logger.Info("Leaderboard scheduler end active", zap.Int("count", len(ids))) go func() { - // Process the current set of tournament ends. for _, id := range ids { - currentId := id - // Will block if the queue is full. - ls.queue <- &LeaderboardSchedulerCallback{id: currentId, ts: ts, t: tMinusOne} + select { + case ls.queue <- &LeaderboardSchedulerCallback{id: id, ts: ts, t: tMinusOne}: + case <-ls.ctx.Done(): + return + } } }() } -func (ls *LocalLeaderboardScheduler) queueExpiryElapse(t time.Time, ids []string) { - if ls.active.Load() != 1 { - // Not active. - return - } - +func (ls *LocalLeaderboardScheduler) processExpiry(t time.Time, ids []string) { ts := t.Unix() tMinusOne := time.Unix(ts-1, 0).UTC() - // Immediately schedule the next invocation to avoid any gaps caused by time spent processing below. - ls.rankCache.TrimExpired(ts) - ls.Update() - - ls.Lock() - if ls.lastExpiry != 0 && ls.lastExpiry >= ts { - // Avoid running duplicate or delayed scheduling. - ls.Unlock() - return - } - ls.lastExpiry = ts - ls.Unlock() - ls.logger.Info("Leaderboard scheduler expiry reset", zap.Int("count", len(ids))) go func() { // Queue the current set of leaderboard and tournament resets. // Executes inside a goroutine to ensure further invocation timings are not skewed. for _, id := range ids { - currentId := id leaderboard := ls.cache.Get(id) if leaderboard == nil { // Cached entry was deleted before it reached the scheduler here. @@ -387,8 +340,11 @@ func (ls *LocalLeaderboardScheduler) queueExpiryElapse(t time.Time, ids []string // Tournaments have some processing to do even if no callback is registered. continue } - // Will block if queue is full. - ls.queue <- &LeaderboardSchedulerCallback{id: currentId, leaderboard: leaderboard, ts: ts, t: tMinusOne} + select { + case ls.queue <- &LeaderboardSchedulerCallback{id: id, leaderboard: leaderboard, ts: ts, t: tMinusOne}: + case <-ls.ctx.Done(): + return + } } }() } @@ -401,7 +357,7 @@ func (ls *LocalLeaderboardScheduler) invokeCallback() { case callback := <-ls.queue: if callback.leaderboard != nil { if callback.leaderboard.IsTournament() { - // Tournament, fetch most up to date info for size etc. + // Tournament, fetch most up-to-date info for size etc. // Some processing is needed even if there is no runtime callback registered for tournament reset. query := `SELECT id, sort_order, operator, reset_schedule, metadata, create_time, @@ -451,7 +407,7 @@ WHERE id = $1` row := ls.db.QueryRowContext(ls.ctx, query, callback.id) tournament, err := parseTournament(row, callback.t) if err != nil { - if err != sql.ErrNoRows { + if !errors.Is(err, sql.ErrNoRows) { // Do not log if tournament was deleted before it reached the scheduler here. ls.logger.Error("Error retrieving tournament to invoke end callback", zap.Error(err), zap.String("id", callback.id)) } diff --git a/server/leaderboard_scheduler_test.go b/server/leaderboard_scheduler_test.go index 8bb26a76b..8919cb461 100644 --- a/server/leaderboard_scheduler_test.go +++ b/server/leaderboard_scheduler_test.go @@ -79,12 +79,6 @@ func TestLeaderboardScheduler(t *testing.T) { } } - // Truncate endActiveDuration to whole seconds, removing sub-second precision. Both timers - // target the same Unix second T, so this makes endActiveTimer fire up to ~999ms earlier than - // expiryTimer — always within the same second — giving queueEndActiveElapse's Update() a - // window to Stop() the expiry timer before it fires, reproducing the race. - // leaderboardScheduler.(*LocalLeaderboardScheduler).testTruncateEndActiveDuration = true - leaderboardScheduler.Start(rt) now := time.Now()