fix(platfor-290): identity-access-token cleanup #6057
backend/src/db/migrations/20260416180000_add-identity-access-token-idle-index.ts (new file, @@ -0,0 +1,42 @@):

```ts
import { Knex } from "knex";

import { TableName } from "../schemas";

const MIGRATION_TIMEOUT = 4 * 60 * 60 * 1000; // 4 hours

export async function up(knex: Knex): Promise<void> {
  const result = await knex.raw("SHOW statement_timeout");
  const originalTimeout = result.rows[0].statement_timeout;

  try {
    await knex.raw(`SET statement_timeout = ${MIGRATION_TIMEOUT}`);

    if (
      (await knex.schema.hasTable(TableName.IdentityAccessToken)) &&
      (await knex.schema.hasColumn(TableName.IdentityAccessToken, "accessTokenLastUsedAt"))
    ) {
      // No AT TIME ZONE 'UTC' cast here — COALESCE(timestamptz, timestamptz)
      // returns timestamptz, which is already immutable for index purposes.
      // The existing expiration index applies AT TIME ZONE to convert to
      // timestamp (no-tz) before arithmetic; we skip that cast so the query
      // predicate can compare timestamptz directly without a matching cast.
      await knex.raw(`
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_identity_access_tokens_idle
        ON ${TableName.IdentityAccessToken} (
          (COALESCE("accessTokenLastUsedAt", "createdAt"))
        )
      `);
    }
  } finally {
    await knex.raw(`SET statement_timeout = '${originalTimeout}'`);
  }
}

export async function down(knex: Knex): Promise<void> {
  await knex.raw(`
    DROP INDEX IF EXISTS idx_identity_access_tokens_idle
  `);
}

const config = { transaction: false };
export { config };
```
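One way to sanity-check the new index is to EXPLAIN the exact predicate the cleanup query will use. The sketch below is illustrative only: `explainIdlePredicate` is a hypothetical helper, not part of this PR, and it assumes the migration above has already run against a table with enough qualifying rows for the planner to prefer the index.

```ts
import { Knex } from "knex";

import { TableName } from "../schemas";

// Hypothetical check: the WHERE expression is written exactly like the index
// expression (COALESCE of the two timestamptz columns, no AT TIME ZONE cast),
// so the plan should show idx_identity_access_tokens_idle being scanned.
export const explainIdlePredicate = async (knex: Knex, idleThresholdDays = 30) => {
  const plan = await knex.raw(
    `EXPLAIN
     SELECT "id"
     FROM ${TableName.IdentityAccessToken}
     WHERE COALESCE("accessTokenLastUsedAt", "createdAt") < NOW() - make_interval(days => ?)
     LIMIT 5000`,
    [idleThresholdDays]
  );
  return plan.rows as Array<{ "QUERY PLAN": string }>;
};
```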
backend/src/services/identity-access-token/identity-access-token-dal.ts (@@ -143,5 +143,91 @@): adds `removeIdleTokens` and includes it in the factory's returned object.

```ts
  // Deletes tokens that have been idle for longer than IDLE_THRESHOLD_DAYS.
  // "Idle" is COALESCE(accessTokenLastUsedAt, createdAt) — i.e. tokens that
  // have never been used fall back to their creation time. Known edge case:
  // a token that was used but whose accessTokenQueue update job failed
  // (removeOnFail: true) will keep accessTokenLastUsedAt = NULL and may be
  // deleted early. Equally, a token that is only ever renewed (never used to
  // auth) stays NULL here — accessTokenLastRenewedAt is not considered. Both
  // cases are rare at a 30-day threshold and the worst outcome is a forced
  // re-auth.
  const removeIdleTokens = async (tx?: Knex) => {
    logger.info(`${QueueName.WeeklyResourceCleanUp}: remove idle access tokens started`);

    const BATCH_SIZE = 5000;
    const MAX_RETRY_ON_FAILURE = 3;
    const QUERY_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutes
    const IDLE_THRESHOLD_DAYS = 30;

    const dbConnection = tx || db;
    const nowResult = await dbConnection.raw<{ rows: Array<{ now: Date }> }>(`SELECT NOW() AT TIME ZONE 'UTC' as now`);
    const { now } = nowResult.rows[0];

    let deletedTokenIds: { id: string }[] = [];
    let numberOfRetryOnFailure = 0;
    let isRetrying = false;
    let totalDeletedCount = 0;

    // No AT TIME ZONE 'UTC' cast — COALESCE(timestamptz, timestamptz) is
    // immutable and the index expression matches this predicate as-is. The
    // expiration index uses AT TIME ZONE to produce a timestamp (no-tz) before
    // interval arithmetic; here we stay in timestamptz throughout so the cast
    // is unnecessary and omitting it keeps the index expression consistent.
    const getIdleTokensQuery = (dbClient: Knex | Knex.Transaction, nowTimestamp: Date) =>
      dbClient(TableName.IdentityAccessToken)
        .whereRaw(
          `COALESCE(
            "${TableName.IdentityAccessToken}"."accessTokenLastUsedAt",
            "${TableName.IdentityAccessToken}"."createdAt"
          ) < ?::timestamptz - make_interval(days => ?)`,
          [nowTimestamp, IDLE_THRESHOLD_DAYS]
        )
        .select("id");

    do {
      try {
        const deleteBatch = async (dbClient: Knex | Knex.Transaction) => {
          await dbClient.raw(`SET LOCAL random_page_cost = 1.1`);
          const idsToDeleteQuery = getIdleTokensQuery(dbClient, now).limit(BATCH_SIZE);
          return dbClient(TableName.IdentityAccessToken).whereIn("id", idsToDeleteQuery).del().returning("id");
        };

        if (tx) {
          // eslint-disable-next-line no-await-in-loop
          deletedTokenIds = await deleteBatch(tx);
        } else {
          // eslint-disable-next-line no-await-in-loop
          deletedTokenIds = await db.transaction(async (trx) => {
            await trx.raw(`SET LOCAL statement_timeout = ${QUERY_TIMEOUT_MS}`);
            return deleteBatch(trx);
          });
        }

        numberOfRetryOnFailure = 0;
        totalDeletedCount += deletedTokenIds.length;
      } catch (error) {
        numberOfRetryOnFailure += 1;
        logger.error(error, "Failed to delete a batch of idle identity access tokens on pruning");
      } finally {
        // eslint-disable-next-line no-await-in-loop
        await new Promise((resolve) => {
          setTimeout(resolve, 500);
        });
      }
      isRetrying = numberOfRetryOnFailure > 0;
    } while (deletedTokenIds.length > 0 || (isRetrying && numberOfRetryOnFailure < MAX_RETRY_ON_FAILURE));
```
Review comment: If one batch deletes rows and a later batch throws (e.g., statement timeout or lock contention), `deletedTokenIds` still holds the ids from the last successful batch, so the loop condition stays true and the retry cap is never applied; see the detailed comment below.
```ts
    if (numberOfRetryOnFailure >= MAX_RETRY_ON_FAILURE) {
      logger.error(
        `IdentityAccessTokenIdlePrune: Pruning failed and stopped after ${MAX_RETRY_ON_FAILURE} consecutive retries.`
      );
    }

    logger.info(
      `${QueueName.WeeklyResourceCleanUp}: remove idle access tokens completed. Deleted ${totalDeletedCount} tokens.`
    );
  };

  return { ...identityAccessTokenOrm, findOne, removeExpiredTokens, removeIdleTokens };
};
```
Review comment on lines +215 to +232:

🔴 The do-while loop in `removeIdleTokens` can keep running forever once a batch fails after an earlier batch has succeeded.

Extended reasoning:

What the bug is and how it manifests

The loop's exit condition is

```ts
while (deletedTokenIds.length > 0 || (isRetrying && numberOfRetryOnFailure < MAX_RETRY_ON_FAILURE));
```

The specific code path that triggers it

A batch succeeds and returns ids; a subsequent batch throws (e.g., statement timeout or lock contention). The catch block increments `numberOfRetryOnFailure` but never clears `deletedTokenIds`, so it still holds the previous batch's ids and `deletedTokenIds.length > 0` keeps the loop alive regardless of the retry count.

Why existing code does not prevent it

The MAX_RETRY_ON_FAILURE guard is only effective when `deletedTokenIds` is empty; while the left side of the `||` is true, the retry cap is never consulted.

What the impact would be

In the failure scenario the weekly cleanup job enters an infinite busy loop — one DB round-trip every 500 ms — against an already broken or overloaded database, worsening the outage and keeping the BullMQ worker occupied indefinitely. The Redis distributed lock (3-hour TTL) will also be held, preventing any other instance from picking up the job.

How to fix it

Reset `deletedTokenIds` at the top of each iteration:

```ts
do {
  deletedTokenIds = []; // add this line
  try {
    // ...
  } catch (error) {
    // ...
  }
} while (...);
```

This ensures that on any iteration where the DB throws, the length check is false and loop termination is governed solely by the retry counter. The same bug exists in the pre-existing `removeExpiredTokens` loop.
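For illustration, here is a self-contained sketch of the loop shape the reviewer is asking for, with the per-iteration result reset up front. `drainInBatches`, `runBatch`, and the option names are hypothetical and not part of the PR; the actual fix is the one-line reset shown in the comment above.

```ts
// Hypothetical helper illustrating the suggested fix: the per-iteration result
// is cleared before each attempt, so a thrown batch cannot leave stale ids
// keeping the loop condition true forever.
export const drainInBatches = async (
  runBatch: () => Promise<{ id: string }[]>,
  { maxRetries = 3, delayMs = 500 } = {}
): Promise<number> => {
  let lastBatch: { id: string }[] = [];
  let consecutiveFailures = 0;
  let total = 0;

  do {
    lastBatch = []; // reset first: a failed batch now counts as "nothing deleted"
    try {
      // eslint-disable-next-line no-await-in-loop
      lastBatch = await runBatch();
      consecutiveFailures = 0;
      total += lastBatch.length;
    } catch {
      consecutiveFailures += 1;
    } finally {
      // eslint-disable-next-line no-await-in-loop
      await new Promise((resolve) => {
        setTimeout(resolve, delayMs);
      });
    }
  } while (lastBatch.length > 0 || (consecutiveFailures > 0 && consecutiveFailures < maxRetries));

  return total;
};
```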
Resource cleanup queue service (daily/hourly/weekly cleanup jobs):

@@ -4,6 +4,7 @@

```ts
import { TScimServiceFactory } from "@app/ee/services/scim/scim-types";
import { TSnapshotDALFactory } from "@app/ee/services/secret-snapshot/snapshot-dal";
import { TKeyValueStoreDALFactory } from "@app/keystore/key-value-store-dal";
import { KeyStorePrefixes, TKeyStoreFactory } from "@app/keystore/keystore";
import { getConfig } from "@app/lib/config/env";
import { logger } from "@app/lib/logger";
import { JOB_SCHEDULER_PREFIX, QueueJobs, QueueName, TQueueServiceFactory } from "@app/queue";
```
@@ -23,7 +24,8 @@

```ts
type TDailyResourceCleanUpQueueServiceFactoryDep = {
  auditLogDAL: Pick<TAuditLogDALFactory, "pruneAuditLog">;
  auditLogService: Pick<TAuditLogServiceFactory, "checkPostgresAuditLogVolumeMigrationAlert">;
  identityAccessTokenDAL: Pick<TIdentityAccessTokenDALFactory, "removeExpiredTokens" | "removeIdleTokens">;
  keyStore: Pick<TKeyStoreFactory, "acquireLock">;
  identityUniversalAuthClientSecretDAL: Pick<TIdentityUaClientSecretDALFactory, "removeExpiredClientSecrets">;
  secretVersionDAL: Pick<TSecretVersionDALFactory, "pruneExcessVersions">;
  secretVersionV2DAL: Pick<TSecretVersionV2DALFactory, "pruneExcessVersions">;
```
@@ -63,7 +65,8 @@

```ts
  approvalRequestDAL,
  approvalRequestGrantsDAL,
  certificateRequestDAL,
  scepTransactionDAL,
  keyStore
}: TDailyResourceCleanUpQueueServiceFactoryDep) => {
  const appCfg = getConfig();
```
@@ -113,15 +116,30 @@

```ts
    { name: QueueJobs.DailyResourceCleanUp }
  );

  const CLEANUP_LOCK_TTL_MS = 3 * 60 * 60 * 1000; // 3 hours

  // Hourly cleanup routine. A distributed Redis lock prevents overlapping
  // runs across instances — when a previous run exceeds the cron interval,
  // the next tick skips instead of compounding DB load.
  queueService.start(QueueName.FrequentResourceCleanUp, async () => {
    let lock: Awaited<ReturnType<typeof keyStore.acquireLock>> | undefined;
    try {
      lock = await keyStore.acquireLock([KeyStorePrefixes.FrequentResourceCleanUpLock], CLEANUP_LOCK_TTL_MS, {
        retryCount: 0
      });
    } catch {
      logger.info(`${QueueName.FrequentResourceCleanUp}: another instance holds the lock, skipping this run`);
      return;
    }
    try {
      logger.info(`${QueueName.FrequentResourceCleanUp}: queue task started`);
      await identityAccessTokenDAL.removeExpiredTokens();
      logger.info(`${QueueName.FrequentResourceCleanUp}: queue task completed`);
    } catch (error) {
      logger.error(error, `${QueueName.FrequentResourceCleanUp}: resource cleanup failed`);
      throw error;
    } finally {
      await lock.release().catch((err) => logger.warn(err, `${QueueName.FrequentResourceCleanUp}: failed to release lock`));
    }
  });
```
@@ -131,6 +149,38 @@

```ts
    { pattern: appCfg.isDailyResourceCleanUpDevelopmentMode ? "*/5 * * * *" : "0 * * * *" },
    { name: QueueJobs.FrequentResourceCleanUp }
  );

  // Weekly cleanup routine. Drains idle access tokens that the hourly job's
  // TTL/revoked/uses-exhausted predicates cannot reach. Separate lock from
  // the hourly so a long-running hourly run does not starve the weekly job.
  queueService.start(QueueName.WeeklyResourceCleanUp, async () => {
    let lock: Awaited<ReturnType<typeof keyStore.acquireLock>> | undefined;
    try {
      lock = await keyStore.acquireLock([KeyStorePrefixes.WeeklyResourceCleanUpLock], CLEANUP_LOCK_TTL_MS, {
        retryCount: 0
      });
    } catch {
      logger.info(`${QueueName.WeeklyResourceCleanUp}: another instance holds the lock, skipping this run`);
      return;
    }
    try {
      logger.info(`${QueueName.WeeklyResourceCleanUp}: queue task started`);
      await identityAccessTokenDAL.removeIdleTokens();
      logger.info(`${QueueName.WeeklyResourceCleanUp}: queue task completed`);
    } catch (error) {
      logger.error(error, `${QueueName.WeeklyResourceCleanUp}: resource cleanup failed`);
      throw error;
    } finally {
      await lock.release().catch((err) => logger.warn(err, `${QueueName.WeeklyResourceCleanUp}: failed to release lock`));
    }
  });

  await queueService.upsertJobScheduler(
    QueueName.WeeklyResourceCleanUp,
    `${JOB_SCHEDULER_PREFIX}:${QueueJobs.WeeklyResourceCleanUp}`,
    { pattern: appCfg.isDailyResourceCleanUpDevelopmentMode ? "*/5 * * * *" : "0 3 * * 0" },
    { name: QueueJobs.WeeklyResourceCleanUp }
  );
};

return {
```
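The hourly and weekly handlers share the same acquire-or-skip, run, release-in-finally shape. As a hypothetical refactor sketch (not part of this PR, and assuming the same `TKeyStoreFactory` and `logger` imports as the file above), that pattern could be factored out roughly like this:

```ts
type TCleanupLockStore = Pick<TKeyStoreFactory, "acquireLock">;

// Hypothetical helper: take the lock without retries, skip the run if another
// instance already holds it, and always attempt to release afterwards.
const runExclusively = async (
  keyStore: TCleanupLockStore,
  lockKey: string,
  ttlMs: number,
  queueName: string,
  task: () => Promise<void>
) => {
  const lock = await keyStore.acquireLock([lockKey], ttlMs, { retryCount: 0 }).catch(() => null);
  if (!lock) {
    logger.info(`${queueName}: another instance holds the lock, skipping this run`);
    return;
  }
  try {
    await task();
  } finally {
    await lock.release().catch((err) => logger.warn(err, `${queueName}: failed to release lock`));
  }
};
```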
Review comment:

🔴 The `down()` migration uses plain `DROP INDEX IF EXISTS` rather than `DROP INDEX CONCURRENTLY IF EXISTS`, which acquires an AccessExclusiveLock that blocks all reads and writes on the `identity_access_tokens` table for the full duration of the drop. On the 64M-row throttled-EBS table this PR explicitly calls out, rolling back this migration in production would cause a write outage on identity access token operations; change line 36 to `DROP INDEX CONCURRENTLY IF EXISTS idx_identity_access_tokens_idle` to avoid the lock.

Extended reasoning:

What the bug is and how it manifests

PostgreSQL's plain `DROP INDEX` acquires an `AccessExclusiveLock` on the parent table, which blocks every concurrent read and write for the entire duration of the drop. This is in direct contrast to `DROP INDEX CONCURRENTLY`, which only takes brief metadata locks and allows normal DML to proceed throughout. On a large table, this distinction is critical.

The specific code path that triggers it

The bug is in the `down()` function of `20260416180000_add-identity-access-token-idle-index.ts` (lines 35–39). This runs whenever the migration is rolled back (e.g., `knex migrate:down` or a failed deployment rollback).

Why existing code doesn't prevent it

The `up()` migration correctly uses `CREATE INDEX CONCURRENTLY` — but the same care was not applied to the rollback path. The migration already exports `config = { transaction: false }`, which is the exact prerequisite PostgreSQL requires before it will accept `DROP INDEX CONCURRENTLY` (concurrent index operations cannot run inside a transaction block). So the infrastructure for a safe drop is already in place; only the keyword is missing.

What the impact would be

The PR description explicitly states this table has ~64M rows on a throttled EBS volume, which is the reason `CONCURRENTLY` was chosen for the forward migration. While dropping an index is faster than building one, on throttled I/O it can still take minutes. During that window, every request that touches `identity_access_tokens` — token validation, authentication, refresh — would be blocked, causing a production outage on all identity access token operations for the full duration.

How to fix it

Replace line 36 with `DROP INDEX CONCURRENTLY IF EXISTS idx_identity_access_tokens_idle`. No other changes are needed; `config = { transaction: false }` is already exported.

Step-by-step proof

1. An operator runs `down()` on the `20260416180000` migration.
2. Knex executes `DROP INDEX IF EXISTS idx_identity_access_tokens_idle`.
3. PostgreSQL acquires an `AccessExclusiveLock` on `identity_access_tokens`.
4. Every concurrent `SELECT`, `INSERT`, `UPDATE`, or `DELETE` on `identity_access_tokens` blocks immediately — no reads, no writes proceed.
5. Had `DROP INDEX CONCURRENTLY` been used, only a brief ShareUpdateExclusiveLock would be taken at the start and end, and all DML would continue uninterrupted throughout.
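A minimal sketch of the rollback the reviewer suggests, assuming the migration keeps its existing layout and its exported `config = { transaction: false }`:

```ts
import { Knex } from "knex";

export async function down(knex: Knex): Promise<void> {
  // CONCURRENTLY avoids the AccessExclusiveLock that a plain DROP INDEX would
  // take; it can only run outside a transaction block, which the migration's
  // `config = { transaction: false }` export already guarantees.
  await knex.raw(`
    DROP INDEX CONCURRENTLY IF EXISTS idx_identity_access_tokens_idle
  `);
}
```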