8 changes: 8 additions & 0 deletions packages/@n8n/config/src/configs/event-bus.config.ts
@@ -15,6 +15,14 @@ class LogWriterConfig {
/** Base filename for event log files (extension and rotation suffix are added). */
@Env('N8N_EVENTBUS_LOGWRITER_LOGBASENAME')
logBaseName: string = 'n8nEventLog';

/**
* Safety tripwire: per-file cap on concurrently unconfirmed messages held in memory
 * during startup log parsing. Parsing of the file is aborted if the cap is exceeded,
 * to prevent OOM on legacy logs with many orphaned messages. Tune up if healthy
 * workloads hit false positives.
*/
@Env('N8N_EVENTBUS_LOGWRITER_MAXMESSAGESPERPARSE')
maxMessagesPerParse: number = 10_000;
}

const recoveryModeSchema = z.enum(['simple', 'extensive']);
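
As a quick illustration of how the new setting is consumed (a sketch, assuming the usual @n8n/config pattern of resolving @Env values when GlobalConfig is instantiated; the value 50000 below is arbitrary, not a recommendation):

import { GlobalConfig } from '@n8n/config';
import { Container } from '@n8n/di';

// Illustrative only: raise the per-file parse cap via the environment
// before the DI container resolves GlobalConfig.
process.env.N8N_EVENTBUS_LOGWRITER_MAXMESSAGESPERPARSE = '50000';

const globalConfig = Container.get(GlobalConfig);
console.log(globalConfig.eventBus.logWriter.maxMessagesPerParse); // 50000
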
1 change: 1 addition & 0 deletions packages/@n8n/config/test/config.test.ts
@@ -162,6 +162,7 @@ describe('GlobalConfig', () => {
keepLogCount: 3,
logBaseName: 'n8nEventLog',
maxFileSizeInKB: 10240,
maxMessagesPerParse: 10_000,
},
},
externalHooks: {
@@ -0,0 +1,169 @@
import { Logger } from '@n8n/backend-common';
import { GlobalConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import { EventMessageTypeNames } from 'n8n-workflow';
import { mkdtempSync, rmSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

import type { EventMessageTypes } from '../../event-message-classes';
import { MessageEventBusLogWriter } from '../message-event-bus-log-writer';

jest.unmock('node:fs');
jest.unmock('node:fs/promises');

describe('MessageEventBusLogWriter.readLoggedMessagesFromFile', () => {
let tempDir: string;
let logger: ReturnType<typeof mock<Logger>>;
let writer: MessageEventBusLogWriter;

const makeWorkflowStartedLine = (id: string, executionId: string) =>
JSON.stringify({
__type: EventMessageTypeNames.workflow,
id,
ts: '2026-04-16T12:00:00.000Z',
eventName: 'n8n.workflow.started',
message: 'n8n.workflow.started',
payload: { executionId },
});

const makeConfirmLine = (id: string) =>
JSON.stringify({
__type: EventMessageTypeNames.confirm,
confirm: id,
ts: '2026-04-16T12:00:00.000Z',
source: { id: '', name: '' },
});

const writeLogFile = (fileName: string, lines: string[]): string => {
const path = join(tempDir, fileName);
writeFileSync(path, lines.join('\n') + '\n');
return path;
};

const setMaxMessagesPerParse = (maxMessagesPerParse: number) => {
const globalConfig = mock<GlobalConfig>({
eventBus: { logWriter: { maxMessagesPerParse, keepLogCount: 3 } },
});
Container.set(GlobalConfig, globalConfig);
};

beforeEach(() => {
tempDir = mkdtempSync(join(tmpdir(), 'eventbus-log-writer-test-'));
logger = mock<Logger>();
Container.set(Logger, logger);
});

afterEach(() => {
rmSync(tempDir, { recursive: true, force: true });
Container.reset();
});

it('aborts parsing and warns when the in-memory working set exceeds the configured max', async () => {
const maxMessagesPerParse = 5;
setMaxMessagesPerParse(maxMessagesPerParse);
writer = new MessageEventBusLogWriter();

const lines: string[] = [];
for (let i = 0; i < 100; i++) {
lines.push(makeWorkflowStartedLine(`id-${i}`, `exec-${i}`));
}
const logFile = writeLogFile('bloated.log', lines);

const results = {
loggedMessages: [] as EventMessageTypes[],
sentMessages: [] as EventMessageTypes[],
unfinishedExecutions: {} as Record<string, EventMessageTypes[]>,
};

await writer.readLoggedMessagesFromFile(results, 'unsent', logFile);

expect(results.loggedMessages.length).toBeLessThan(100);
expect(results.loggedMessages.length).toBeLessThanOrEqual(maxMessagesPerParse + 1);
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining('exceeded 5 in-memory messages during parse'),
);
});

it('uses per-file count so prior file accumulation does not abort the next file', async () => {
const maxMessagesPerParse = 5;
setMaxMessagesPerParse(maxMessagesPerParse);
writer = new MessageEventBusLogWriter();

// File 1: 4 unconfirmed messages (below limit)
const lines1: string[] = [];
for (let i = 0; i < 4; i++) {
lines1.push(makeWorkflowStartedLine(`old-id-${i}`, `old-exec-${i}`));
}
const logFile1 = writeLogFile('old.log', lines1);

// File 2: 4 unconfirmed messages (below limit per-file, but 8 total)
const lines2: string[] = [];
for (let i = 0; i < 4; i++) {
lines2.push(makeWorkflowStartedLine(`new-id-${i}`, `new-exec-${i}`));
}
const logFile2 = writeLogFile('new.log', lines2);

const results = {
loggedMessages: [] as EventMessageTypes[],
sentMessages: [] as EventMessageTypes[],
unfinishedExecutions: {} as Record<string, EventMessageTypes[]>,
};

await writer.readLoggedMessagesFromFile(results, 'unsent', logFile1);
await writer.readLoggedMessagesFromFile(results, 'unsent', logFile2);

// Both files should be fully parsed (8 total, each file under limit)
expect(results.loggedMessages).toHaveLength(8);
expect(logger.warn).not.toHaveBeenCalled();
});

it('does not apply the guard in "all" mode since confirms do not prune', async () => {
const maxMessagesPerParse = 5;
setMaxMessagesPerParse(maxMessagesPerParse);
writer = new MessageEventBusLogWriter();

const lines: string[] = [];
for (let i = 0; i < 20; i++) {
lines.push(makeWorkflowStartedLine(`id-${i}`, `exec-${i}`));
}
const logFile = writeLogFile('all-mode.log', lines);

const results = {
loggedMessages: [] as EventMessageTypes[],
sentMessages: [] as EventMessageTypes[],
unfinishedExecutions: {} as Record<string, EventMessageTypes[]>,
};

await writer.readLoggedMessagesFromFile(results, 'all', logFile);

expect(results.loggedMessages).toHaveLength(20);
expect(logger.warn).not.toHaveBeenCalled();
});

it('does not abort when confirms prune the working set below the limit', async () => {
const maxMessagesPerParse = 5;
setMaxMessagesPerParse(maxMessagesPerParse);
writer = new MessageEventBusLogWriter();

const lines: string[] = [];
for (let i = 0; i < 100; i++) {
const id = `id-${i}`;
lines.push(makeWorkflowStartedLine(id, `exec-${i}`));
lines.push(makeConfirmLine(id));
}
const logFile = writeLogFile('healthy.log', lines);

const results = {
loggedMessages: [] as EventMessageTypes[],
sentMessages: [] as EventMessageTypes[],
unfinishedExecutions: {} as Record<string, EventMessageTypes[]>,
};

await writer.readLoggedMessagesFromFile(results, 'unsent', logFile);

expect(results.loggedMessages).toHaveLength(0);
expect(logger.warn).not.toHaveBeenCalled();
});
});
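
The pruning dynamic these tests exercise can be modeled in isolation. A minimal standalone sketch (a simplified model, not the writer's actual implementation) of why healthy logs keep the in-memory working set near zero while legacy logs full of orphaned events grow it linearly:

type Parsed = { kind: 'event'; id: string } | { kind: 'confirm'; id: string };

// Track the peak number of unconfirmed events while scanning a log.
function peakWorkingSet(lines: Parsed[]): number {
	const working = new Set<string>();
	let peak = 0;
	for (const line of lines) {
		if (line.kind === 'event') working.add(line.id);
		else working.delete(line.id); // a confirm prunes its event
		peak = Math.max(peak, working.size);
	}
	return peak;
}

// Healthy log: every event is confirmed right away, so the peak stays at 1.
const healthy: Parsed[] = [];
for (let i = 0; i < 100; i++) {
	healthy.push({ kind: 'event', id: `id-${i}` }, { kind: 'confirm', id: `id-${i}` });
}
console.log(peakWorkingSet(healthy)); // 1

// Orphaned legacy log: no confirms, so the peak grows with file size.
const orphaned: Parsed[] = Array.from({ length: 100 }, (_, i) => ({ kind: 'event', id: `id-${i}` }));
console.log(peakWorkingSet(orphaned)); // 100
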
@@ -205,51 +205,40 @@ export class MessageEventBusLogWriter {
): Promise<ReadMessagesFromLogFileResult> {
if (logFileName && existsSync(logFileName)) {
try {
const stream = createReadStream(logFileName);
stream.on('error', (error) => {
if ((error as NodeJS.ErrnoException).code !== 'ERR_STREAM_DESTROYED') {
this.logger.error(`Error reading logged messages from file: ${logFileName}`, {
error,
});
}
});

[Review thread on the error handler above]
Contributor: so we swallow all errors and never throw?
Contributor Author: Yep, errors are logged but not propagated. I think this makes sense, as we don't want this recovery process (let alone a single log-file read) to hard-fail the whole instance. It's not a critical recovery AFAIK (for instance, we've allowed ourselves to tamper with those files in the cloud medic system to resolve instance failures).
const rl = readline.createInterface({
input: createReadStream(logFileName),
input: stream,
crlfDelay: Infinity,
});
// Safety guard: abort if the per-file working set grows too large.
// Healthy files stay small because confirm lines prune loggedMessages as
// they stream; legacy files with orphaned messages (pre-PR #27334) are
// the pathological case this guards against.
// The guard is skipped in 'all' mode because confirms don't prune there,
// so the count would grow monotonically even for healthy files.
const maxMessagesPerParse = this.globalConfig.eventBus.logWriter.maxMessagesPerParse;
const baselineCount = results.loggedMessages.length;
let aborted = false;
rl.on('line', (line) => {
try {
const json = jsonParse(line);
if (isEventMessageOptions(json) && json.__type !== undefined) {
const msg = this.getEventMessageObjectByType(json);
if (msg !== null) results.loggedMessages.push(msg);
if (msg?.eventName && msg.payload?.executionId) {
const executionId = msg.payload.executionId as string;
switch (msg.eventName) {
case 'n8n.workflow.started':
if (!Object.keys(results.unfinishedExecutions).includes(executionId)) {
results.unfinishedExecutions[executionId] = [];
}
results.unfinishedExecutions[executionId] = [msg];
break;
case 'n8n.workflow.success':
case 'n8n.workflow.failed':
case 'n8n.execution.throttled':
case 'n8n.execution.started-during-bootup':
delete results.unfinishedExecutions[executionId];
break;
case 'n8n.node.started':
case 'n8n.node.finished':
if (!Object.keys(results.unfinishedExecutions).includes(executionId)) {
results.unfinishedExecutions[executionId] = [];
}
results.unfinishedExecutions[executionId].push(msg);
break;
}
}
}
if (isEventMessageConfirm(json) && mode !== 'all') {
const removedMessage = remove(results.loggedMessages, (e) => e.id === json.confirm);
if (mode === 'sent') {
results.sentMessages.push(...removedMessage);
}
}
} catch (error) {
this.logger.error(
`Error reading line messages from file: ${logFileName}, line: ${line}, ${error.message}}`,
if (aborted) return;
this.processLoggedLine(line, results, mode, logFileName);
if (
mode !== 'all' &&
results.loggedMessages.length - baselineCount > maxMessagesPerParse
) {
aborted = true;
this.logger.warn(
`Event log ${logFileName} exceeded ${maxMessagesPerParse} in-memory messages during parse; aborting to prevent out-of-memory. Some unfinished execution recovery may be skipped. Tune via N8N_EVENTBUS_LOGWRITER_MAXMESSAGESPERPARSE.`,
);
rl.close();
stream.destroy();
}
});
// wait for stream to finish before continue
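
The abort path pairs rl.close() with stream.destroy(): closing the readline interface stops further 'line' events, and destroying the stream releases the file handle. Destroying a stream mid-read is also what can surface ERR_STREAM_DESTROYED, which the error handler above deliberately filters out. A minimal standalone sketch of the same early-abort pattern (readFirstLines and its parameters are hypothetical, not part of this PR):

import { createReadStream } from 'node:fs';
import * as readline from 'node:readline';

// Hypothetical helper: read at most maxLines lines, then abort the stream.
async function readFirstLines(path: string, maxLines: number): Promise<string[]> {
	return await new Promise((resolve) => {
		const stream = createReadStream(path);
		stream.on('error', (error) => {
			// Destroying mid-read may emit ERR_STREAM_DESTROYED; treat that as a clean stop.
			if ((error as NodeJS.ErrnoException).code !== 'ERR_STREAM_DESTROYED') {
				console.error(`Error reading ${path}`, error);
			}
		});
		const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
		const lines: string[] = [];
		rl.on('line', (line) => {
			lines.push(line);
			if (lines.length >= maxLines) {
				rl.close(); // stop emitting 'line' events
				stream.destroy(); // release the file handle promptly
			}
		});
		rl.on('close', () => resolve(lines));
	});
}
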
@@ -261,6 +250,56 @@
return results;
}

// eslint-disable-next-line complexity
private processLoggedLine(
line: string,
results: ReadMessagesFromLogFileResult,
mode: EventMessageReturnMode,
logFileName: string,
): void {
try {
const json = jsonParse(line);
if (isEventMessageOptions(json) && json.__type !== undefined) {
const msg = this.getEventMessageObjectByType(json);
if (msg !== null) results.loggedMessages.push(msg);
if (msg?.eventName && msg.payload?.executionId) {
const executionId = msg.payload.executionId as string;
switch (msg.eventName) {
case 'n8n.workflow.started':
if (!Object.keys(results.unfinishedExecutions).includes(executionId)) {
results.unfinishedExecutions[executionId] = [];
}
results.unfinishedExecutions[executionId] = [msg];
break;
case 'n8n.workflow.success':
case 'n8n.workflow.failed':
case 'n8n.execution.throttled':
case 'n8n.execution.started-during-bootup':
delete results.unfinishedExecutions[executionId];
break;
case 'n8n.node.started':
case 'n8n.node.finished':
if (!Object.keys(results.unfinishedExecutions).includes(executionId)) {
results.unfinishedExecutions[executionId] = [];
}
results.unfinishedExecutions[executionId].push(msg);
break;
}
}
}
if (isEventMessageConfirm(json) && mode !== 'all') {
const removedMessage = remove(results.loggedMessages, (e) => e.id === json.confirm);
if (mode === 'sent') {
results.sentMessages.push(...removedMessage);
}
}
} catch (error) {
this.logger.error(
`Error reading line messages from file: ${logFileName}, line: ${line}, ${error.message}`,
);
}
}

getLogFileName(counter?: number): string {
if (counter) {
return `${MessageEventBusLogWriter.options.logFullBasePath}-${counter}.log`;