diff --git a/packages/@n8n/db/src/repositories/__tests__/clock.repository.test.ts b/packages/@n8n/db/src/repositories/__tests__/clock.repository.test.ts deleted file mode 100644 index 00c45435a505a..0000000000000 --- a/packages/@n8n/db/src/repositories/__tests__/clock.repository.test.ts +++ /dev/null @@ -1,54 +0,0 @@ -import type { DatabaseConfig } from '@n8n/config'; -import type { DataSource } from '@n8n/typeorm'; -import { mock } from 'jest-mock-extended'; - -import { ClockRepository } from '../clock.repository'; - -describe('ClockRepository', () => { - const databaseConfig = mock({ type: 'sqlite' }); - const dataSource = mock(); - - let clockRepository: ClockRepository; - - beforeEach(() => { - clockRepository = new ClockRepository(dataSource, databaseConfig); - }); - - afterEach(() => { - jest.restoreAllMocks(); - }); - - describe('getDbTime()', () => { - it('parses SQLite ISO timestamp string into a UTC Date', async () => { - databaseConfig.type = 'sqlite'; - dataSource.query.mockResolvedValueOnce([{ now: '2024-01-15T12:30:45.123Z' }]); - - const result = await clockRepository.getDbTime(); - - expect(result).toBeInstanceOf(Date); - expect(result.getUTCFullYear()).toBe(2024); - expect(result.getUTCMonth()).toBe(0); // January - expect(result.getUTCDate()).toBe(15); - expect(result.getUTCHours()).toBe(12); - expect(result.getUTCMinutes()).toBe(30); - }); - - it('returns the PostgreSQL Date directly', async () => { - databaseConfig.type = 'postgresdb'; - const pgNow = new Date('2024-06-01T10:20:30.456Z'); - dataSource.query.mockResolvedValueOnce([{ now: pgNow }]); - - const result = await clockRepository.getDbTime(); - - expect(result).toBeInstanceOf(Date); - expect(result).toBe(pgNow); - }); - - it('throws UnexpectedError when SQLite returns an unparseable string', async () => { - databaseConfig.type = 'sqlite'; - dataSource.query.mockResolvedValueOnce([{ now: 'not-a-date' }]); - - await expect(clockRepository.getDbTime()).rejects.toThrow('Invalid DB server time'); - }); - }); -}); diff --git a/packages/@n8n/db/src/repositories/__tests__/execution.repository.test.ts b/packages/@n8n/db/src/repositories/__tests__/execution.repository.test.ts index 2d6c8d22d2287..cbd655cd8fc97 100644 --- a/packages/@n8n/db/src/repositories/__tests__/execution.repository.test.ts +++ b/packages/@n8n/db/src/repositories/__tests__/execution.repository.test.ts @@ -1,8 +1,6 @@ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import { GlobalConfig } from '@n8n/config'; import { Container } from '@n8n/di'; -import { In, LessThan, And, Not, type SelectQueryBuilder } from '@n8n/typeorm'; -import { mock } from 'jest-mock-extended'; +import { In, LessThan, And, Not } from '@n8n/typeorm'; import type { IExecutionResponse } from 'entities/types-db'; @@ -409,61 +407,6 @@ describe('ExecutionRepository', () => { }); }); - describe('getWaitingExecutions()', () => { - const globalConfig = Container.get(GlobalConfig); - const originalDbType = globalConfig.database.type; - - let queryBuilder: jest.Mocked>; - - beforeEach(() => { - queryBuilder = mock>(); - queryBuilder.select.mockReturnThis(); - queryBuilder.where.mockReturnThis(); - queryBuilder.andWhere.mockReturnThis(); - queryBuilder.orderBy.mockReturnThis(); - queryBuilder.getMany.mockResolvedValue([]); - jest.spyOn(executionRepository, 'createQueryBuilder').mockReturnValue(queryBuilder); - }); - - afterEach(() => { - globalConfig.database.type = originalDbType; - }); - - it('should filter by status = waiting', async () => { - await executionRepository.getWaitingExecutions(); - - expect(queryBuilder.andWhere).toHaveBeenCalledWith('e.status = :status', { - status: 'waiting', - }); - }); - - it('should use a DB-clock lookahead condition for PostgreSQL', async () => { - globalConfig.database.type = 'postgresdb'; - - await executionRepository.getWaitingExecutions(); - - expect(queryBuilder.where).toHaveBeenCalledWith( - expect.stringContaining("NOW() + INTERVAL '15 seconds'"), - ); - }); - - it('should use a DB-clock lookahead condition for SQLite', async () => { - globalConfig.database.type = 'sqlite'; - - await executionRepository.getWaitingExecutions(); - - expect(queryBuilder.where).toHaveBeenCalledWith( - expect.stringContaining("datetime('now', '+15 seconds')"), - ); - }); - - it('should order results by waitTill ASC', async () => { - await executionRepository.getWaitingExecutions(); - - expect(queryBuilder.orderBy).toHaveBeenCalledWith('e.waitTill', 'ASC'); - }); - }); - describe('setRunning', () => { beforeEach(() => { entityManager.transaction.mockImplementation(async (fn: unknown) => { diff --git a/packages/@n8n/db/src/repositories/clock.repository.ts b/packages/@n8n/db/src/repositories/clock.repository.ts deleted file mode 100644 index dba7539c606af..0000000000000 --- a/packages/@n8n/db/src/repositories/clock.repository.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { DatabaseConfig } from '@n8n/config'; -import { Service } from '@n8n/di'; -import { DataSource } from '@n8n/typeorm'; -import { UnexpectedError } from 'n8n-workflow'; - -/** Provides access to the database server's clock for time-sensitive scheduling. */ -@Service() -export class ClockRepository { - constructor( - private readonly dataSource: DataSource, - private readonly databaseConfig: DatabaseConfig, - ) {} - - async getDbTime(): Promise { - if (this.databaseConfig.type === 'postgresdb') { - const [{ now }] = await this.dataSource.query<[{ now: Date }]>( - 'SELECT CURRENT_TIMESTAMP(3) AS now', - ); - return now; - } - - // SQLite: use ISO-friendly format directly to avoid JS-side string manipulation - const [{ now }] = await this.dataSource.query<[{ now: string }]>( - "SELECT STRFTIME('%Y-%m-%dT%H:%M:%fZ', 'NOW') AS now", - ); - const date = new Date(now); - if (Number.isNaN(date.getTime())) { - throw new UnexpectedError(`Invalid DB server time: ${now}`); - } - return date; - } -} diff --git a/packages/@n8n/db/src/repositories/execution.repository.ts b/packages/@n8n/db/src/repositories/execution.repository.ts index a690c31459171..53faf7d3d29af 100644 --- a/packages/@n8n/db/src/repositories/execution.repository.ts +++ b/packages/@n8n/db/src/repositories/execution.repository.ts @@ -602,21 +602,28 @@ export class ExecutionRepository extends Repository { return await this.delete({ id: In(executionIds) }); } - async getWaitingExecutions(): Promise>> { - // DB-clock lookahead: 5s poll + 10s buffer = 15s window. + async getWaitingExecutions() { + // Find all the executions which should be triggered in the next 70 seconds + const waitTill = new Date(Date.now() + 70000); + const where: FindOptionsWhere = { + waitTill: LessThanOrEqual(waitTill), + status: Not('crashed'), + }; + const dbType = this.globalConfig.database.type; + if (dbType === 'sqlite') { + // This is needed because of issue in TypeORM <> SQLite: + // https://github.com/typeorm/typeorm/issues/2286 + where.waitTill = LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(waitTill)); + } - const lookaheadCondition = - dbType === 'postgresdb' - ? "e.waitTill <= NOW() + INTERVAL '15 seconds'" - : "e.waitTill <= datetime('now', '+15 seconds')"; - - return await this.createQueryBuilder('e') - .select(['e.id', 'e.waitTill']) - .where(lookaheadCondition) - .andWhere('e.status = :status', { status: 'waiting' }) - .orderBy('e.waitTill', 'ASC') - .getMany(); + return await this.findMultipleExecutions({ + select: ['id', 'waitTill'], + where, + order: { + waitTill: 'ASC', + }, + }); } async getExecutionsCountForPublicApi(params: { diff --git a/packages/@n8n/db/src/repositories/index.ts b/packages/@n8n/db/src/repositories/index.ts index 059a672493f34..d2f4a2959dfd2 100644 --- a/packages/@n8n/db/src/repositories/index.ts +++ b/packages/@n8n/db/src/repositories/index.ts @@ -4,7 +4,6 @@ export { ApiKeyRepository } from './api-key.repository'; export { AuthIdentityRepository } from './auth-identity.repository'; export { AuthProviderSyncHistoryRepository } from './auth-provider-sync-history.repository'; export { BinaryDataRepository } from './binary-data.repository'; -export { ClockRepository } from './clock.repository'; export { CredentialsRepository } from './credentials.repository'; export { CredentialDependencyRepository } from './credential-dependency.repository'; export { ExecutionAnnotationRepository } from './execution-annotation.repository'; diff --git a/packages/cli/src/__tests__/db-clock.service.test.ts b/packages/cli/src/__tests__/db-clock.service.test.ts deleted file mode 100644 index 70aa605eed7e4..0000000000000 --- a/packages/cli/src/__tests__/db-clock.service.test.ts +++ /dev/null @@ -1,77 +0,0 @@ -import type { ClockRepository } from '@n8n/db'; -import { mock } from 'jest-mock-extended'; - -import { DbClock } from '@/services/db-clock.service'; - -jest.useFakeTimers({ advanceTimers: true }); - -describe('DbClock', () => { - const clockRepository = mock(); - let dbClock: DbClock; - - beforeEach(() => { - dbClock = new DbClock(clockRepository); - }); - - afterEach(() => { - jest.clearAllMocks(); - jest.clearAllTimers(); - }); - - it('should fetch DB time from the repository on first call', async () => { - const dbTime = new Date(); - clockRepository.getDbTime.mockResolvedValue(dbTime); - - const result = await dbClock.getApproximateDbTime(); - - expect(clockRepository.getDbTime).toHaveBeenCalledTimes(1); - expect(result.getTime()).toBeCloseTo(dbTime.getTime(), -1); - }); - - it('should reuse cached DB time within 60s TTL', async () => { - clockRepository.getDbTime.mockResolvedValue(new Date()); - - await dbClock.getApproximateDbTime(); - await dbClock.getApproximateDbTime(); - - expect(clockRepository.getDbTime).toHaveBeenCalledTimes(1); - }); - - it('should refresh DB time after 60s TTL expires', async () => { - clockRepository.getDbTime.mockResolvedValue(new Date()); - - await dbClock.getApproximateDbTime(); - - jest.advanceTimersByTime(60_001); - - await dbClock.getApproximateDbTime(); - - expect(clockRepository.getDbTime).toHaveBeenCalledTimes(2); - }); - - it('should interpolate DB time between cache refreshes', async () => { - const dbTimeAtFetch = new Date(Date.now() - 5_000); - clockRepository.getDbTime.mockResolvedValue(dbTimeAtFetch); - - await dbClock.getApproximateDbTime(); - - jest.advanceTimersByTime(10_000); - - const result = await dbClock.getApproximateDbTime(); - - // Approximate = dbTimeAtFetch + 10s elapsed (RTT is ~0 with mocks) - const expected = dbTimeAtFetch.getTime() + 10_000; - expect(result.getTime()).toBe(expected); - expect(clockRepository.getDbTime).toHaveBeenCalledTimes(1); - }); - - it('should clear cache on resetCache()', async () => { - clockRepository.getDbTime.mockResolvedValue(new Date()); - - await dbClock.getApproximateDbTime(); - dbClock.resetCache(); - await dbClock.getApproximateDbTime(); - - expect(clockRepository.getDbTime).toHaveBeenCalledTimes(2); - }); -}); diff --git a/packages/cli/src/__tests__/wait-tracker.test.ts b/packages/cli/src/__tests__/wait-tracker.test.ts index 5703fea513af0..ce9bae6477379 100644 --- a/packages/cli/src/__tests__/wait-tracker.test.ts +++ b/packages/cli/src/__tests__/wait-tracker.test.ts @@ -1,14 +1,13 @@ /* eslint-disable @typescript-eslint/unbound-method */ import { mockLogger } from '@n8n/backend-test-utils'; -import type { Project, IExecutionResponse, ExecutionRepository, ExecutionEntity } from '@n8n/db'; +import type { Project, IExecutionResponse, ExecutionRepository } from '@n8n/db'; import { mock, captor } from 'jest-mock-extended'; -import type { ErrorReporter, InstanceSettings } from 'n8n-core'; +import type { InstanceSettings } from 'n8n-core'; import type { IWorkflowBase, IRun, INode, IExecuteData, ITaskData } from 'n8n-workflow'; import { createDeferredPromise, createRunExecutionData, WAIT_INDEFINITELY } from 'n8n-workflow'; import type { ActiveExecutions } from '@/active-executions'; import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; -import type { DbClock } from '@/services/db-clock.service'; import type { OwnershipService } from '@/services/ownership.service'; import { WaitTracker } from '@/wait-tracker'; import type { WorkflowRunner } from '@/workflow-runner'; @@ -20,8 +19,6 @@ describe('WaitTracker', () => { const ownershipService = mock(); const workflowRunner = mock(); const executionRepository = mock(); - const dbClock = mock(); - const errorReporter = mock(); const multiMainSetup = mock(); const instanceSettings = mock({ isLeader: true, isMultiMain: false }); @@ -38,12 +35,9 @@ describe('WaitTracker', () => { startedAt: undefined, }); execution.workflowData = mock({ id: 'abcd' }); - // Minimal ExecutionEntity for getWaitingExecutions — only id and waitTill are used by WaitTracker - const waitingEntity = mock({ id: execution.id, waitTill: execution.waitTill }); let waitTracker: WaitTracker; beforeEach(() => { - dbClock.getApproximateDbTime.mockResolvedValue(new Date()); waitTracker = new WaitTracker( mockLogger(), executionRepository, @@ -51,20 +45,17 @@ describe('WaitTracker', () => { activeExecutions, workflowRunner, instanceSettings, - dbClock, - errorReporter, ); multiMainSetup.on.mockReturnThis(); }); afterEach(() => { jest.clearAllMocks(); - jest.clearAllTimers(); }); describe('init()', () => { it('should query DB for waiting executions if leader', () => { - executionRepository.getWaitingExecutions.mockResolvedValue([waitingEntity]); + executionRepository.getWaitingExecutions.mockResolvedValue([execution]); waitTracker.init(); @@ -94,7 +85,7 @@ describe('WaitTracker', () => { executionRepository.findSingleExecution .calledWith(execution.id) .mockResolvedValue(execution); - executionRepository.getWaitingExecutions.mockResolvedValue([waitingEntity]); + executionRepository.getWaitingExecutions.mockResolvedValue([execution]); ownershipService.getWorkflowProjectCached.mockResolvedValue(project); startExecutionSpy = jest @@ -569,224 +560,6 @@ describe('WaitTracker', () => { expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1); }); - - it('should poll every 5 seconds', () => { - executionRepository.getWaitingExecutions.mockResolvedValue([]); - - const setIntervalSpy = jest.spyOn(global, 'setInterval'); - setIntervalSpy.mockClear(); // ensure no prior calls are counted - waitTracker.init(); - - expect(setIntervalSpy).toHaveBeenCalledTimes(1); - expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 5000); - }); - }); - - describe('getWaitingExecutions()', () => { - it('should use server time for triggerTime calculation', async () => { - // Server clock is 10s behind local clock - const serverTime = new Date(Date.now() - 10_000); - dbClock.getApproximateDbTime.mockResolvedValue(serverTime); - - const waitTill = new Date(Date.now() + 5_000); - const delayedExecution = mock({ id: 'delayed-exec', waitTill }); - executionRepository.getWaitingExecutions.mockResolvedValue([delayedExecution]); - - const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); - await waitTracker.getWaitingExecutions(); - - // triggerTime = waitTill - serverTime = ~15s (not ~5s from Date.now()) - const expectedDelay = waitTill.getTime() - serverTime.getTime(); - expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), expectedDelay); - }); - - it('should fire immediately for past-due executions', async () => { - // Server clock 5s ahead — waitTill is already past from the DB's perspective - const serverTime = new Date(Date.now() + 5_000); - dbClock.getApproximateDbTime.mockResolvedValue(serverTime); - - const waitTill = new Date(Date.now() + 2_000); - const pastDueExecution = mock({ id: 'past-due-exec', waitTill }); - executionRepository.getWaitingExecutions.mockResolvedValue([pastDueExecution]); - - const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); - setTimeoutSpy.mockClear(); - - const startExecutionSpy = jest - .spyOn(waitTracker, 'startExecution') - .mockImplementation(async () => {}); - - await waitTracker.getWaitingExecutions(); - - // Math.max(triggerTime, 0) clamps a negative delay to 0 — fires immediately - expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 0); - - jest.advanceTimersByTime(0); - expect(startExecutionSpy).toHaveBeenCalledWith('past-due-exec'); - }); - - it('should warn on clock skew exceeding 2 seconds', async () => { - // mockLogger().scoped() returns a new inner mock — make scoped() return itself - // so that warn calls on the scoped logger are captured on our mock - const logger = mockLogger(); - (logger.scoped as jest.Mock).mockReturnValue(logger); - const skewedWaitTracker = new WaitTracker( - logger, - executionRepository, - ownershipService, - activeExecutions, - workflowRunner, - instanceSettings, - dbClock, - errorReporter, - ); - - // Server clock is 3s ahead — exceeds 2s threshold - dbClock.getApproximateDbTime.mockResolvedValue(new Date(Date.now() + 3_000)); - executionRepository.getWaitingExecutions.mockResolvedValue([]); - - await skewedWaitTracker.getWaitingExecutions(); - - expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('Clock skew detected')); - }); - - it('should delegate DB time to DbClock', async () => { - executionRepository.getWaitingExecutions.mockResolvedValue([]); - - await waitTracker.getWaitingExecutions(); - - expect(dbClock.getApproximateDbTime).toHaveBeenCalledTimes(1); - }); - }); - - describe('race condition guards', () => { - describe('overlapping poll guard', () => { - it('should skip poll when previous poll is still in progress', async () => { - // Make getWaitingExecutions hang on the first call by returning a never-resolving promise - let resolveFirstPoll!: (value: unknown[]) => void; - const slowPoll = new Promise((resolve) => { - resolveFirstPoll = resolve; - }); - executionRepository.getWaitingExecutions.mockReturnValueOnce(slowPoll as Promise); - - // Start first poll — it will block on the slow DB query - const firstPoll = waitTracker.getWaitingExecutions(); - - // Second poll should bail out immediately - executionRepository.getWaitingExecutions.mockResolvedValue([]); - await waitTracker.getWaitingExecutions(); - - // Only the first call to getWaitingExecutions should have reached the DB - expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1); - - // Unblock and clean up - resolveFirstPoll([]); - await firstPoll; - }); - - it('should allow polling again after previous poll completes', async () => { - executionRepository.getWaitingExecutions.mockResolvedValue([]); - - await waitTracker.getWaitingExecutions(); - await waitTracker.getWaitingExecutions(); - - expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(2); - }); - - it('should allow polling again after previous poll errors', async () => { - executionRepository.getWaitingExecutions.mockRejectedValueOnce( - new Error('DB connection lost'), - ); - - await expect(waitTracker.getWaitingExecutions()).rejects.toThrow('DB connection lost'); - - executionRepository.getWaitingExecutions.mockResolvedValue([]); - await waitTracker.getWaitingExecutions(); - - expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(2); - }); - }); - - describe('execution stays guarded until workflowRunner.run() settles', () => { - const raceExecId = 'race-exec'; - const raceExecution = mock({ - id: raceExecId, - finished: false, - waitTill: new Date(Date.now() + 1_000), - mode: 'manual', - data: mock({ pushRef: 'push_ref', parentExecution: undefined }), - startedAt: undefined, - }); - raceExecution.workflowData = mock({ id: 'race-wf' }); - - it('should keep execution in waitingExecutions during async startExecution work', async () => { - const waitTill = new Date(Date.now() + 1_000); - const entity = mock({ id: raceExecId, waitTill }); - executionRepository.getWaitingExecutions.mockResolvedValue([entity]); - dbClock.getApproximateDbTime.mockResolvedValue(new Date()); - - // Set up the timer via poll - await waitTracker.getWaitingExecutions(); - expect(waitTracker.has(raceExecId)).toBe(true); - - // Make workflowRunner.run() hang so we can observe the guard - let resolveRun!: (value: string) => void; - const runPromise = new Promise((resolve) => { - resolveRun = resolve; - }); - workflowRunner.run.mockReturnValueOnce(runPromise); - executionRepository.findSingleExecution - .calledWith(raceExecId) - .mockResolvedValue(raceExecution); - ownershipService.getWorkflowProjectCached.mockResolvedValue(project); - - // Fire the timer — startExecution begins but blocks on workflowRunner.run() - const startPromise = waitTracker.startExecution(raceExecId); - - // Execution should STILL be in waitingExecutions (guard active) - expect(waitTracker.has(raceExecId)).toBe(true); - - // A second poll should skip this execution because the guard is still active - executionRepository.getWaitingExecutions.mockResolvedValue([entity]); - const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); - const callCountBefore = setTimeoutSpy.mock.calls.length; - - await waitTracker.getWaitingExecutions(); - - // No new timer should have been set for 'race-exec' - const newTimerCalls = setTimeoutSpy.mock.calls.slice(callCountBefore); - expect(newTimerCalls).toHaveLength(0); - - // Clean up - resolveRun('exec-id'); - await startPromise; - - // Now the guard should be released - expect(waitTracker.has(raceExecId)).toBe(false); - }); - - it('should release guard even when workflowRunner.run() throws', async () => { - executionRepository.getWaitingExecutions.mockResolvedValue([]); - dbClock.getApproximateDbTime.mockResolvedValue(new Date()); - - // Set up a waiting execution via poll - const waitTill = new Date(Date.now() + 1_000); - const entity = mock({ id: raceExecId, waitTill }); - executionRepository.getWaitingExecutions.mockResolvedValue([entity]); - await waitTracker.getWaitingExecutions(); - - executionRepository.findSingleExecution - .calledWith(raceExecId) - .mockResolvedValue(raceExecution); - ownershipService.getWorkflowProjectCached.mockResolvedValue(project); - workflowRunner.run.mockRejectedValueOnce(new Error('Runner crashed')); - - await expect(waitTracker.startExecution(raceExecId)).rejects.toThrow('Runner crashed'); - - // Guard should be released so future polls can re-schedule it - expect(waitTracker.has(raceExecId)).toBe(false); - }); - }); }); describe('multi-main setup', () => { @@ -806,8 +579,6 @@ describe('WaitTracker', () => { activeExecutions, workflowRunner, mock({ isLeader: false, isMultiMain: false }), - dbClock, - errorReporter, ); executionRepository.getWaitingExecutions.mockResolvedValue([]); diff --git a/packages/cli/src/databases/repositories/__tests__/execution.repository.test.ts b/packages/cli/src/databases/repositories/__tests__/execution.repository.test.ts index 412d35dbba444..bf66afdc84ecd 100644 --- a/packages/cli/src/databases/repositories/__tests__/execution.repository.test.ts +++ b/packages/cli/src/databases/repositories/__tests__/execution.repository.test.ts @@ -5,6 +5,7 @@ import type { IExecutionResponse } from '@n8n/db'; import { ExecutionEntity, ExecutionRepository } from '@n8n/db'; import { Container } from '@n8n/di'; import type { SelectQueryBuilder } from '@n8n/typeorm'; +import { Not, LessThanOrEqual } from '@n8n/typeorm'; import { mock } from 'jest-mock-extended'; import { BinaryDataService } from 'n8n-core'; import type { IRunExecutionData, IWorkflowBase } from 'n8n-workflow'; @@ -33,33 +34,22 @@ describe('ExecutionRepository', () => { 'on %s, should be called with expected args', async (dbType) => { globalConfig.database.type = dbType; - - const qb = { - select: jest.fn().mockReturnThis(), - where: jest.fn().mockReturnThis(), - andWhere: jest.fn().mockReturnThis(), - orderBy: jest.fn().mockReturnThis(), - getMany: jest.fn().mockResolvedValue([]), - }; - jest - .spyOn(executionRepository, 'createQueryBuilder') - .mockReturnValue(qb as unknown as SelectQueryBuilder); + entityManager.find.mockResolvedValueOnce([]); await executionRepository.getWaitingExecutions(); - expect(executionRepository.createQueryBuilder).toHaveBeenCalledWith('e'); - expect(qb.select).toHaveBeenCalledWith(['e.id', 'e.waitTill']); - - const expectedCondition = - dbType === 'postgresdb' - ? "e.waitTill <= NOW() + INTERVAL '15 seconds'" - : "e.waitTill <= datetime('now', '+15 seconds')"; - expect(qb.where).toHaveBeenCalledWith(expectedCondition); - expect(qb.andWhere).toHaveBeenCalledWith('e.status = :status', { - status: 'waiting', + expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, { + order: { waitTill: 'ASC' }, + select: ['id', 'waitTill'], + where: { + status: Not('crashed'), + waitTill: LessThanOrEqual( + dbType === 'sqlite' + ? '2023-12-28 12:36:06.789' + : new Date('2023-12-28T12:36:06.789Z'), + ), + }, }); - expect(qb.orderBy).toHaveBeenCalledWith('e.waitTill', 'ASC'); - expect(qb.getMany).toHaveBeenCalled(); }, ); }); diff --git a/packages/cli/src/services/db-clock.service.ts b/packages/cli/src/services/db-clock.service.ts deleted file mode 100644 index c9d429579238a..0000000000000 --- a/packages/cli/src/services/db-clock.service.ts +++ /dev/null @@ -1,42 +0,0 @@ -import { ClockRepository } from '@n8n/db'; -import { Service } from '@n8n/di'; - -/** - * Provides an approximation of the DB server's current time. - * - * Fetches from the DB at most once every 60s and approximates intermediate - * values by adding elapsed local wall-clock time since the last query. - * Compensates for query round-trip time using NTP-style half-RTT offset. - */ -@Service() -export class DbClock { - private cache: { dbTime: Date; localTimeAtQuery: number } | null = null; - - constructor(private readonly clockRepository: ClockRepository) {} - - async getApproximateDbTime(): Promise { - const nowMs = Date.now(); - if (!this.isCacheStale(nowMs)) { - const elapsed = nowMs - this.cache!.localTimeAtQuery; - return new Date(this.cache!.dbTime.getTime() + elapsed); - } - const beforeMs = Date.now(); - const dbTime = await this.clockRepository.getDbTime(); - const afterMs = Date.now(); - const halfRtt = (afterMs - beforeMs) / 2; - this.setCache(new Date(dbTime.getTime() + halfRtt), afterMs); - return this.cache!.dbTime; - } - - resetCache() { - this.cache = null; - } - - private isCacheStale(nowMs: number): boolean { - return this.cache === null || nowMs - this.cache.localTimeAtQuery >= 60_000; - } - - private setCache(dbTime: Date, localTimeAtQuery: number) { - this.cache = { dbTime, localTimeAtQuery }; - } -} diff --git a/packages/cli/src/wait-tracker.ts b/packages/cli/src/wait-tracker.ts index 95aeaa855d0fb..04f4a3acd0f39 100644 --- a/packages/cli/src/wait-tracker.ts +++ b/packages/cli/src/wait-tracker.ts @@ -2,12 +2,11 @@ import { Logger } from '@n8n/backend-common'; import { ExecutionRepository } from '@n8n/db'; import { OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators'; import { Service } from '@n8n/di'; -import { ErrorReporter, InstanceSettings } from 'n8n-core'; +import { InstanceSettings } from 'n8n-core'; import { UnexpectedError, type IWorkflowExecutionDataProcess } from 'n8n-workflow'; import { ActiveExecutions } from '@/active-executions'; import { ExecutionAlreadyResumingError } from '@/errors/execution-already-resuming.error'; -import { DbClock } from '@/services/db-clock.service'; import { OwnershipService } from '@/services/ownership.service'; import { WorkflowRunner } from '@/workflow-runner'; import { @@ -26,9 +25,6 @@ export class WaitTracker { mainTimer: NodeJS.Timeout; - /** Guards against overlapping poll invocations when DB queries take longer than the poll interval. */ - private isPolling = false; - constructor( private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, @@ -36,8 +32,6 @@ export class WaitTracker { private readonly activeExecutions: ActiveExecutions, private readonly workflowRunner: WorkflowRunner, private readonly instanceSettings: InstanceSettings, - private readonly dbClock: DbClock, - private readonly errorReporter: ErrorReporter, ) { this.logger = this.logger.scoped('waiting-executions'); } @@ -52,9 +46,10 @@ export class WaitTracker { @OnLeaderTakeover() private startTracking() { + // Poll every 60 seconds a list of upcoming executions this.mainTimer = setInterval(() => { void this.getWaitingExecutions(); - }, 5000); + }, 60000); void this.getWaitingExecutions(); @@ -62,64 +57,32 @@ export class WaitTracker { } async getWaitingExecutions() { - if (this.isPolling) { - this.logger.debug('Skipping poll — previous poll still in progress'); - return; - } + this.logger.debug('Querying database for waiting executions'); - this.isPolling = true; - try { - const [executions, dbTime] = await Promise.all([ - this.executionRepository.getWaitingExecutions(), - this.dbClock.getApproximateDbTime(), - ]); - - const skewMs = dbTime.getTime() - Date.now(); - if (Math.abs(skewMs) > 2000) { - this.logger.warn( - `Clock skew detected: this instance is ${Math.abs(skewMs)}ms ${skewMs > 0 ? 'behind' : 'ahead of'} the database server`, - ); - } - - if (executions.length === 0) { - return; - } + const executions = await this.executionRepository.getWaitingExecutions(); - const newExecutions = executions.filter((e) => this.waitingExecutions[e.id] === undefined); + if (executions.length === 0) { + return; + } - if (newExecutions.length > 0) { - const executionIds = newExecutions.map((e) => e.id).join(', '); - this.logger.debug( - `Found ${newExecutions.length} new waiting execution(s). Setting timer for IDs: ${executionIds}`, - ); - } + const executionIds = executions.map((execution) => execution.id).join(', '); + this.logger.debug( + `Found ${executions.length} executions. Setting timer for IDs: ${executionIds}`, + ); - for (const execution of newExecutions) { - const executionId = execution.id; - if (execution.waitTill === null || execution.waitTill === undefined) { - this.errorReporter.error( - new UnexpectedError( - 'Polling returned an execution without waitTill — this should never happen', - { extra: { executionId } }, - ), - { level: 'fatal' }, - ); - continue; - } + // Add timers for each waiting execution that they get started at the correct time - const triggerTime = execution.waitTill.getTime() - dbTime.getTime(); + for (const execution of executions) { + const executionId = execution.id; + if (this.waitingExecutions[executionId] === undefined) { + const triggerTime = execution.waitTill!.getTime() - new Date().getTime(); this.waitingExecutions[executionId] = { executionId, - timer: setTimeout( - () => { - void this.startExecution(executionId); - }, - Math.max(triggerTime, 0), - ), + timer: setTimeout(() => { + void this.startExecution(executionId); + }, triggerTime), }; } - } finally { - this.isPolling = false; } } @@ -133,71 +96,74 @@ export class WaitTracker { async startExecution(executionId: string) { this.logger.debug(`Resuming execution ${executionId}`, { executionId }); + delete this.waitingExecutions[executionId]; - try { - const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { - includeData: true, - unflattenData: true, - }); + // Get the data to execute + const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); - if (!fullExecutionData) { - throw new UnexpectedError('Execution does not exist.', { extra: { executionId } }); - } - if (fullExecutionData.finished) { - throw new UnexpectedError('The execution did succeed and can so not be started again.'); - } + if (!fullExecutionData) { + throw new UnexpectedError('Execution does not exist.', { extra: { executionId } }); + } + if (fullExecutionData.finished) { + throw new UnexpectedError('The execution did succeed and can so not be started again.'); + } - if (!fullExecutionData.workflowData.id) { - throw new UnexpectedError('Only saved workflows can be resumed.'); - } + if (!fullExecutionData.workflowData.id) { + throw new UnexpectedError('Only saved workflows can be resumed.'); + } - const workflowId = fullExecutionData.workflowData.id; - const project = await this.ownershipService.getWorkflowProjectCached(workflowId); - - const data: IWorkflowExecutionDataProcess = { - executionMode: fullExecutionData.mode, - executionData: fullExecutionData.data, - workflowData: fullExecutionData.workflowData, - projectId: project.id, - pushRef: fullExecutionData.data.pushRef, - startedAt: fullExecutionData.startedAt, - }; - - try { - await this.workflowRunner.run(data, false, false, executionId); - } catch (error) { - if (error instanceof ExecutionAlreadyResumingError) { - this.logger.debug( - `Execution ${executionId} is already being resumed, skipping duplicate resume`, - { executionId }, - ); - return; - } - throw error; - } + const workflowId = fullExecutionData.workflowData.id; + const project = await this.ownershipService.getWorkflowProjectCached(workflowId); - const { parentExecution } = fullExecutionData.data; - if (shouldRestartParentExecution(parentExecution)) { - void this.activeExecutions - .getPostExecutePromise(executionId) - .then(async (subworkflowResults) => { - if (!subworkflowResults) return; - if (subworkflowResults.status === 'waiting') return; // The child execution is waiting, not completing. - await updateParentExecutionWithChildResults( - this.executionRepository, - parentExecution.executionId, - subworkflowResults, - ); - return subworkflowResults; - }) - .then((subworkflowResults) => { - if (!subworkflowResults) return; - if (subworkflowResults.status === 'waiting') return; // The child execution is waiting, not completing. - void this.startExecution(parentExecution.executionId); - }); + const data: IWorkflowExecutionDataProcess = { + executionMode: fullExecutionData.mode, + executionData: fullExecutionData.data, + workflowData: fullExecutionData.workflowData, + projectId: project.id, + pushRef: fullExecutionData.data.pushRef, + startedAt: fullExecutionData.startedAt, + }; + + // Start the execution again + try { + await this.workflowRunner.run(data, false, false, executionId); + } catch (error) { + if (error instanceof ExecutionAlreadyResumingError) { + // This execution is already being resumed by another child execution + // This is expected in "run once for each item" mode when multiple children complete + this.logger.debug( + `Execution ${executionId} is already being resumed, skipping duplicate resume`, + { executionId }, + ); + return; } - } finally { - delete this.waitingExecutions[executionId]; + // Rethrow any other errors + throw error; + } + + const { parentExecution } = fullExecutionData.data; + if (shouldRestartParentExecution(parentExecution)) { + // on child execution completion, resume parent execution + void this.activeExecutions + .getPostExecutePromise(executionId) + .then(async (subworkflowResults) => { + if (!subworkflowResults) return; + if (subworkflowResults.status === 'waiting') return; // The child execution is waiting, not completing. + await updateParentExecutionWithChildResults( + this.executionRepository, + parentExecution.executionId, + subworkflowResults, + ); + return subworkflowResults; + }) + .then((subworkflowResults) => { + if (!subworkflowResults) return; + if (subworkflowResults.status === 'waiting') return; // The child execution is waiting, not completing. + void this.startExecution(parentExecution.executionId); + }); } } diff --git a/packages/cli/test/integration/database/repositories/execution.repository.test.ts b/packages/cli/test/integration/database/repositories/execution.repository.test.ts index 2d9f1db1ad83b..a9327a1524562 100644 --- a/packages/cli/test/integration/database/repositories/execution.repository.test.ts +++ b/packages/cli/test/integration/database/repositories/execution.repository.test.ts @@ -61,132 +61,6 @@ describe('ExecutionRepository', () => { }); }); }); - describe('getWaitingExecutions', () => { - it('should return waiting executions within the 15s lookahead window', async () => { - const executionRepo = Container.get(ExecutionRepository); - const workflow = await createWorkflow(); - - // waitTill in the past — should be returned - await executionRepo.insert({ - workflowId: workflow.id, - mode: 'manual', - startedAt: new Date(), - status: 'waiting', - finished: false, - waitTill: new Date(Date.now() - 5_000), - createdAt: new Date(), - }); - - // waitTill 10s from now — within 15s lookahead, should be returned - await executionRepo.insert({ - workflowId: workflow.id, - mode: 'manual', - startedAt: new Date(), - status: 'waiting', - finished: false, - waitTill: new Date(Date.now() + 10_000), - createdAt: new Date(), - }); - - const results = await executionRepo.getWaitingExecutions(); - - expect(results).toHaveLength(2); - }); - - it('should exclude waiting executions beyond the 15s lookahead window', async () => { - const executionRepo = Container.get(ExecutionRepository); - const workflow = await createWorkflow(); - - // waitTill 1 hour from now — well outside the 15s lookahead - await executionRepo.insert({ - workflowId: workflow.id, - mode: 'manual', - startedAt: new Date(), - status: 'waiting', - finished: false, - waitTill: new Date(Date.now() + 3_600_000), - createdAt: new Date(), - }); - - const results = await executionRepo.getWaitingExecutions(); - - expect(results).toHaveLength(0); - }); - - it('should exclude non-waiting executions even if waitTill is in range', async () => { - const executionRepo = Container.get(ExecutionRepository); - const workflow = await createWorkflow(); - - await executionRepo.insert({ - workflowId: workflow.id, - mode: 'manual', - startedAt: new Date(), - status: 'success', - finished: true, - waitTill: new Date(), - createdAt: new Date(), - }); - - const results = await executionRepo.getWaitingExecutions(); - - expect(results).toHaveLength(0); - }); - - it('should order results by waitTill ascending', async () => { - const executionRepo = Container.get(ExecutionRepository); - const workflow = await createWorkflow(); - - const laterWaitTill = new Date(Date.now() + 5_000); - const earlierWaitTill = new Date(Date.now() - 5_000); - - const { identifiers: laterIds } = await executionRepo.insert({ - workflowId: workflow.id, - mode: 'manual', - startedAt: new Date(), - status: 'waiting', - finished: false, - waitTill: laterWaitTill, - createdAt: new Date(), - }); - - const { identifiers: earlierIds } = await executionRepo.insert({ - workflowId: workflow.id, - mode: 'manual', - startedAt: new Date(), - status: 'waiting', - finished: false, - waitTill: earlierWaitTill, - createdAt: new Date(), - }); - - const results = await executionRepo.getWaitingExecutions(); - - expect(results).toHaveLength(2); - expect(String(results[0].id)).toBe(String(earlierIds[0].id)); - expect(String(results[1].id)).toBe(String(laterIds[0].id)); - }); - - it('should only return id and waitTill fields', async () => { - const executionRepo = Container.get(ExecutionRepository); - const workflow = await createWorkflow(); - - await executionRepo.insert({ - workflowId: workflow.id, - mode: 'manual', - startedAt: new Date(), - status: 'waiting', - finished: false, - waitTill: new Date(), - createdAt: new Date(), - }); - - const results = await executionRepo.getWaitingExecutions(); - - expect(results).toHaveLength(1); - expect(Object.keys(results[0]).sort()).toEqual(['id', 'waitTill']); - }); - }); - describe('findByStopExecutionsFilter', () => { it('should find executions by status', async () => { const executionRepo = Container.get(ExecutionRepository); diff --git a/packages/nodes-base/nodes/Wait/Wait.node.ts b/packages/nodes-base/nodes/Wait/Wait.node.ts index ac3b0517bac96..f54d3b256cad0 100644 --- a/packages/nodes-base/nodes/Wait/Wait.node.ts +++ b/packages/nodes-base/nodes/Wait/Wait.node.ts @@ -591,6 +591,21 @@ export class Wait extends Webhook { } } + const waitValue = Math.max(waitTill.getTime() - new Date().getTime(), 0); + + if (waitValue < 65000) { + // If wait time is shorter than 65 seconds leave execution active because + // we just check the database every 60 seconds. + return await new Promise((resolve, _reject) => { + const timer = setTimeout(() => resolve([context.getInputData()]), waitValue); + context.onExecutionCancellation(() => { + clearTimeout(timer); + resolve([context.getInputData()]); + }); + }); + } + + // If longer than 65 seconds put execution to wait return await this.putToWait(context, waitTill); } diff --git a/packages/nodes-base/nodes/Wait/test/Wait.node.test.ts b/packages/nodes-base/nodes/Wait/test/Wait.node.test.ts index 10a058bfa6d71..fbbdcacf23946 100644 --- a/packages/nodes-base/nodes/Wait/test/Wait.node.test.ts +++ b/packages/nodes-base/nodes/Wait/test/Wait.node.test.ts @@ -6,13 +6,17 @@ import { NodeOperationError, type IExecuteFunctions } from 'n8n-workflow'; import { Wait } from '../Wait.node'; describe('Execute Wait Node', () => { + let timer: NodeJS.Timeout; + const { clearInterval, setInterval } = global; const nextDay = DateTime.now().startOf('day').plus({ days: 1 }); beforeAll(() => { + timer = setInterval(() => jest.advanceTimersByTime(1000), 10); jest.useFakeTimers().setSystemTime(new Date('2025-01-01')); }); afterAll(() => { + clearInterval(timer); jest.useRealTimers(); }); @@ -63,36 +67,45 @@ describe('Execute Wait Node', () => { }, ); + test('should resolve with input data if canceled', async () => { + const putExecutionToWaitSpy = jest.fn(); + const waitNode = new Wait(); + + let cancelSignal: (() => void) | null = null; + + const inputData = [{ json: { test: 'data' } }]; + + const executeFunctionsMock = mock({ + getNodeParameter: jest.fn().mockImplementation((paramName: string) => { + if (paramName === 'resume') return 'timeInterval'; + if (paramName === 'unit') return 'seconds'; + if (paramName === 'amount') return 60; + }), + getTimezone: jest.fn().mockReturnValue('UTC'), + putExecutionToWait: putExecutionToWaitSpy, + getInputData: jest.fn(() => inputData), + getNode: jest.fn(), + onExecutionCancellation: (handler) => { + cancelSignal = handler; + }, + }); + + const waitPromise = waitNode.execute(executeFunctionsMock); + + for (let index = 0; index < 20; index++) { + await new Promise((r) => setTimeout(r, 10)); + if (cancelSignal !== null) break; + } + + expect(cancelSignal).not.toBeNull(); + cancelSignal!(); + + await expect(waitPromise).resolves.toEqual([inputData]); + }); + describe('Validation', () => { describe('Time interval', () => { it.each([ - // Previously in-memory path (< 65s) — now all go through putToWait - { - unit: 'seconds', - amount: 5, - expectedWaitTill: () => DateTime.now().plus({ seconds: 5 }).toJSDate(), - }, - { - unit: 'seconds', - amount: 30, - expectedWaitTill: () => DateTime.now().plus({ seconds: 30 }).toJSDate(), - }, - { - unit: 'seconds', - amount: 60, - expectedWaitTill: () => DateTime.now().plus({ seconds: 60 }).toJSDate(), - }, - { - unit: 'seconds', - amount: 64, - expectedWaitTill: () => DateTime.now().plus({ seconds: 64 }).toJSDate(), - }, - // DB-persisted path (>= 65s) - { - unit: 'seconds', - amount: 66, - expectedWaitTill: () => DateTime.now().plus({ seconds: 66 }).toJSDate(), - }, { unit: 'seconds', amount: 300, @@ -116,6 +129,7 @@ describe('Execute Wait Node', () => { { unit: 'seconds', amount: 0, + mode: 'timeout', expectedWaitTill: () => DateTime.now().toJSDate(), }, { @@ -135,7 +149,7 @@ describe('Execute Wait Node', () => { }, ])( 'Validate wait unit: $unit, amount: $amount', - async ({ unit, amount, expectedWaitTill, error }) => { + async ({ unit, amount, expectedWaitTill, error, mode }) => { const putExecutionToWaitSpy = jest.fn(); const waitNode = new Wait(); const inputData = [{ json: { inputData: true } }]; @@ -152,9 +166,16 @@ describe('Execute Wait Node', () => { }); if (!error) { - // All time-based waits are now persisted to DB via putToWait - await expect(waitNode.execute(executeFunctionsMock)).resolves.not.toThrow(); - expect(putExecutionToWaitSpy).toHaveBeenCalledWith(expectedWaitTill?.()); + if (mode === 'timeout') { + // for short wait times (<65s) a simple timeout is used + const resultPromise = waitNode.execute(executeFunctionsMock); + jest.runAllTimers(); + await expect(resultPromise).resolves.toEqual([inputData]); + } else { + // for longer wait times (>=65s) the execution is put to wait + await expect(waitNode.execute(executeFunctionsMock)).resolves.not.toThrow(); + expect(putExecutionToWaitSpy).toHaveBeenCalledWith(expectedWaitTill?.()); + } } else { await expect(waitNode.execute(executeFunctionsMock)).rejects.toThrowError(error); } diff --git a/packages/nodes-base/nodes/Wait/test/Wait.workflow.json b/packages/nodes-base/nodes/Wait/test/Wait.workflow.json new file mode 100644 index 0000000000000..27fc8ff5abf7d --- /dev/null +++ b/packages/nodes-base/nodes/Wait/test/Wait.workflow.json @@ -0,0 +1,162 @@ +{ + "name": "[Unit Test] Wait Node", + "nodes": [ + { + "parameters": {}, + "id": "76e5dcfd-fdc7-472f-8832-bccc0eb122c0", + "name": "When clicking \"Execute Workflow\"", + "type": "n8n-nodes-base.manualTrigger", + "typeVersion": 1, + "position": [120, 420] + }, + { + "parameters": { + "amount": 42, + "unit": "seconds" + }, + "id": "37f2c758-6fb2-43ce-86ae-ca11ec957cbd", + "name": "Wait", + "type": "n8n-nodes-base.wait", + "typeVersion": 1, + "position": [560, 420], + "webhookId": "35edc971-c3e4-48cf-835d-4d73a4fd1fd8" + }, + { + "parameters": { + "conditions": { + "number": [ + { + "value1": "={{ parseInt($json.afterTimestamp) }}", + "operation": "largerEqual", + "value2": "={{ parseInt($json.startTimestamp) + 42 }}" + } + ] + } + }, + "id": "c5c53934-2677-4adf-a4df-b32f3b0642a2", + "name": "IF", + "type": "n8n-nodes-base.if", + "typeVersion": 1, + "position": [960, 420] + }, + { + "parameters": { + "keepOnlySet": true, + "values": { + "boolean": [ + { + "name": "success", + "value": true + } + ] + }, + "options": {} + }, + "id": "a78417b6-d3f5-4bbc-916a-d4b9d46961cc", + "name": "Set1", + "type": "n8n-nodes-base.set", + "typeVersion": 1, + "position": [1180, 400] + }, + { + "parameters": { + "value": "={{ $now }}", + "dataPropertyName": "afterTimestamp", + "toFormat": "X", + "options": {} + }, + "id": "94f042ea-49d5-44ea-9ccf-93dac8d27d4a", + "name": "After", + "type": "n8n-nodes-base.dateTime", + "typeVersion": 1, + "position": [760, 420] + }, + { + "parameters": { + "value": "={{ $now }}", + "dataPropertyName": "startTimestamp", + "toFormat": "X", + "options": {} + }, + "id": "43f8a396-1bf7-484e-962c-120f677dfa51", + "name": "Before", + "type": "n8n-nodes-base.dateTime", + "typeVersion": 1, + "position": [360, 420] + } + ], + "pinData": { + "Set1": [ + { + "json": { + "success": true + } + } + ] + }, + "connections": { + "When clicking \"Execute Workflow\"": { + "main": [ + [ + { + "node": "Before", + "type": "main", + "index": 0 + } + ] + ] + }, + "Wait": { + "main": [ + [ + { + "node": "After", + "type": "main", + "index": 0 + } + ] + ] + }, + "IF": { + "main": [ + [ + { + "node": "Set1", + "type": "main", + "index": 0 + } + ] + ] + }, + "After": { + "main": [ + [ + { + "node": "IF", + "type": "main", + "index": 0 + } + ] + ] + }, + "Before": { + "main": [ + [ + { + "node": "Wait", + "type": "main", + "index": 0 + } + ] + ] + } + }, + "active": false, + "settings": {}, + "versionId": "8ed794a0-5c04-4b8a-9a49-02c2c7f8003f", + "id": "500", + "meta": { + "instanceId": "8c8c5237b8e37b006a7adce87f4369350c58e41f3ca9de16196d3197f69eabcd" + }, + "tags": [] +}