diff --git a/benchmarks/eventsource/eventsource-stream.mjs b/benchmarks/eventsource/eventsource-stream.mjs new file mode 100644 index 00000000000..7d0e5d80685 --- /dev/null +++ b/benchmarks/eventsource/eventsource-stream.mjs @@ -0,0 +1,81 @@ +import { bench, group, run } from 'mitata' +import { EventSourceStream } from '../../lib/web/eventsource/eventsource-stream.js' + +const EVENT_COUNT = 250 + +function buildEventStream (count) { + let content = '' + + for (let i = 0; i < count; i++) { + content += `id: ${i}\n` + content += `event: message-${i % 4}\n` + content += `retry: ${1000 + (i % 5)}\n` + content += `data: ${'x'.repeat(64)}-${i}\n` + content += `data: ${'y'.repeat(64)}-${i}\n\n` + } + + return Buffer.from(content, 'utf8') +} + +function splitBuffer (buffer, chunkSize) { + const chunks = [] + + for (let i = 0; i < buffer.length; i += chunkSize) { + chunks.push(buffer.subarray(i, Math.min(i + chunkSize, buffer.length))) + } + + return chunks +} + +function parseChunks (chunks, expectedEvents) { + let events = 0 + + const stream = new EventSourceStream({ + push () { + events++ + return true + } + }) + + for (let i = 0; i < chunks.length; i++) { + stream.write(chunks[i]) + } + + if (events !== expectedEvents) { + throw new Error(`Expected ${expectedEvents} events, got ${events}`) + } +} + +const payload = buildEventStream(EVENT_COUNT) +const payloadWithBOM = Buffer.concat([Buffer.from('\uFEFF', 'utf8'), payload]) + +const scenarios = { + 'single chunk': splitBuffer(payload, payload.length), + '256-byte chunks': splitBuffer(payload, 256), + '64-byte chunks': splitBuffer(payload, 64), + '8-byte chunks': splitBuffer(payload, 8), + '1-byte chunks': splitBuffer(payload, 1) +} + +const bomScenarios = { + 'BOM + 8-byte chunks': splitBuffer(payloadWithBOM, 8), + 'BOM + 1-byte chunks': splitBuffer(payloadWithBOM, 1) +} + +group(`EventSourceStream parsing (${EVENT_COUNT} events)`, () => { + for (const [name, chunks] of Object.entries(scenarios)) { + bench(name, () => { + parseChunks(chunks, EVENT_COUNT) + }) + } +}) + +group('EventSourceStream parsing with BOM', () => { + for (const [name, chunks] of Object.entries(bomScenarios)) { + bench(name, () => { + parseChunks(chunks, EVENT_COUNT) + }) + } +}) + +await run() diff --git a/lib/web/eventsource/eventsource-stream.js b/lib/web/eventsource/eventsource-stream.js index d24e8f6a1b1..7b9e2f8cbba 100644 --- a/lib/web/eventsource/eventsource-stream.js +++ b/lib/web/eventsource/eventsource-stream.js @@ -23,6 +23,49 @@ const COLON = 0x3A */ const SPACE = 0x20 +const DATA = Buffer.from('data') +const EVENT = Buffer.from('event') +const ID = Buffer.from('id') +const RETRY = Buffer.from('retry') + +function isASCIINumberBytes (buffer, start) { + if (start >= buffer.length) { + return false + } + + for (let i = start; i < buffer.length; i++) { + if (buffer[i] < 0x30 || buffer[i] > 0x39) { + return false + } + } + + return true +} + +function isValidLastEventIdBytes (buffer, start) { + for (let i = start; i < buffer.length; i++) { + if (buffer[i] === 0x00) { + return false + } + } + + return true +} + +function isFieldName (line, length, field) { + if (length !== field.length) { + return false + } + + for (let i = 0; i < length; i++) { + if (line[i] !== field[i]) { + return false + } + } + + return true +} + /** * @typedef {object} EventSourceStreamEvent * @type {object} @@ -63,11 +106,14 @@ class EventSourceStream extends Transform { eventEndCheck = false /** - * @type {Buffer|null} + * @type {Buffer[]} */ - buffer = null + chunks = [] + chunkIndex = 0 pos = 0 + lineChunkIndex = 0 + linePos = 0 event = { data: undefined, @@ -107,92 +153,20 @@ class EventSourceStream extends Transform { return } - // Cache the chunk in the buffer, as the data might not be complete while - // processing it - // TODO: Investigate if there is a more performant way to handle - // incoming chunks - // see: https://github.com/nodejs/undici/issues/2630 - if (this.buffer) { - this.buffer = Buffer.concat([this.buffer, chunk]) - } else { - this.buffer = chunk - } + this.chunks.push(chunk) // Strip leading byte-order-mark if we opened the stream and started // the processing of the incoming data if (this.checkBOM) { - switch (this.buffer.length) { - case 1: - // Check if the first byte is the same as the first byte of the BOM - if (this.buffer[0] === BOM[0]) { - // If it is, we need to wait for more data - callback() - return - } - // Set the checkBOM flag to false as we don't need to check for the - // BOM anymore - this.checkBOM = false - - // The buffer only contains one byte so we need to wait for more data - callback() - return - case 2: - // Check if the first two bytes are the same as the first two bytes - // of the BOM - if ( - this.buffer[0] === BOM[0] && - this.buffer[1] === BOM[1] - ) { - // If it is, we need to wait for more data, because the third byte - // is needed to determine if it is the BOM or not - callback() - return - } - - // Set the checkBOM flag to false as we don't need to check for the - // BOM anymore - this.checkBOM = false - break - case 3: - // Check if the first three bytes are the same as the first three - // bytes of the BOM - if ( - this.buffer[0] === BOM[0] && - this.buffer[1] === BOM[1] && - this.buffer[2] === BOM[2] - ) { - // If it is, we can drop the buffered data, as it is only the BOM - this.buffer = Buffer.alloc(0) - // Set the checkBOM flag to false as we don't need to check for the - // BOM anymore - this.checkBOM = false - - // Await more data - callback() - return - } - // If it is not the BOM, we can start processing the data - this.checkBOM = false - break - default: - // The buffer is longer than 3 bytes, so we can drop the BOM if it is - // present - if ( - this.buffer[0] === BOM[0] && - this.buffer[1] === BOM[1] && - this.buffer[2] === BOM[2] - ) { - // Remove the BOM from the buffer - this.buffer = this.buffer.subarray(3) - } - - // Set the checkBOM flag to false as we don't need to check for the - this.checkBOM = false - break + if (this.handleBOM()) { + callback() + return } } - while (this.pos < this.buffer.length) { + while (this.hasCurrentByte()) { + const byte = this.currentByte() + // If the previous line ended with an end-of-line, we need to check // if the next character is also an end-of-line. if (this.eventEndCheck) { @@ -205,10 +179,9 @@ class EventSourceStream extends Transform { if (this.crlfCheck) { // If the current character is a line feed, we can remove it // from the buffer and reset the crlfCheck flag - if (this.buffer[this.pos] === LF) { - this.buffer = this.buffer.subarray(this.pos + 1) - this.pos = 0 + if (byte === LF) { this.crlfCheck = false + this.consumeCurrentByte() // It is possible that the line feed is not the end of the // event. We need to check if the next character is an @@ -224,19 +197,17 @@ class EventSourceStream extends Transform { this.crlfCheck = false } - if (this.buffer[this.pos] === LF || this.buffer[this.pos] === CR) { + if (byte === LF || byte === CR) { // If the current character is a carriage return, we need to // set the crlfCheck flag to true, as we need to check if the // next character is a line feed so we can remove it from the // buffer - if (this.buffer[this.pos] === CR) { + if (byte === CR) { this.crlfCheck = true } - this.buffer = this.buffer.subarray(this.pos + 1) - this.pos = 0 - if ( - this.event.data !== undefined || this.event.event || this.event.id !== undefined || this.event.retry) { + this.consumeCurrentByte() + if (this.hasPendingEvent()) { this.processEvent(this.event) } this.clearEvent() @@ -250,22 +221,18 @@ class EventSourceStream extends Transform { // If the current character is an end-of-line, we can process the // line - if (this.buffer[this.pos] === LF || this.buffer[this.pos] === CR) { + if (byte === LF || byte === CR) { // If the current character is a carriage return, we need to // set the crlfCheck flag to true, as we need to check if the // next character is a line feed - if (this.buffer[this.pos] === CR) { + if (byte === CR) { this.crlfCheck = true } // In any case, we can process the line as we reached an // end-of-line character - this.parseLine(this.buffer.subarray(0, this.pos), this.event) - - // Remove the processed line from the buffer - this.buffer = this.buffer.subarray(this.pos + 1) - // Reset the position as we removed the processed line from the buffer - this.pos = 0 + this.parseLine(this.readLine(), this.event) + this.consumeCurrentByte() // A line was processed and this could be the end of the event. We need // to check if the next line is empty to determine if the event is // finished. @@ -273,7 +240,7 @@ class EventSourceStream extends Transform { continue } - this.pos++ + this.advanceCursor() } callback() @@ -298,64 +265,53 @@ class EventSourceStream extends Transform { return } - let field = '' - let value = '' + let fieldLength = line.length + let valueStart = line.length // If the line contains a U+003A COLON character (:) if (colonPosition !== -1) { - // Collect the characters on the line before the first U+003A COLON - // character (:), and let field be that string. - // TODO: Investigate if there is a more performant way to extract the - // field - // see: https://github.com/nodejs/undici/issues/2630 - field = line.subarray(0, colonPosition).toString('utf8') + fieldLength = colonPosition // Collect the characters on the line after the first U+003A COLON // character (:), and let value be that string. // If value starts with a U+0020 SPACE character, remove it from value. - let valueStart = colonPosition + 1 + valueStart = colonPosition + 1 if (line[valueStart] === SPACE) { ++valueStart } - // TODO: Investigate if there is a more performant way to extract the - // value - // see: https://github.com/nodejs/undici/issues/2630 - value = line.subarray(valueStart).toString('utf8') - - // Otherwise, the string is not empty but does not contain a U+003A COLON - // character (:) - } else { - // Process the field using the steps described below, using the whole - // line as the field name, and the empty string as the field value. - field = line.toString('utf8') - value = '' } - // Modify the event with the field name and value. The value is also - // decoded as UTF-8 - switch (field) { - case 'data': - if (event[field] === undefined) { - event[field] = value - } else { - event[field] += `\n${value}` - } - break - case 'retry': - if (isASCIINumber(value)) { - event[field] = value - } - break - case 'id': - if (isValidLastEventId(value)) { - event[field] = value - } - break - case 'event': - if (value.length > 0) { - event[field] = value - } - break + if (isFieldName(line, fieldLength, DATA)) { + const value = line.toString('utf8', valueStart) + + if (event.data === undefined) { + event.data = value + } else { + event.data += `\n${value}` + } + return + } + + if (isFieldName(line, fieldLength, RETRY)) { + if (isASCIINumberBytes(line, valueStart)) { + event.retry = line.toString('utf8', valueStart) + } + return + } + + if (isFieldName(line, fieldLength, ID)) { + if (isValidLastEventIdBytes(line, valueStart)) { + event.id = line.toString('utf8', valueStart) + } + return + } + + if (isFieldName(line, fieldLength, EVENT)) { + const value = line.toString('utf8', valueStart) + + if (value.length > 0) { + event.event = value + } } } @@ -385,12 +341,151 @@ class EventSourceStream extends Transform { } clearEvent () { - this.event = { - data: undefined, - event: undefined, - id: undefined, - retry: undefined + this.event.data = undefined + this.event.event = undefined + this.event.id = undefined + this.event.retry = undefined + } + + hasPendingEvent () { + return this.event.data !== undefined || + this.event.event !== undefined || + this.event.id !== undefined || + this.event.retry !== undefined + } + + hasCurrentByte () { + return this.chunkIndex < this.chunks.length && + this.pos < this.chunks[this.chunkIndex].length + } + + currentByte () { + return this.chunks[this.chunkIndex][this.pos] + } + + consumeCurrentByte () { + this.advanceCursor() + this.syncLineStartToCursor() + } + + advanceCursor () { + this.pos++ + + while (this.chunkIndex < this.chunks.length && this.pos >= this.chunks[this.chunkIndex].length) { + this.chunkIndex++ + this.pos = 0 + } + } + + syncLineStartToCursor () { + this.lineChunkIndex = this.chunkIndex + this.linePos = this.pos + this.dropConsumedChunks() + } + + dropConsumedChunks () { + while (this.lineChunkIndex > 0) { + this.chunks.shift() + this.lineChunkIndex-- + this.chunkIndex-- + } + + if (this.chunkIndex === this.chunks.length) { + this.chunks.length = 0 + this.chunkIndex = 0 + this.pos = 0 + this.lineChunkIndex = 0 + this.linePos = 0 + } + } + + readLine () { + if (this.lineChunkIndex === this.chunkIndex) { + return this.chunks[this.chunkIndex].subarray(this.linePos, this.pos) + } + + const chunks = [] + let length = 0 + + for (let i = this.lineChunkIndex; i <= this.chunkIndex; i++) { + const chunk = this.chunks[i] + const start = i === this.lineChunkIndex ? this.linePos : 0 + const end = i === this.chunkIndex ? this.pos : chunk.length + const slice = chunk.subarray(start, end) + length += slice.length + chunks.push(slice) + } + + return Buffer.concat(chunks, length) + } + + peekBufferedByte (offset) { + let chunkIndex = this.lineChunkIndex + let pos = this.linePos + + while (chunkIndex < this.chunks.length) { + const chunk = this.chunks[chunkIndex] + const remaining = chunk.length - pos + + if (offset < remaining) { + return chunk[pos + offset] + } + + offset -= remaining + chunkIndex++ + pos = 0 + } + } + + discardLeadingBytes (count) { + while (count > 0 && this.lineChunkIndex < this.chunks.length) { + const chunk = this.chunks[this.lineChunkIndex] + const remaining = chunk.length - this.linePos + + if (count < remaining) { + this.linePos += count + count = 0 + } else { + count -= remaining + this.lineChunkIndex++ + this.linePos = 0 + } + } + + this.chunkIndex = this.lineChunkIndex + this.pos = this.linePos + this.dropConsumedChunks() + } + + handleBOM () { + const first = this.peekBufferedByte(0) + const second = this.peekBufferedByte(1) + const third = this.peekBufferedByte(2) + + if (second === undefined) { + if (first === BOM[0]) { + return true + } + + this.checkBOM = false + return true + } + + if (third === undefined) { + if (first === BOM[0] && second === BOM[1]) { + return true + } + + this.checkBOM = false + return false } + + if (first === BOM[0] && second === BOM[1] && third === BOM[2]) { + this.discardLeadingBytes(3) + } + + this.checkBOM = false + return !this.hasCurrentByte() } }