Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { GlobalConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import { In, LessThan, And, Not, type SelectQueryBuilder } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import { In, LessThan, And, Not } from '@n8n/typeorm';

import type { IExecutionResponse } from 'entities/types-db';

Expand Down Expand Up @@ -409,61 +407,6 @@ describe('ExecutionRepository', () => {
});
});

describe('getWaitingExecutions()', () => {
const globalConfig = Container.get(GlobalConfig);
const originalDbType = globalConfig.database.type;

let queryBuilder: jest.Mocked<SelectQueryBuilder<ExecutionEntity>>;

beforeEach(() => {
queryBuilder = mock<SelectQueryBuilder<ExecutionEntity>>();
queryBuilder.select.mockReturnThis();
queryBuilder.where.mockReturnThis();
queryBuilder.andWhere.mockReturnThis();
queryBuilder.orderBy.mockReturnThis();
queryBuilder.getMany.mockResolvedValue([]);
jest.spyOn(executionRepository, 'createQueryBuilder').mockReturnValue(queryBuilder);
});

afterEach(() => {
globalConfig.database.type = originalDbType;
});

it('should filter by status = waiting', async () => {
await executionRepository.getWaitingExecutions();

expect(queryBuilder.andWhere).toHaveBeenCalledWith('e.status = :status', {
status: 'waiting',
});
});

it('should use a DB-clock lookahead condition for PostgreSQL', async () => {
globalConfig.database.type = 'postgresdb';

await executionRepository.getWaitingExecutions();

expect(queryBuilder.where).toHaveBeenCalledWith(
expect.stringContaining("NOW() + INTERVAL '15 seconds'"),
);
});

it('should use a DB-clock lookahead condition for SQLite', async () => {
globalConfig.database.type = 'sqlite';

await executionRepository.getWaitingExecutions();

expect(queryBuilder.where).toHaveBeenCalledWith(
expect.stringContaining("datetime('now', '+15 seconds')"),
);
});

it('should order results by waitTill ASC', async () => {
await executionRepository.getWaitingExecutions();

expect(queryBuilder.orderBy).toHaveBeenCalledWith('e.waitTill', 'ASC');
});
});

describe('setRunning', () => {
beforeEach(() => {
entityManager.transaction.mockImplementation(async (fn: unknown) => {
Expand Down
32 changes: 0 additions & 32 deletions packages/@n8n/db/src/repositories/clock.repository.ts

This file was deleted.

33 changes: 20 additions & 13 deletions packages/@n8n/db/src/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,21 +602,28 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return await this.delete({ id: In(executionIds) });
}

async getWaitingExecutions(): Promise<Array<Pick<ExecutionEntity, 'id' | 'waitTill'>>> {
// DB-clock lookahead: 5s poll + 10s buffer = 15s window.
async getWaitingExecutions() {
// Find all the executions which should be triggered in the next 70 seconds
const waitTill = new Date(Date.now() + 70000);
const where: FindOptionsWhere<ExecutionEntity> = {
waitTill: LessThanOrEqual(waitTill),
status: Not('crashed'),
};

const dbType = this.globalConfig.database.type;
if (dbType === 'sqlite') {
// This is needed because of issue in TypeORM <> SQLite:
// https://github.com/typeorm/typeorm/issues/2286
where.waitTill = LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(waitTill));
}

const lookaheadCondition =
dbType === 'postgresdb'
? "e.waitTill <= NOW() + INTERVAL '15 seconds'"
: "e.waitTill <= datetime('now', '+15 seconds')";

return await this.createQueryBuilder('e')
.select(['e.id', 'e.waitTill'])
.where(lookaheadCondition)
.andWhere('e.status = :status', { status: 'waiting' })
.orderBy('e.waitTill', 'ASC')
.getMany();
return await this.findMultipleExecutions({
select: ['id', 'waitTill'],
where,
order: {
waitTill: 'ASC',
},
});
}

async getExecutionsCountForPublicApi(params: {
Expand Down
1 change: 0 additions & 1 deletion packages/@n8n/db/src/repositories/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ export { ApiKeyRepository } from './api-key.repository';
export { AuthIdentityRepository } from './auth-identity.repository';
export { AuthProviderSyncHistoryRepository } from './auth-provider-sync-history.repository';
export { BinaryDataRepository } from './binary-data.repository';
export { ClockRepository } from './clock.repository';
export { CredentialsRepository } from './credentials.repository';
export { CredentialDependencyRepository } from './credential-dependency.repository';
export { ExecutionAnnotationRepository } from './execution-annotation.repository';
Expand Down
77 changes: 0 additions & 77 deletions packages/cli/src/__tests__/db-clock.service.test.ts

This file was deleted.

Loading
Loading