From d7710ffc3f31fda41127d0e43739e4241937e7b4 Mon Sep 17 00:00:00 2001 From: Tre Dubrava Date: Thu, 16 Apr 2026 16:28:26 -0500 Subject: [PATCH] fix(platform-290): identity-access-token cleanup --- ...00_add-identity-access-token-idle-index.ts | 42 +++++++++ backend/src/keystore/keystore.ts | 5 +- backend/src/queue/queue-service.ts | 6 ++ backend/src/server/routes/index.ts | 3 +- .../identity-access-token-dal.ts | 88 ++++++++++++++++++- .../resource-cleanup-queue.ts | 56 +++++++++++- 6 files changed, 194 insertions(+), 6 deletions(-) create mode 100644 backend/src/db/migrations/20260416180000_add-identity-access-token-idle-index.ts diff --git a/backend/src/db/migrations/20260416180000_add-identity-access-token-idle-index.ts b/backend/src/db/migrations/20260416180000_add-identity-access-token-idle-index.ts new file mode 100644 index 00000000000..a393ddd5ee0 --- /dev/null +++ b/backend/src/db/migrations/20260416180000_add-identity-access-token-idle-index.ts @@ -0,0 +1,42 @@ +import { Knex } from "knex"; + +import { TableName } from "../schemas"; + +const MIGRATION_TIMEOUT = 4 * 60 * 60 * 1000; // 4 hours + +export async function up(knex: Knex): Promise { + const result = await knex.raw("SHOW statement_timeout"); + const originalTimeout = result.rows[0].statement_timeout; + + try { + await knex.raw(`SET statement_timeout = ${MIGRATION_TIMEOUT}`); + + if ( + (await knex.schema.hasTable(TableName.IdentityAccessToken)) && + (await knex.schema.hasColumn(TableName.IdentityAccessToken, "accessTokenLastUsedAt")) + ) { + // No AT TIME ZONE 'UTC' cast here — COALESCE(timestamptz, timestamptz) + // returns timestamptz, which is already immutable for index purposes. + // The existing expiration index applies AT TIME ZONE to convert to + // timestamp (no-tz) before arithmetic; we skip that cast so the query + // predicate can compare timestamptz directly without a matching cast. 
+ await knex.raw(` + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_identity_access_tokens_idle + ON ${TableName.IdentityAccessToken} ( + (COALESCE("accessTokenLastUsedAt", "createdAt")) + ) + `); + } + } finally { + await knex.raw(`SET statement_timeout = '${originalTimeout}'`); + } +} + +export async function down(knex: Knex): Promise { + await knex.raw(` + DROP INDEX IF EXISTS idx_identity_access_tokens_idle + `); +} + +const config = { transaction: false }; +export { config }; diff --git a/backend/src/keystore/keystore.ts b/backend/src/keystore/keystore.ts index a640b1dddea..e2f889cb28a 100644 --- a/backend/src/keystore/keystore.ts +++ b/backend/src/keystore/keystore.ts @@ -110,7 +110,10 @@ export const KeyStorePrefixes = { CertDashboardStats: (projectId: string) => `cert-dashboard-stats:${projectId}` as const, CertActivityTrend: (projectId: string, range: string) => `cert-activity-trend:${projectId}:${range}` as const, RefreshTokenGrace: (sessionId: string) => `refresh-token-grace:${sessionId}` as const, - InsightsCache: (projectId: string, endpoint: string) => `insights-cache:${projectId}:${endpoint}` as const + InsightsCache: (projectId: string, endpoint: string) => `insights-cache:${projectId}:${endpoint}` as const, + + FrequentResourceCleanUpLock: "frequent-resource-cleanup-lock" as const, + WeeklyResourceCleanUpLock: "weekly-resource-cleanup-lock" as const }; export const KeyStoreTtls = { diff --git a/backend/src/queue/queue-service.ts b/backend/src/queue/queue-service.ts index 50eb951dcc5..f6c4f632197 100644 --- a/backend/src/queue/queue-service.ts +++ b/backend/src/queue/queue-service.ts @@ -61,6 +61,7 @@ export enum QueueName { AuditLogPrune = "audit-log-prune", DailyResourceCleanUp = "daily-resource-cleanup", FrequentResourceCleanUp = "frequent-resource-cleanup", + WeeklyResourceCleanUp = "weekly-resource-cleanup", DailyExpiringPkiItemAlert = "daily-expiring-pki-item-alert", DailyPkiAlertV2Processing = "daily-pki-alert-v2-processing", PkiAlertV2Event = 
"pki-alert-v2-event", @@ -116,6 +117,7 @@ export enum QueueJobs { AuditLogPrune = "audit-log-prune-job", DailyResourceCleanUp = "daily-resource-cleanup-job", FrequentResourceCleanUp = "frequent-resource-cleanup-job", + WeeklyResourceCleanUp = "weekly-resource-cleanup-job", DailyExpiringPkiItemAlert = "daily-expiring-pki-item-alert", DailyPkiAlertV2Processing = "daily-pki-alert-v2-processing", PkiAlertV2ProcessEvent = "pki-alert-v2-process-event", @@ -499,6 +501,10 @@ export type TQueueJobTypes = { name: QueueJobs.FrequentResourceCleanUp; payload: undefined; }; + [QueueName.WeeklyResourceCleanUp]: { + name: QueueJobs.WeeklyResourceCleanUp; + payload: undefined; + }; [QueueName.PkiDiscoveryScan]: | { name: QueueJobs.PkiDiscoveryRunScan; diff --git a/backend/src/server/routes/index.ts b/backend/src/server/routes/index.ts index 7b7eb0dc3a4..75aa1670463 100644 --- a/backend/src/server/routes/index.ts +++ b/backend/src/server/routes/index.ts @@ -2204,7 +2204,8 @@ export const registerRoutes = async ( approvalRequestDAL, approvalRequestGrantsDAL, certificateRequestDAL, - scepTransactionDAL + scepTransactionDAL, + keyStore }); const healthAlert = healthAlertServiceFactory({ diff --git a/backend/src/services/identity-access-token/identity-access-token-dal.ts b/backend/src/services/identity-access-token/identity-access-token-dal.ts index 39ba33f3d53..7fa1afa92ab 100644 --- a/backend/src/services/identity-access-token/identity-access-token-dal.ts +++ b/backend/src/services/identity-access-token/identity-access-token-dal.ts @@ -143,5 +143,91 @@ export const identityAccessTokenDALFactory = (db: TDbClient) => { ); }; - return { ...identityAccessTokenOrm, findOne, removeExpiredTokens }; + // Deletes tokens that have been idle for longer than IDLE_THRESHOLD_DAYS. + // "Idle" is COALESCE(accessTokenLastUsedAt, createdAt) — i.e. tokens that + // have never been used fall back to their creation time. 
Known edge case: + // a token that was used but whose accessTokenQueue update job failed + // (removeOnFail: true) will keep accessTokenLastUsedAt = NULL and may be + // deleted early. Equally, a token that is only ever renewed (never used to + // auth) stays NULL here — accessTokenLastRenewedAt is not considered. Both + // cases are rare at a 30-day threshold and the worst outcome is a forced + // re-auth. + const removeIdleTokens = async (tx?: Knex) => { + logger.info(`${QueueName.WeeklyResourceCleanUp}: remove idle access tokens started`); + + const BATCH_SIZE = 5000; + const MAX_RETRY_ON_FAILURE = 3; + const QUERY_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutes + const IDLE_THRESHOLD_DAYS = 30; + + const dbConnection = tx || db; + const nowResult = await dbConnection.raw<{ rows: Array<{ now: Date }> }>(`SELECT NOW() AT TIME ZONE 'UTC' as now`); + const { now } = nowResult.rows[0]; + + let deletedTokenIds: { id: string }[] = []; + let numberOfRetryOnFailure = 0; + let isRetrying = false; + let totalDeletedCount = 0; + + // No AT TIME ZONE 'UTC' cast — COALESCE(timestamptz, timestamptz) is + // immutable and the index expression matches this predicate as-is. The + // expiration index uses AT TIME ZONE to produce a timestamp (no-tz) before + // interval arithmetic; here we stay in timestamptz throughout so the cast + // is unnecessary and omitting it keeps the index expression consistent. 
+ const getIdleTokensQuery = (dbClient: Knex | Knex.Transaction, nowTimestamp: Date) => + dbClient(TableName.IdentityAccessToken) + .whereRaw( + `COALESCE( + "${TableName.IdentityAccessToken}"."accessTokenLastUsedAt", + "${TableName.IdentityAccessToken}"."createdAt" + ) < ?::timestamptz - make_interval(days => ?)`, + [nowTimestamp, IDLE_THRESHOLD_DAYS] + ) + .select("id"); + + do { + try { + const deleteBatch = async (dbClient: Knex | Knex.Transaction) => { + await dbClient.raw(`SET LOCAL random_page_cost = 1.1`); + const idsToDeleteQuery = getIdleTokensQuery(dbClient, now).limit(BATCH_SIZE); + return dbClient(TableName.IdentityAccessToken).whereIn("id", idsToDeleteQuery).del().returning("id"); + }; + + if (tx) { + // eslint-disable-next-line no-await-in-loop + deletedTokenIds = await deleteBatch(tx); + } else { + // eslint-disable-next-line no-await-in-loop + deletedTokenIds = await db.transaction(async (trx) => { + await trx.raw(`SET LOCAL statement_timeout = ${QUERY_TIMEOUT_MS}`); + return deleteBatch(trx); + }); + } + + numberOfRetryOnFailure = 0; + totalDeletedCount += deletedTokenIds.length; + } catch (error) { + numberOfRetryOnFailure += 1; + logger.error(error, "Failed to delete a batch of idle identity access tokens on pruning"); + } finally { + // eslint-disable-next-line no-await-in-loop + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + } + isRetrying = numberOfRetryOnFailure > 0; + } while (deletedTokenIds.length > 0 || (isRetrying && numberOfRetryOnFailure < MAX_RETRY_ON_FAILURE)); + + if (numberOfRetryOnFailure >= MAX_RETRY_ON_FAILURE) { + logger.error( + `IdentityAccessTokenIdlePrune: Pruning failed and stopped after ${MAX_RETRY_ON_FAILURE} consecutive retries.` + ); + } + + logger.info( + `${QueueName.WeeklyResourceCleanUp}: remove idle access tokens completed. 
Deleted ${totalDeletedCount} tokens.` + ); + }; + + return { ...identityAccessTokenOrm, findOne, removeExpiredTokens, removeIdleTokens }; }; diff --git a/backend/src/services/resource-cleanup/resource-cleanup-queue.ts b/backend/src/services/resource-cleanup/resource-cleanup-queue.ts index c545a18fa11..23cd20194d3 100644 --- a/backend/src/services/resource-cleanup/resource-cleanup-queue.ts +++ b/backend/src/services/resource-cleanup/resource-cleanup-queue.ts @@ -4,6 +4,7 @@ import { TScepTransactionDALFactory } from "@app/ee/services/pki-scep/pki-scep-t import { TScimServiceFactory } from "@app/ee/services/scim/scim-types"; import { TSnapshotDALFactory } from "@app/ee/services/secret-snapshot/snapshot-dal"; import { TKeyValueStoreDALFactory } from "@app/keystore/key-value-store-dal"; +import { KeyStorePrefixes, TKeyStoreFactory } from "@app/keystore/keystore"; import { getConfig } from "@app/lib/config/env"; import { logger } from "@app/lib/logger"; import { JOB_SCHEDULER_PREFIX, QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue"; @@ -23,7 +24,8 @@ import { TServiceTokenServiceFactory } from "../service-token/service-token-serv type TDailyResourceCleanUpQueueServiceFactoryDep = { auditLogDAL: Pick; auditLogService: Pick; - identityAccessTokenDAL: Pick; + identityAccessTokenDAL: Pick; + keyStore: Pick; identityUniversalAuthClientSecretDAL: Pick; secretVersionDAL: Pick; secretVersionV2DAL: Pick; @@ -63,7 +65,8 @@ export const dailyResourceCleanUpQueueServiceFactory = ({ approvalRequestDAL, approvalRequestGrantsDAL, certificateRequestDAL, - scepTransactionDAL + scepTransactionDAL, + keyStore }: TDailyResourceCleanUpQueueServiceFactoryDep) => { const appCfg = getConfig(); @@ -113,8 +116,21 @@ export const dailyResourceCleanUpQueueServiceFactory = ({ { name: QueueJobs.DailyResourceCleanUp } ); - // Hourly cleanup routine + const CLEANUP_LOCK_TTL_MS = 3 * 60 * 60 * 1000; // 3 hours + + // Hourly cleanup routine. 
A distributed Redis lock prevents overlapping + // runs across instances — when a previous run exceeds the cron interval, + // the next tick skips instead of compounding DB load. queueService.start(QueueName.FrequentResourceCleanUp, async () => { + let lock: Awaited> | undefined; + try { + lock = await keyStore.acquireLock([KeyStorePrefixes.FrequentResourceCleanUpLock], CLEANUP_LOCK_TTL_MS, { + retryCount: 0 + }); + } catch { + logger.info(`${QueueName.FrequentResourceCleanUp}: another instance holds the lock, skipping this run`); + return; + } try { logger.info(`${QueueName.FrequentResourceCleanUp}: queue task started`); await identityAccessTokenDAL.removeExpiredTokens(); @@ -122,6 +138,8 @@ export const dailyResourceCleanUpQueueServiceFactory = ({ } catch (error) { logger.error(error, `${QueueName.FrequentResourceCleanUp}: resource cleanup failed`); throw error; + } finally { + await lock.release().catch((err) => logger.warn(err, `${QueueName.FrequentResourceCleanUp}: failed to release lock`)); } }); @@ -131,6 +149,38 @@ export const dailyResourceCleanUpQueueServiceFactory = ({ { pattern: appCfg.isDailyResourceCleanUpDevelopmentMode ? "*/5 * * * *" : "0 * * * *" }, { name: QueueJobs.FrequentResourceCleanUp } ); + + // Weekly cleanup routine. Drains idle access tokens that the hourly job's + // TTL/revoked/uses-exhausted predicates cannot reach. Separate lock from + // the hourly so a long-running hourly run does not starve the weekly job. 
+ queueService.start(QueueName.WeeklyResourceCleanUp, async () => { + let lock: Awaited> | undefined; + try { + lock = await keyStore.acquireLock([KeyStorePrefixes.WeeklyResourceCleanUpLock], CLEANUP_LOCK_TTL_MS, { + retryCount: 0 + }); + } catch { + logger.info(`${QueueName.WeeklyResourceCleanUp}: another instance holds the lock, skipping this run`); + return; + } + try { + logger.info(`${QueueName.WeeklyResourceCleanUp}: queue task started`); + await identityAccessTokenDAL.removeIdleTokens(); + logger.info(`${QueueName.WeeklyResourceCleanUp}: queue task completed`); + } catch (error) { + logger.error(error, `${QueueName.WeeklyResourceCleanUp}: resource cleanup failed`); + throw error; + } finally { + await lock.release().catch((err) => logger.warn(err, `${QueueName.WeeklyResourceCleanUp}: failed to release lock`)); + } + }); + + await queueService.upsertJobScheduler( + QueueName.WeeklyResourceCleanUp, + `${JOB_SCHEDULER_PREFIX}:${QueueJobs.WeeklyResourceCleanUp}`, + { pattern: appCfg.isDailyResourceCleanUpDevelopmentMode ? "*/5 * * * *" : "0 3 * * 0" }, + { name: QueueJobs.WeeklyResourceCleanUp } + ); }; return {