diff --git a/packages/cli/src/eventbus/message-event-bus-writer/__tests__/message-event-bus-log-writer.test.ts b/packages/cli/src/eventbus/message-event-bus-writer/__tests__/message-event-bus-log-writer.test.ts index 660a42c71b1ba..ad6263f71a4d7 100644 --- a/packages/cli/src/eventbus/message-event-bus-writer/__tests__/message-event-bus-log-writer.test.ts +++ b/packages/cli/src/eventbus/message-event-bus-writer/__tests__/message-event-bus-log-writer.test.ts @@ -86,6 +86,62 @@ describe('MessageEventBusLogWriter.readLoggedMessagesFromFile', () => { ); }); + it('uses per-file count so prior file accumulation does not abort the next file', async () => { + const maxMessagesPerParse = 5; + setMaxMessagesPerParse(maxMessagesPerParse); + writer = new MessageEventBusLogWriter(); + + // File 1: 4 unconfirmed messages (below limit) + const lines1: string[] = []; + for (let i = 0; i < 4; i++) { + lines1.push(makeWorkflowStartedLine(`old-id-${i}`, `old-exec-${i}`)); + } + const logFile1 = writeLogFile('old.log', lines1); + + // File 2: 4 unconfirmed messages (below limit per-file, but 8 total) + const lines2: string[] = []; + for (let i = 0; i < 4; i++) { + lines2.push(makeWorkflowStartedLine(`new-id-${i}`, `new-exec-${i}`)); + } + const logFile2 = writeLogFile('new.log', lines2); + + const results = { + loggedMessages: [] as EventMessageTypes[], + sentMessages: [] as EventMessageTypes[], + unfinishedExecutions: {} as Record, + }; + + await writer.readLoggedMessagesFromFile(results, 'unsent', logFile1); + await writer.readLoggedMessagesFromFile(results, 'unsent', logFile2); + + // Both files should be fully parsed (8 total, each file under limit) + expect(results.loggedMessages).toHaveLength(8); + expect(logger.warn).not.toHaveBeenCalled(); + }); + + it('does not apply the guard in "all" mode since confirms do not prune', async () => { + const maxMessagesPerParse = 5; + setMaxMessagesPerParse(maxMessagesPerParse); + writer = new MessageEventBusLogWriter(); + + const lines: string[] = []; + for (let i = 0; i < 20; i++) { + lines.push(makeWorkflowStartedLine(`id-${i}`, `exec-${i}`)); + } + const logFile = writeLogFile('all-mode.log', lines); + + const results = { + loggedMessages: [] as EventMessageTypes[], + sentMessages: [] as EventMessageTypes[], + unfinishedExecutions: {} as Record, + }; + + await writer.readLoggedMessagesFromFile(results, 'all', logFile); + + expect(results.loggedMessages).toHaveLength(20); + expect(logger.warn).not.toHaveBeenCalled(); + }); + it('does not abort when confirms prune the working set below the limit', async () => { const maxMessagesPerParse = 5; setMaxMessagesPerParse(maxMessagesPerParse); diff --git a/packages/cli/src/eventbus/message-event-bus-writer/message-event-bus-log-writer.ts b/packages/cli/src/eventbus/message-event-bus-writer/message-event-bus-log-writer.ts index 64a7e64bf8bfb..c75d9fd81bbdd 100644 --- a/packages/cli/src/eventbus/message-event-bus-writer/message-event-bus-log-writer.ts +++ b/packages/cli/src/eventbus/message-event-bus-writer/message-event-bus-log-writer.ts @@ -206,20 +206,27 @@ export class MessageEventBusLogWriter { if (logFileName && existsSync(logFileName)) { try { const stream = createReadStream(logFileName); + stream.on('error', () => {}); // absorb errors after destroy() const rl = readline.createInterface({ input: stream, crlfDelay: Infinity, }); - // Safety guard: abort if the in-memory working set grows too large. + // Safety guard: abort if the per-file working set grows too large. // Healthy files stay small because confirm lines prune loggedMessages as // they stream; legacy files with orphaned messages (pre-PR #27334) are // the pathological case this guards against. + // The guard is skipped in 'all' mode because confirms don't prune there, + // so the count would grow monotonically even for healthy files. const maxMessagesPerParse = this.globalConfig.eventBus.logWriter.maxMessagesPerParse; + const baselineCount = results.loggedMessages.length; let aborted = false; rl.on('line', (line) => { if (aborted) return; this.processLoggedLine(line, results, mode, logFileName); - if (results.loggedMessages.length > maxMessagesPerParse) { + if ( + mode !== 'all' && + results.loggedMessages.length - baselineCount > maxMessagesPerParse + ) { aborted = true; this.logger.warn( `Event log ${logFileName} exceeded ${maxMessagesPerParse} in-memory messages during parse; aborting to prevent out-of-memory. Some unfinished execution recovery may be skipped. Tune via N8N_EVENTBUS_LOGWRITER_MAXMESSAGESPERPARSE.`,