Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,62 @@ describe('MessageEventBusLogWriter.readLoggedMessagesFromFile', () => {
);
});

it('uses per-file count so prior file accumulation does not abort the next file', async () => {
  const maxMessagesPerParse = 5;
  setMaxMessagesPerParse(maxMessagesPerParse);
  writer = new MessageEventBusLogWriter();

  // First log file: four unconfirmed entries, comfortably under the limit.
  const firstFileLines = Array.from({ length: 4 }, (_, i) =>
    makeWorkflowStartedLine(`old-id-${i}`, `old-exec-${i}`),
  );
  const logFile1 = writeLogFile('old.log', firstFileLines);

  // Second log file: four more unconfirmed entries — eight accumulated in
  // total, yet each individual file stays below the per-file threshold.
  const secondFileLines = Array.from({ length: 4 }, (_, i) =>
    makeWorkflowStartedLine(`new-id-${i}`, `new-exec-${i}`),
  );
  const logFile2 = writeLogFile('new.log', secondFileLines);

  const results = {
    loggedMessages: [] as EventMessageTypes[],
    sentMessages: [] as EventMessageTypes[],
    unfinishedExecutions: {} as Record<string, EventMessageTypes[]>,
  };

  await writer.readLoggedMessagesFromFile(results, 'unsent', logFile1);
  await writer.readLoggedMessagesFromFile(results, 'unsent', logFile2);

  // Every message from both files is retained and no warning fired,
  // proving the guard counts per file rather than across files.
  expect(results.loggedMessages).toHaveLength(8);
  expect(logger.warn).not.toHaveBeenCalled();
});

it('does not apply the guard in "all" mode since confirms do not prune', async () => {
  const maxMessagesPerParse = 5;
  setMaxMessagesPerParse(maxMessagesPerParse);
  writer = new MessageEventBusLogWriter();

  // Twenty messages — four times the configured per-parse limit.
  const logLines = Array.from({ length: 20 }, (_, i) =>
    makeWorkflowStartedLine(`id-${i}`, `exec-${i}`),
  );
  const logFile = writeLogFile('all-mode.log', logLines);

  const results = {
    loggedMessages: [] as EventMessageTypes[],
    sentMessages: [] as EventMessageTypes[],
    unfinishedExecutions: {} as Record<string, EventMessageTypes[]>,
  };

  await writer.readLoggedMessagesFromFile(results, 'all', logFile);

  // In 'all' mode the guard is skipped, so every message survives intact.
  expect(results.loggedMessages).toHaveLength(20);
  expect(logger.warn).not.toHaveBeenCalled();
});

it('does not abort when confirms prune the working set below the limit', async () => {
const maxMessagesPerParse = 5;
setMaxMessagesPerParse(maxMessagesPerParse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,20 +206,27 @@ export class MessageEventBusLogWriter {
if (logFileName && existsSync(logFileName)) {
try {
const stream = createReadStream(logFileName);
stream.on('error', () => {}); // absorb errors after destroy()
const rl = readline.createInterface({
input: stream,
crlfDelay: Infinity,
});
// Safety guard: abort if the in-memory working set grows too large.
// Safety guard: abort if the per-file working set grows too large.
// Healthy files stay small because confirm lines prune loggedMessages as
// they stream; legacy files with orphaned messages (pre-PR #27334) are
// the pathological case this guards against.
// The guard is skipped in 'all' mode because confirms don't prune there,
// so the count would grow monotonically even for healthy files.
const maxMessagesPerParse = this.globalConfig.eventBus.logWriter.maxMessagesPerParse;
const baselineCount = results.loggedMessages.length;
let aborted = false;
rl.on('line', (line) => {
if (aborted) return;
this.processLoggedLine(line, results, mode, logFileName);
if (results.loggedMessages.length > maxMessagesPerParse) {
if (
mode !== 'all' &&
results.loggedMessages.length - baselineCount > maxMessagesPerParse
) {
aborted = true;
this.logger.warn(
`Event log ${logFileName} exceeded ${maxMessagesPerParse} in-memory messages during parse; aborting to prevent out-of-memory. Some unfinished execution recovery may be skipped. Tune via N8N_EVENTBUS_LOGWRITER_MAXMESSAGESPERPARSE.`,
Expand Down
Loading