🔥 feat: Add SSE middleware #4225
Conversation
Add production-grade Server-Sent Events middleware built natively for Fiber's fasthttp architecture with proper client disconnect detection. Features: Hub-based broker, 3 priority lanes, NATS-style topic wildcards, adaptive throttling, connection groups, JWT/ticket auth, cache invalidation helpers, Prometheus metrics, Last-Event-ID replay, Redis/NATS fan-out, and graceful Kubernetes-style drain. 91% test coverage, golangci-lint clean, go test -race clean. Resolves gofiber#4194
Remove non-core files in response to maintainer feedback:
- auth.go (JWT/Ticket auth — users handle via OnConnect)
- metrics.go (Prometheus/JSON — use Fiber monitor middleware instead)
- invalidation.go (helpers — just wrappers over hub.Publish)
- domain_event.go (DomainEvent/Progress/Complete — also wrappers)
- fanout.go (Redis/NATS bridge — userland concern)
- MemoryReplayer impl (kept Replayer interface for pluggable backends)

Core retained: Hub, topic routing, 3 priority lanes, NATS wildcards, connection groups, adaptive throttling, graceful drain, replayer interface. Coverage: 82.3%, lint clean, race-free.
- Restore fanout.go and all 7 FanOut tests (Test_SSE_FanOut, Test_SSE_FanOut_Cancel, Test_SSE_FanOutMulti, Test_SSE_FanOut_Transform, Test_SSE_FanOut_TransformNil, Test_SSE_FanOut_RetryOnError, Test_SSE_FanOut_BuildEvent_ConfigDefaults)
- Clean stale package godoc (drop JWT/ticket auth and Prometheus mentions)
- Document FanOut usage in docs/middleware/sse.md with Redis example

Coverage: 83.9%, race-free, lint clean.
…gnment

Breaking changes:
- Remove Next field from Config: SSE is a terminal middleware and Next had undefined behavior (handler runs after ctx release).
- Rename coalescer -> Dispatcher with clearer SSE semantics: AddEvent (FIFO event lane) + AddState (keyed state lane) + WriteTo.
- Replace FanOut method API with Config.Bridges []BridgeConfig and SubscriberBridge interface. Bridges start automatically and stop when Hub.Shutdown is called (no more dangling CancelFunc).

Zero-allocation hot path:
- MarshaledEvent.WriteTo now builds the frame in a pooled bytebufferpool.ByteBuffer -- 0 allocs/op (was 381 B, 8 allocs).
- nextEventID uses strconv.FormatUint instead of fmt.Sprintf.
- writeComment / writeRetry share the pooled buffer path.

Correctness:
- Call c.Abandon() before SendStreamWriter so Fiber does not recycle the ctx while fasthttp is still running the stream writer.
- Replayer errors are logged and continue (best-effort replay) instead of silently dropping.

File organization:
- sse.go (671 -> 154 lines) now only hosts the package doc, New(), NewWithHub(), and generateID().
- hub.go collects Hub struct + all Hub methods (run loop, routing, flush, heartbeats, shutdown, lifecycle watchers).
- bridge.go holds SubscriberBridge interface + BridgeConfig + the bridge goroutine driver.

Tests:
- Add end-to-end tests driving the middleware over a real TCP listener and validating response headers + wire format (retry, connected, multi-line data, sanitized id/event against injection).
- Rewrite FanOut tests as Bridge tests.
- t.Parallel() on every test function.

Coverage 90.7%, golangci-lint clean, go test -race clean. Addresses gofiber#4196 review feedback from @grivera64 and @gaby.
Thanks for opening this pull request! 🎉 Please check out our contributing guidelines. If you need help or want to chat with us, join us on Discord https://gofiber.io/discord
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant Publisher as Publisher
    participant Hub as Hub
    participant Dispatcher as Dispatcher
    participant Connection as Connection
    participant Client as Client
    Publisher->>Hub: Publish(Event)
    Hub->>Hub: routeEvent(topic, group, TTL, priority)
    alt PriorityInstant (P0)
        Hub->>Connection: trySend(MarshaledEvent)
        Connection->>Client: write SSE frame
    else PriorityBatched (P1)
        Hub->>Dispatcher: AddEvent(MarshaledEvent)
    else PriorityCoalesced (P2)
        Hub->>Dispatcher: AddState(CoalesceKey, MarshaledEvent)
    end
    Hub->>Dispatcher: flushAll() periodically
    Dispatcher->>Connection: drain queued events
    Connection->>Client: write SSE frames
```

```mermaid
sequenceDiagram
    autonumber
    participant ExternalPubSub as ExternalPubSub
    participant Bridge as Bridge
    participant Hub as Hub
    participant Connection as Connection
    participant Client as Client
    ExternalPubSub->>Bridge: message(payload)
    Bridge->>Hub: buildBridgeEvent(payload) → Event
    Hub->>Hub: routeEvent(...)
    Hub->>Connection: deliver per priority
    Connection->>Client: SSE frames
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Code Review
This pull request introduces a native Server-Sent Events (SSE) middleware for Fiber v3, featuring a hub-based broker with topic routing, priority lanes, and adaptive throttling. The review identifies several critical issues, including a potential hang during shutdown where context deadlines are ignored, and a race condition in the connection setup that could result in lost events. Performance concerns were raised regarding linear scans for wildcard and group-based routing, as well as high GC pressure in the event delivery hot path. Furthermore, the shutdown sequence for control messages is currently unreliable, and the documentation incorrectly includes a removed configuration property.
```go
// matchWildcardConns adds wildcard-subscribed connections that match the event topics.
func (h *Hub) matchWildcardConns(event *Event, seen map[string]struct{}) {
	for connID := range h.wildcardConns {
```
Iterating over all wildcard connections and performing string splits for every topic in every event is a significant performance bottleneck (O(N_wildcards * N_topics)). This will lead to high CPU usage and excessive allocations in the event routing hot path. Consider pre-splitting patterns during connection registration or using a more efficient matching structure like a trie to optimize wildcard routing.
References
- Hot paths in event routing should avoid O(N) scans and redundant allocations to maintain high throughput.
Same as my above comment, I don't think we need to run these matches on each publish, but rather just subscribe connections to events on creation.
```go
	if len(event.Group) == 0 {
		return
	}
	for connID, conn := range h.connections {
```
Scanning all active connections for group-only events is an O(N) operation that will not scale well as the number of connections grows. If metadata-based filtering is intended for multi-tenant isolation at scale, consider indexing connections by metadata keys to allow for more efficient targeted delivery.
References
- Avoid linear scans over large collections in performance-critical paths.
```go
// When both Topics and Group are set, only connections matching BOTH are
// included (intersection semantics) to prevent tenant/topic leaks (CRITICAL-1).
func (h *Hub) matchConnections(event *Event) map[string]struct{} {
	seen := make(map[string]struct{})
```
Allocating a new map for every event routing operation in the hot path increases GC pressure. Consider using a sync.Pool to reuse these maps and reduce allocations during high-frequency event publishing.
References
- Use object pooling for frequently allocated temporary objects in hot paths to reduce GC overhead.
Instead of querying matched connections, shouldn't we just subscribe each connection to their intended event? From my understanding, the events that a connection expects doesn't change during the lifetime of the connection. If so, doing this matchConnections check isn't necessary, and we can remove it.
Actionable comments posted: 12
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/middleware/sse.md`:
- Around line 100-120: The docs example and config table still reference removed
APIs (PubSubSubscriber, FanOut, FanOutConfig, FanOut and Config.Next); update
them to the current bridge API by replacing those symbols with SubscriberBridge
and BridgeConfig, change the example to show creating/adding a SubscriberBridge
(using the Hub method that registers bridges in your codebase) and passing a
BridgeConfig-powered subscriber implementation, and update the config table to
remove the Next row and add a Bridges row that documents Config.Bridges. Ensure
the sample subscriber type (previously redisSubscriber) and the callback usage
are shown within the BridgeConfig/SubscriberBridge pattern so copy-pasting
matches the new API.
In `@docs/whats_new.md`:
- Around line 3143-3162: The SSE section (the paragraph starting with the "####
SSE" header) is misplaced under the migration guide's "### 🧬 Middlewares" area;
move that entire SSE block so it appears before the top-level "## 🔌 Addons"
header (or into the main top-level Middlewares section if one exists) so the TOC
matches content, then run make markdown to lint and fix formatting; look for the
"#### SSE" header and relocate the contiguous code fence and description until
the next header, ensuring surrounding headers remain intact.
In `@middleware/sse/bridge.go`:
- Around line 71-85: The loop currently resumes immediately when
cfg.Subscriber.Subscribe returns nil (err == nil) before ctx cancellation, which
can spin; change the logic around the Subscribe return handling so that an early
nil return is treated as retryable: if err != nil OR (err == nil && ctx.Err() ==
nil) then log a warning (use logBridgeError or an appropriate logger referencing
cfg.Channel) and perform the same select { case <-time.After(bridgeRetryDelay):
case <-ctx.Done(): return } as done for non-nil errors; keep the same references
to cfg.Subscriber.Subscribe, h.buildBridgeEvent, logBridgeError,
bridgeRetryDelay and ctx so the retry path is applied for both error and
unexpected-nil cases.
In `@middleware/sse/event.go`:
- Around line 140-146: The Data field and comment text are not being sanitized,
allowing CR characters to create injected SSE fields; in the loop that writes
data lines (referencing me.Data and strings.SplitSeq) apply
sanitizeSSEField(me.Data) or explicitly strip '\r' (and ensure '\n' remains the
splitter) before iterating so no CR can terminate the data line, and likewise
call sanitizeSSEField() on comment text in writeComment() (or strip CR/LF there)
so comments cannot inject id/event/retry fields; update the functions that write
data lines and writeComment to use the sanitized value instead of raw me.Data or
raw comment text.
In `@middleware/sse/hub.go`:
- Around line 240-259: watchShutdown races sending the "server-shutdown" event
with run's immediate connection closes so writeLoop may exit before the event is
flushed; change the shutdown handling so either run stops closing connections
directly on h.shutdown and defers to watchShutdown (which sends the
MarshaledEvent via conn.trySend, sleeps shutdownDrainDelay, then calls
conn.Close), or move the shutdown notification + time.Sleep(shutdownDrainDelay)
into the same shutdown path used by run; update references: Hub.watchShutdown,
Hub.run, Connection.trySend, Connection.Close, conn.done, writeLoop, and
shutdownDrainDelay to ensure only one owner closes the connection after the
drain delay so the shutdown event can be written.
- Around line 117-135: Shutdown currently calls h.bridges.Wait() directly which
can block forever and ignore ctx; change Hub.Shutdown to wait for bridges with a
cancellable select: after initiating shutdown (setting h.draining, calling
h.bridgeCancel, closing h.shutdown, etc.) spawn a goroutine that calls
h.bridges.Wait() and closes a local "bridgesDone" channel when complete, then
replace the direct h.bridges.Wait() with a select that waits on either
bridgesDone or ctx.Done(), returning ctx error if the context expires; keep
existing behavior for closing h.stopped and honoring h.shutdownOnce.
- Around line 80-93: The Publish method can still enqueue events after shutdown
closes; update Publish (method Publish on Hub) to first check the hub's draining
state (h.draining) and refuse new publishes when draining is true: if draining,
increment h.metrics.eventsDropped and return immediately. Ensure the draining
check is done atomically or under the same mutex used to set draining so it
cannot race with shutdown; only proceed to the select that writes to h.events
when draining is false. Keep the existing handling of h.shutdown, h.events,
h.metrics.eventsPublished and h.metrics.eventsDropped but prevent any send to
h.events if h.draining is set.
- Around line 370-374: The Replayer.Store call currently swallows errors; change
the call in the h.cfg.Replayer != nil && len(event.Group) == 0 block to capture
the returned error from h.cfg.Replayer.Store(me, event.Topics), and if non-nil
log it (using the hub's logger, e.g. h.log or h.logger) with a clear message
like "replayer store failed" and relevant context (me, event.Topics) but
otherwise continue so replay remains best-effort; keep the behavior of not
returning or failing on error.
In `@middleware/sse/sse_test.go`:
- Line 1673: The test contains bare receives on the bridge subscription channel
(e.g., the "subscribed" receive at the shown diff) that can hang; replace each
bare "<-subscribed" with a select using a timeout (time.After) following the
same pattern used in Test_SSE_Bridge_Publishes so the test fails fast if
subscription startup regresses, and for places expecting multiple messages (the
other occurrences at the noted ranges) wrap receives in a small loop that uses
the same select+timeout pattern to await each expected delivery.
In `@middleware/sse/sse.go`:
- Around line 126-136: The replay/register race causes events published between
Replay start and hub.register <- conn to be lost; make the handoff atomic by
moving registration into the replay path or by introducing an atomic
register-and-replay action: modify initStream (or add a new
hub.RegisterAndReplay(conn, lastEventID)) so it registers the connection with
the hub before taking the replay snapshot or, alternatively, have hub.register
accept a struct {conn, lastEventID, replaying} and handle replay atomically
inside the hub loop while buffering live events and de-duping by event ID;
ensure the hub.shutdown handling remains intact (select on shutdown) and that
de-dup logic uses the same lastEventID comparison used by Replay.
- Around line 81-84: The middleware currently returns cfg.OnConnect errors to
clients via c.Status(fiber.StatusForbidden).SendString(err.Error()), which can
leak sensitive auth/tenant details; change this to log the error (using the
existing logger or return it to a higher layer) and respond with a fixed generic
403 body (e.g., "forbidden" or no body) instead. Locate the OnConnect handling
block referencing cfg.OnConnect and
c.Status(fiber.StatusForbidden).SendString(err.Error()), replace the
SendString(err.Error()) call with a generic response, and ensure the original
err is logged (not returned) so diagnostic detail stays out of client responses.
In `@middleware/sse/throttle.go`:
- Around line 19-27: newAdaptiveThrottler constructs minInterval and maxInterval
from baseInterval but doesn't ensure minInterval <= baseInterval <= maxInterval,
which can invert throttling for extreme FlushInterval values; in
newAdaptiveThrottler (and the adaptiveThrottler fields minInterval, maxInterval,
baseInterval) clamp minInterval and maxInterval around baseInterval after
computing them so that if minInterval > baseInterval you set minInterval =
baseInterval, and if maxInterval < baseInterval you set maxInterval =
baseInterval, preserving the invariant min <= base <= max.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 4aab376b-31c8-4fc7-855e-5db406134f65
📒 Files selected for processing (15)
- docs/middleware/sse.md
- docs/whats_new.md
- middleware/sse/bridge.go
- middleware/sse/config.go
- middleware/sse/connection.go
- middleware/sse/dispatcher.go
- middleware/sse/event.go
- middleware/sse/example_test.go
- middleware/sse/hub.go
- middleware/sse/replayer.go
- middleware/sse/sse.go
- middleware/sse/sse_test.go
- middleware/sse/stats.go
- middleware/sse/throttle.go
- middleware/sse/topic.go
#### SSE

The new SSE middleware provides Server-Sent Events for Fiber, built natively on the fasthttp `SendStreamWriter` API. It includes a Hub-based broker with topic routing, three priority lanes (instant/batched/coalesced), NATS-style topic wildcards, connection groups for metadata-based filtering, adaptive throttling, graceful drain, and pluggable Last-Event-ID replay. Fully compatible with the standard SSE wire format and any `EventSource`-style client.

```go
handler, hub := sse.NewWithHub(sse.Config{
	OnConnect: func(c fiber.Ctx, conn *sse.Connection) error {
		conn.Topics = []string{"notifications"}
		return nil
	},
})
app.Get("/events", handler)

// Publish from any handler or worker
hub.Publish(sse.Event{
	Type:   "update",
	Data:   "hello",
	Topics: []string{"notifications"},
})
```
🧩 Analysis chain
🏁 Script executed:

```shell
# Check file size first
wc -l docs/whats_new.md
```

Repository: gofiber/fiber | Length of output: 79

🏁 Script executed:

```shell
# Read the section in question (lines 3140-3165)
sed -n '3140,3165p' docs/whats_new.md
```

Repository: gofiber/fiber | Length of output: 1000

🏁 Script executed:

```shell
# Check what section the SSE content is under by examining lines before 3143
sed -n '3100,3145p' docs/whats_new.md
```

Repository: gofiber/fiber | Length of output: 1793

🏁 Script executed:

```shell
# Find the migration guide section markers
rg -n "^## .*Migration|^### .*Middlewares|^## 🔌 Addons" docs/whats_new.md | head -20
```

Repository: gofiber/fiber | Length of output: 125

🏁 Script executed:

```shell
# Check the TOC around line 60
sed -n '50,70p' docs/whats_new.md
```

Repository: gofiber/fiber | Length of output: 695

🏁 Script executed:

```shell
# Verify if make markdown target exists
test -f Makefile && grep -n "markdown" Makefile
```

Repository: gofiber/fiber | Length of output: 284
Move the SSE section to the main content area before the Addons section.
The SSE middleware is described as a new feature, not a migration guide, yet it is currently nested under the migration guide's ### 🧬 Middlewares section (line 2794). However, the main TOC lists it as part of the top-level Middlewares section. This mismatch will cause incorrect navigation. Move the SSE section before ## 🔌 Addons (line 1695) to align with the TOC structure, or relocate it to the main Middlewares section if one exists before the Addons section. After updating, run make markdown to lint the file per coding guidelines.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/whats_new.md` around lines 3143 - 3162, The SSE section (the paragraph
starting with the "#### SSE" header) is misplaced under the migration guide's
"### 🧬 Middlewares" area; move that entire SSE block so it appears before the
top-level "## 🔌 Addons" header (or into the main top-level Middlewares section
if one exists) so the TOC matches content, then run make markdown to lint and
fix formatting; look for the "#### SSE" header and relocate the contiguous code
fence and description until the next header, ensuring surrounding headers remain
intact.
```go
		err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
			if event := h.buildBridgeEvent(&cfg, topic, payload); event != nil {
				h.Publish(*event)
			}
		})

		if err != nil && ctx.Err() == nil {
			logBridgeError(cfg.Channel, err)
			select {
			case <-time.After(bridgeRetryDelay):
			case <-ctx.Done():
				return
			}
		}
	}
```
Throttle unexpected nil returns from Subscribe.
If a subscriber returns nil before ctx is canceled, this loop immediately resubscribes and can spin hot on closed streams. Treat any early return as retryable and apply the same delay.
Proposed fix

```diff
 err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
 	if event := h.buildBridgeEvent(&cfg, topic, payload); event != nil {
 		h.Publish(*event)
 	}
 })
-if err != nil && ctx.Err() == nil {
-	logBridgeError(cfg.Channel, err)
+if ctx.Err() == nil {
+	if err != nil {
+		logBridgeError(cfg.Channel, err)
+	} else {
+		logBridgeError(cfg.Channel, context.Canceled)
+	}
 	select {
 	case <-time.After(bridgeRetryDelay):
 	case <-ctx.Done():
 		return
 	}
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@middleware/sse/bridge.go` around lines 71 - 85, The loop currently resumes
immediately when cfg.Subscriber.Subscribe returns nil (err == nil) before ctx
cancellation, which can spin; change the logic around the Subscribe return
handling so that an early nil return is treated as retryable: if err != nil OR
(err == nil && ctx.Err() == nil) then log a warning (use logBridgeError or an
appropriate logger referencing cfg.Channel) and perform the same select { case
<-time.After(bridgeRetryDelay): case <-ctx.Done(): return } as done for non-nil
errors; keep the same references to cfg.Subscriber.Subscribe,
h.buildBridgeEvent, logBridgeError, bridgeRetryDelay and ctx so the retry path
is applied for both error and unexpected-nil cases.
```go
	// strings.SplitSeq("", "\n") yields "", correctly writing "data: \n"
	// for empty data.
	for line := range strings.SplitSeq(me.Data, "\n") {
		buf.WriteString("data: ")
		buf.WriteString(line)
		buf.WriteByte('\n')
	}
```
🧩 Analysis chain
🌐 Web query:
Are carriage returns valid line terminators in the HTML Server-Sent Events event stream parsing algorithm?
💡 Result:
Yes, carriage returns (CR, U+000D) are valid line terminators in the HTML Server-Sent Events (SSE) event stream parsing algorithm.
Citations:
- 1: https://html.spec.whatwg.org/C/server-sent-events.html
- 2: https://html.spec.whatwg.org/multipage/server-sent-events.html
- 3: https://html.spec.whatwg.org/server-sent-events.html
- 4: https://html.spec.whatwg.org/dev/server-sent-events.html
- 5: https://www.w3.org/TR/2009/WD-eventsource-20091029/
🏁 Script executed:

```shell
# First, let's locate and examine the event.go file
find . -name "event.go" -path "*/middleware/sse/*" | head -5
```

Repository: gofiber/fiber | Length of output: 82

🏁 Script executed:

```shell
# Read the file to understand the implementation
cat -n middleware/sse/event.go | head -180
```

Repository: gofiber/fiber | Length of output: 6198

🏁 Script executed:

```shell
# Check if sanitizeSSEField exists and what it does
rg -A 10 "func sanitizeSSEField" middleware/sse/
```

Repository: gofiber/fiber | Length of output: 683
Strip CR characters from me.Data and sanitize comment text to prevent SSE injection.
Lines 140-146 use strings.SplitSeq(me.Data, "\n") which only splits on LF, leaving bare CR characters intact. Since SSE parsers (per WHATWG spec) treat CR as a line terminator, a payload like "hello\rid: injected" will be written as "data: hello\r" (broken into two lines), allowing an attacker to inject fake id:, event:, or retry: fields.
The same issue affects writeComment() at lines 160-162, which writes raw text without sanitization. Unlike ID and Type (which are protected by sanitizeSSEField()), the Data field bypasses this protection entirely.
Apply sanitizeSSEField() to Data before splitting, or strip both CR and LF during the line iteration as shown in the proposed fix. Also sanitize comment text.
🛡️ Proposed framing fix

```diff
+func writeDataLines(buf *bytebufferpool.ByteBuffer, data string) {
+	start := 0
+	for i := 0; i < len(data); i++ {
+		if data[i] != '\n' && data[i] != '\r' {
+			continue
+		}
+		buf.WriteString("data: ")
+		buf.WriteString(data[start:i])
+		buf.WriteByte('\n')
+		if data[i] == '\r' && i+1 < len(data) && data[i+1] == '\n' {
+			i++
+		}
+		start = i + 1
+	}
+	buf.WriteString("data: ")
+	buf.WriteString(data[start:])
+	buf.WriteByte('\n')
+}
+
 // WriteTo writes the SSE-formatted event to w following the Server-Sent
 // Events specification. It assembles the frame in a pooled buffer so the
 // hot path performs a single Write syscall with zero fmt allocations.
@@
-	// strings.SplitSeq("", "\n") yields "", correctly writing "data: \n"
-	// for empty data.
-	for line := range strings.SplitSeq(me.Data, "\n") {
-		buf.WriteString("data: ")
-		buf.WriteString(line)
-		buf.WriteByte('\n')
-	}
+	writeDataLines(buf, me.Data)
 	buf.WriteByte('\n')
@@
 	buf := bytebufferpool.Get()
 	defer bytebufferpool.Put(buf)
 	buf.WriteString(": ")
-	buf.WriteString(text)
+	buf.WriteString(sanitizeSSEField(text))
 	buf.WriteString("\n\n")
```

Also applies to: 160-162
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@middleware/sse/event.go` around lines 140 - 146, The Data field and comment
text are not being sanitized, allowing CR characters to create injected SSE
fields; in the loop that writes data lines (referencing me.Data and
strings.SplitSeq) apply sanitizeSSEField(me.Data) or explicitly strip '\r' (and
ensure '\n' remains the splitter) before iterating so no CR can terminate the
data line, and likewise call sanitizeSSEField() on comment text in
writeComment() (or strip CR/LF there) so comments cannot inject id/event/retry
fields; update the functions that write data lines and writeComment to use the
sanitized value instead of raw me.Data or raw comment text.
Bot review (gemini-code-assist, coderabbitai) surfaced 2 critical and several major issues. All addressed without introducing new API surface.

Critical
- Close the replay/live delivery gap: register the connection BEFORE writing the preamble and replay, so live events buffer in conn.send instead of being missed during the preamble window. Monotonic event IDs guarantee no duplicates with a strictly-after replayer.
- Normalise CR and CRLF to LF before splitting the data field: the HTML SSE spec treats all three as line terminators, so caller data containing "\r" or "\r\n" could otherwise produce malformed frames.

Major
- Shutdown now honors ctx while waiting for bridges: wedged bridges no longer hang Shutdown past its deadline.
- Publish rejects early while draining so a racing Shutdown can't inflate EventsPublished with events the run loop will never dispatch.
- Shutdown event ordering: run loop now broadcasts server-shutdown to all conns, sleeps drainDelay, THEN closes, replacing the per-conn watchShutdown goroutines whose Close() could beat the flush.
- Bridge loop applies the retry backoff to any early return (not only errors) so a misbehaving Subscribe that returns nil immediately cannot spin hot.
- OnConnect errors no longer leak to clients: the middleware now returns a generic 403 body, keeping tenant / user identifiers out of the unauthenticated response.
- adaptiveThrottler clamps min and max around baseInterval so extreme configs don't invert the throttling policy.

Minor
- Bridge tests wrap bare channel receives in select+timeout so a subscription regression fails fast instead of hanging the test run.
- Replayer.Store errors are logged (was silent); replay stays best-effort.
- docs/middleware/sse.md: drop the `Next` row (SSE is terminal), replace the `FanOut` / `PubSubSubscriber` example with `Config.Bridges` + `SubscriberBridge`, add a Bridges row to the config table.
Verification: go build / go vet / go test -race / golangci-lint — all clean on middleware/sse.
Thanks for the bot reviews — worked through the critical and major findings. New commit: vinod-morya/fiber@0c1aeb4

Critical

Major

Minor

Out of scope for this pass (happy to file follow-ups if you'd like them in this PR):
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@middleware/sse/event.go`:
- Around line 141-145: Change the Retry emission guard so MarshaledEvent.Retry
only emits when > 0 (not >= 0) to avoid emitting "retry: 0" from
default-constructed MarshaledEvent; update the MarshaledEvent public doc comment
(remove the "-1 means omit" sentinel wording and describe omission by
zero/unset) and adjust marshalEvent usage if needed to keep behavior, and
tighten Test_SSE_MarshaledEvent_WriteTo (and any Replayer implementations tests)
to assert that retry is absent when Retry is not explicitly set.
In `@middleware/sse/sse_test.go`:
- Around line 439-458: The test Test_SSE_MarshaledEvent_WriteTo is passing
silently because a zero-value Retry (0) produces a "retry: 0" line; update the
test to explicitly assert that the output does not include any retry line by
adding a require.NotContains(t, output, "retry:") (or require.NotContains for
"retry: ") after producing output; locate the test function
Test_SSE_MarshaledEvent_WriteTo and the MarshaledEvent/WriteTo interaction to
ensure the expectation matches the intended sentinel behavior for Retry.
In `@middleware/sse/sse.go`:
- Around line 80-90: The code currently discards the original error from
cfg.OnConnect; replace the silent return of fiber.NewError(...) with returning
the original error wrapped so the app-level Fiber ErrorHandler can log
diagnostics — e.g. on err from cfg.OnConnect(c, conn) return fmt.Errorf("sse:
OnConnect failed: %w", err) instead of fiber.NewError(...). Update the error
handling in the block that calls cfg.OnConnect (symbols: cfg.OnConnect, c, conn,
err) to wrap and return the original error; alternatively, if you prefer to keep
a sanitized client message, log err via the app logger before returning the
generic fiber.NewError.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: c0bc58d7-e1c5-454e-8e66-6b049b10c015
📒 Files selected for processing (7)
- docs/middleware/sse.md
- middleware/sse/bridge.go
- middleware/sse/event.go
- middleware/sse/hub.go
- middleware/sse/sse.go
- middleware/sse/sse_test.go
- middleware/sse/throttle.go
✅ Files skipped from review due to trivial changes (2)
- docs/middleware/sse.md
- middleware/sse/hub.go
🚧 Files skipped from review as they are similar to previous changes (1)
- middleware/sse/bridge.go
- MarshaledEvent.WriteTo now requires a strictly positive Retry before emitting the `retry:` directive. Per the SSE spec, `retry: 0` tells clients to reconnect immediately, so the zero value of the int field (which external Replayer implementations may leave unset) could trigger a reconnect storm during replay. The field doc now states explicitly that non-positive values are omitted.
- Add Test_SSE_MarshaledEvent_WriteTo_RetryZeroOmitted to pin the new behaviour, plus a NotContains assertion on the existing happy-path test so a regression that re-emits `retry: 0` fails loudly.
- OnConnect: the rejected-connection path now logs the original error via Fiber's log package (matching the replayer-error logging already used elsewhere) before returning a generic 403 body. Operators retain the diagnostic signal (auth-fail vs rate-limit vs tenant-mismatch) while the unauthenticated client still sees only "forbidden". Docstring updated to match actual behaviour.
|
Addressed the follow-up findings: vinod-morya/fiber@ea972cf
|
Actionable comments posted: 2
♻️ Duplicate comments (1)
middleware/sse/event.go (1)
178-184: ⚠️ Potential issue | 🟠 Major: Sanitize comment text before writing the SSE frame.
`text` can contain CR/LF and break out of the comment line into `id:`, `event:`, `retry:`, or `data:` fields. This was previously flagged and remains unresolved for `writeComment`.

🛡️ Proposed fix

 	buf.WriteString(": ")
-	buf.WriteString(text)
+	buf.WriteString(sanitizeSSEField(text))
 	buf.WriteString("\n\n")

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/event.go` around lines 178 - 184, The writeComment function is vulnerable because text may contain CR/LF and can break out of the SSE comment into other fields; sanitize text before writing by removing or normalizing newline characters (e.g., replace all '\r' and '\n' with a single space and collapse consecutive whitespace) so the output written by writeComment always produces a single comment line starting with ": " and ending with "\n\n". Locate writeComment and apply the sanitization to the text variable before writing to the buffer (preserve use of bytebufferpool and existing buffer writes).
🧹 Nitpick comments (2)
middleware/sse/sse.go (2)
87-92: OnConnect error handling looks good. Logging the real error server-side while returning a generic 403 to clients resolves the info-leak concern from the prior review and gives operators the diagnostic signal they need.
One small consideration: `log.Warnf` will fire on every rejected connection, which can become noisy (and potentially a log-amplification vector) under an auth-spam/scan flood. If your deployments typically see high-volume unauthenticated probes, consider `log.Debugf` here, or leave it to the application via a configurable hook. Non-blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/sse.go` around lines 87 - 92, The current OnConnect rejection logs using log.Warnf inside the OnConnect error branch (cfg.OnConnect and the block that returns fiber.NewError) which may be too noisy under auth-spam; change the logging here to a lower-severity or configurable option — e.g., replace log.Warnf with log.Debugf (or call a configurable logger/hook provided via cfg) so rejected connections are recorded at debug level by default while still returning the generic 403 via fiber.NewError; ensure the change is made in the cfg.OnConnect error handling block where log.Warnf is called.
94-102: Minor: topics check runs after OnConnect; consider ordering and nil-metadata handling. Two small observations on this block:
- The "no topics subscribed" 400 is returned after `OnConnect` has already executed (and potentially done work like DB lookups for auth). That's the only way topics can be set, so the ordering is necessary, but consider whether returning 400 here is the right signal to clients vs. 403/204, since from the client's perspective a successful auth still ends in an error. Not a blocker, just worth a doc note.
- `len(conn.Metadata)` and `maps.Copy(frozen, conn.Metadata)` both tolerate a nil source map, so this is safe. After line 98, `conn.Metadata` becomes a non-nil (possibly empty) map, which is a subtle behavior change vs. whatever `newConnection` returned. Fine, just flagging in case downstream code distinguishes nil from empty.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/sse.go` around lines 94 - 102, Reorder and adjust handling: if topics can be determined before auth, validate conn.Topics before calling OnConnect; otherwise, if topics are only set inside OnConnect, change the error returned when topics are empty from 400 to 403 to reflect successful auth but no subscriptions (return c.Status(fiber.StatusForbidden)...), and preserve nil-vs-empty metadata by only creating/assigning the frozen map when conn.Metadata != nil (i.e., wrap the make/maps.Copy/conn.Metadata = frozen block in a nil check for conn.Metadata) so newConnection’s nil semantics aren’t lost.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@middleware/sse/event.go`:
- Around line 55-57: MarshaledEvent's exported fields (notably ID and Type) must
be sanitized at the write boundary to prevent CR/LF injection from external
Replayer implementations; update the WriteTo method (and any other final write
path that emits SSE, e.g., where marshalEvent output is written) to
validate/escape or strip CR and LF characters from MarshaledEvent.ID and
MarshaledEvent.Type before emitting any SSE lines, ensuring all SSE field writes
use the cleaned values; keep marshalEvent unchanged but treat WriteTo as the
canonical safety boundary for sanitization.
- Around line 108-123: Remove the special-case branch that handles
json.Marshaler (the switch case that calls v.MarshalJSON()) in
middleware/sse/event.go so that all values flow through the default json.Marshal
path (which safely handles typed-nil implementations); update the logic that
sets me.Data to rely only on json.Marshal's result and error handling; and add a
regression test that constructs a typed-nil pointer type implementing
json.Marshaler, sends it through the SSE encoding path that uses me.Data, and
asserts it does not panic and produces a valid SSE frame (or an error JSON)
instead.
---
Duplicate comments:
In `@middleware/sse/event.go`:
- Around line 178-184: The writeComment function is vulnerable because text may
contain CR/LF and can break out of the SSE comment into other fields; sanitize
text before writing by removing or normalizing newline characters (e.g., replace
all '\r' and '\n' with a single space and collapse consecutive whitespace) so
the output written by writeComment always produces a single comment line
starting with ": " and ending with "\n\n". Locate writeComment and apply the
sanitization to the text variable before writing to the buffer (preserve use of
bytebufferpool and existing buffer writes).
---
Nitpick comments:
In `@middleware/sse/sse.go`:
- Around line 87-92: The current OnConnect rejection logs using log.Warnf inside
the OnConnect error branch (cfg.OnConnect and the block that returns
fiber.NewError) which may be too noisy under auth-spam; change the logging here
to a lower-severity or configurable option — e.g., replace log.Warnf with
log.Debugf (or call a configurable logger/hook provided via cfg) so rejected
connections are recorded at debug level by default while still returning the
generic 403 via fiber.NewError; ensure the change is made in the cfg.OnConnect
error handling block where log.Warnf is called.
- Around line 94-102: Reorder and adjust handling: if topics can be determined
before auth, validate conn.Topics before calling OnConnect; otherwise, if topics
are only set inside OnConnect, change the error returned when topics are empty
from 400 to 403 to reflect successful auth but no subscriptions (return
c.Status(fiber.StatusForbidden)...), and preserve nil-vs-empty metadata by only
creating/assigning the frozen map when conn.Metadata != nil (i.e., wrap the
make/maps.Copy/conn.Metadata = frozen block in a nil check for conn.Metadata) so
newConnection’s nil semantics aren’t lost.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 2bab182b-4d07-4bf9-9eaf-81276f661158
📒 Files selected for processing (3)
- middleware/sse/event.go
- middleware/sse/sse.go
- middleware/sse/sse_test.go
✅ Files skipped from review due to trivial changes (1)
- middleware/sse/sse_test.go
Addresses the third-pass bot findings plus additional classes of bugs found by walking the surface from an attacker / misbehaving-caller angle.

Reported
- event.go: WriteTo now applies sanitizeSSEField to ID and Type at the write boundary, not only inside marshalEvent. External Replayer implementations constructing MarshaledEvent directly can no longer inject additional SSE fields via embedded \r/\n. Defense in depth — WriteTo is the last line between an event and the client.
- event.go: removed the explicit `case json.Marshaler:` branch. A typed-nil pointer whose pointer type implements MarshalJSON matches the interface, and the manual call panicked when the method dereferenced the receiver. json.Marshal in the default branch is nil-safe (emits "null"), so the special case wasn't buying anything anyway.

Adversarial sweep
- bridge.go: wrap the user-supplied Transform invocation in a recover. A panic in Transform previously propagated into the Subscriber callback and (depending on implementation) tore down the bridge goroutine, leaking h.bridges.Done() and hanging Shutdown forever.
- hub.go: validate ALL BridgeConfigs (nil Subscriber, empty Channel) before starting any goroutine. The previous ordering panicked mid-loop, leaving earlier bridges' goroutines running with no owner to cancel them.
- event.go: writeRetry is now a no-op when ms <= 0, matching the MarshaledEvent.WriteTo semantics. A 0 or negative retry hint would otherwise tell clients to reconnect immediately.

Tests
- Test_SSE_MarshaledEvent_WriteTo_SanitizesInjectionAtBoundary asserts exactly one id line, one event line, and one frame terminator even when ID/Type contain injection attempts.
- Test_SSE_MarshaledEvent_WriteTo_TypedNilJSONMarshaler constructs a typed-nil pointer with a dereferencing MarshalJSON and asserts it produces `data: null` without panicking.

Verification: go build / go vet / go test -race / golangci-lint — clean.
|
Addressed the third-pass findings plus a short adversarial sweep on related paths: vinod-morya/fiber@9fc8e2a Reported
Adversarial sweep (same class of issues, caught while touching the files)
Tests
|
Actionable comments posted: 2
♻️ Duplicate comments (2)
middleware/sse/event.go (1)
181-187: ⚠️ Potential issue | 🟡 Minor: Sanitize comment text before framing.
`writeComment` still writes raw `text`; embedded CR/LF can break out of the comment line if this helper is ever reused with dynamic text. Keep the write boundary defensive like `WriteTo`.

🛡️ Proposed hardening

 func writeComment(w io.Writer, text string) error {
 	buf := bytebufferpool.Get()
 	defer bytebufferpool.Put(buf)
 	buf.WriteString(": ")
-	buf.WriteString(text)
+	buf.WriteString(sanitizeSSEField(text))
 	buf.WriteString("\n\n")
 	if _, err := w.Write(buf.B); err != nil {

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/event.go` around lines 181 - 187: the writeComment helper currently writes raw text, which allows embedded CR or LF to break the SSE framing. Sanitize the input text (e.g., replace any '\r' and '\n' characters with a safe placeholder like a single space, or remove them) before writing into the bytebufferpool buffer so the produced ": " comment line cannot be split into new SSE lines. Mirror the defensive treatment used by WriteTo and keep references to writeComment and bytebufferpool in your change.

middleware/sse/hub.go (1)
88-104: ⚠️ Potential issue | 🟠 Major: Synchronize `Publish` with shutdown state.
The draining check can still race: `Publish` may read `draining=false`, `Shutdown` then closes `h.shutdown`, and the final `select` may choose the ready buffered send over the shutdown case. That still allows post-drain enqueue and `EventsPublished` inflation.

🔒 Proposed direction

 func (h *Hub) Publish(event Event) { //nolint:gocritic // hugeParam: public API, value semantics preferred
+	if event.TTL > 0 && event.CreatedAt.IsZero() {
+		event.CreatedAt = time.Now()
+	}
+
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+
 	// Reject early if the hub is draining. Without this, a concurrent
 	// Shutdown() can race with Publish() and enqueue an event the run
 	// loop will never dispatch — inflating EventsPublished and leaving
 	// the caller under the false impression the event was delivered.
 	if h.draining.Load() {
 		h.metrics.eventsDropped.Add(1)
 		return
 	}
-	if event.TTL > 0 && event.CreatedAt.IsZero() {
-		event.CreatedAt = time.Now()
-	}
 	select {
 	case h.events <- event:
 		h.metrics.eventsPublished.Add(1)
@@
 func (h *Hub) Shutdown(ctx context.Context) error {
-	h.draining.Store(true)
 	h.shutdownOnce.Do(func() {
+		h.mu.Lock()
+		h.draining.Store(true)
 		if h.bridgeCancel != nil {
 			h.bridgeCancel()
 		}
 		close(h.shutdown)
+		h.mu.Unlock()
 	})

Also applies to: 134-141
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@middleware/sse/hub.go` around lines 88 - 104, The current Publish method can still race with Shutdown because a buffered send can win the select even after draining was set; fix by checking h.shutdown non-blockingly immediately before attempting to send and by performing the send with a select that includes <-h.shutdown so shutdown is honored if it becomes ready between checks: in Hub.Publish, replace the plain send path by first doing a non-blocking select { case <-h.shutdown: /*drop*/; default: } and then attempt the send with select { case h.events <- event: h.metrics.eventsPublished.Add(1); case <-h.shutdown: /*drop*/ } (also keep the initial h.draining.Load() early-return and metrics update on drop), and apply the same pattern to the other Publish variant around lines 134-141 to eliminate the race.
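The check-then-select pattern from the prompt, reduced to a minimal hub. Field names follow the review's references, but this is a sketch; the real Hub carries far more state:

```go
package main

import "fmt"

type hub struct {
	events    chan int
	shutdown  chan struct{}
	published int
	dropped   int
}

// publish honors shutdown even when the buffered send is also ready:
// a first non-blocking check rejects late publishes deterministically,
// and the send itself races against <-shutdown so a closed shutdown
// channel is always observable.
func (h *hub) publish(ev int) {
	select {
	case <-h.shutdown:
		h.dropped++
		return
	default:
	}
	select {
	case h.events <- ev:
		h.published++
	case <-h.shutdown:
		h.dropped++
	}
}

func main() {
	h := &hub{events: make(chan int, 4), shutdown: make(chan struct{})}
	h.publish(1) // accepted
	close(h.shutdown)
	h.publish(2) // dropped: shutdown observed before the send
	fmt.Println(h.published, h.dropped) // 1 1
}
```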
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@middleware/sse/hub.go`:
- Around line 55-68: The hub goroutine is started before validating cfg.Bridges,
which can leak hub.run() if validation panics; move the BridgeConfig validation
(checking BridgeConfig.Subscriber != nil and BridgeConfig.Channel != "") to run
before calling go hub.run() in NewWithHub (or the function that constructs the
hub), so validate all cfg.Bridges first (iterating over cfg.Bridges and
panicking with the existing messages for index i) and only after validation call
go hub.run() and then proceed to launch per-bridge goroutines.
In `@middleware/sse/sse_test.go`:
- Around line 1885-1918: The test currently can pass without observing a retry
because it waits only 100ms while bridgeRetryDelay is 3s; update
Test_SSE_Bridge_RetriesOnError to actually observe a retry by either overriding
the retry delay to a short duration before creating the hub (e.g., set
bridgeRetryDelay to a small value) or by waiting long enough for the real retry
(sleep > bridgeRetryDelay) and then assert attempts.Load() >= 2; refer to
Test_SSE_Bridge_RetriesOnError, bridgeRetryDelay, mockBridge.onSubscribe,
NewWithHub and hub.Shutdown to locate and modify the test accordingly.
---
Duplicate comments:
In `@middleware/sse/event.go`:
- Around line 181-187: The writeComment helper currently writes raw text which
allows embedded CR or LF to break the SSE framing; update writeComment to
sanitize the input text (e.g., replace any '\r' and '\n' characters with a safe
placeholder like a single space or remove them) before writing into the
bytebufferpool buffer so the produced ": " comment line cannot be split into new
SSE lines—mirror the defensive treatment used by WriteTo and keep references to
writeComment and bytebufferpool in your change.
In `@middleware/sse/hub.go`:
- Around line 88-104: The current Publish method can still race with Shutdown
because a buffered send can win the select even after draining was set; fix by
checking h.shutdown non-blockingly immediately before attempting to send and by
performing the send with a select that includes <-h.shutdown so shutdown is
honored if it becomes ready between checks: in Hub.Publish, replace the plain
send path by first doing a non-blocking select { case <-h.shutdown: /*drop*/;
default: } and then attempt the send with select { case h.events <- event:
h.metrics.eventsPublished.Add(1); case <-h.shutdown: /*drop*/ } (also keep the
initial h.draining.Load() early-return and metrics update on drop), and apply
the same pattern to the other Publish variant around lines 134-141 to eliminate
the race.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: f98bd7f7-ea45-4885-b456-007e057b024b
📒 Files selected for processing (4)
- middleware/sse/bridge.go
- middleware/sse/event.go
- middleware/sse/hub.go
- middleware/sse/sse_test.go
- hub.go: move BridgeConfig validation ahead of `go hub.run()`. The
previous ordering started the run-loop goroutine first, then
panicked on bad config — leaking a zombie run loop because
NewWithHub never returns the hub to its caller for Shutdown.
Validation is now the first thing NewWithHub does; a bad config
aborts before any goroutine is spawned.
- bridge.go / sse_test.go: make Test_SSE_Bridge_RetriesOnError
actually exercise a retry. The old test slept 100ms with the retry
delay pinned at 3s, so it passed after the FIRST error without
ever observing a retry. Tightened it to:
- swap the package-level bridgeRetryDelay to 20ms for the test
(restored via t.Cleanup — not run in parallel to avoid
cross-test interference)
- block the second Subscribe call on ctx.Done and signal via a
channel so the test waits deterministically for attempts == 2
before shutting down
bridgeRetryDelay is now a package var rather than const so tests
can override it without exposing a public config knob.
Verification: go build / go vet / go test -race / golangci-lint — clean.
|
Addressed the fourth-pass findings: vinod-morya/fiber@5b3665e
|
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@middleware/sse/sse_test.go`:
- Around line 1597-1613: Test_SSE_Shutdown_Timeout currently calls t.Parallel()
but returns before hub.run() finishes (which waits on <-h.shutdown then calls
broadcastShutdown and time.Sleep(shutdownDrainDelay)), causing goroutine leaks
and races on hub.connections under mu.Lock; fix by removing t.Parallel() or by
keeping it and then waiting for hub.stopped after triggering shutdown (use the
hub.shutdown close as already done) and/or call hub.Shutdown with a context that
has a generous timeout (context.WithTimeout) and assert the returned error
matches the canceled/timeout condition; reference hub.run, hub.shutdown,
hub.stopped, hub.Shutdown, broadcastShutdown, shutdownDrainDelay, and
Test_SSE_Shutdown_Timeout when making the change.
- Around line 1415-1434: The test Test_SSE_Publish_BufferFull currently only
checks stats.EventsPublished > 0 which doesn't confirm drop-on-full behavior;
update the test to also assert that stats.EventsDropped (or the appropriate drop
counter returned by hub.Stats()) is positive after flooding the hub with events,
ensuring the Publish non-blocking path is exercised; locate the Publish method
and the hub.Stats() call in Test_SSE_Publish_BufferFull and add an assertion
that the drop counter > 0 (while keeping EventsPublished check) so the test
fails if Publish becomes blocking.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 724ebc10-e78e-4a1a-9f64-ced1ee6f8611
📒 Files selected for processing (3)
- middleware/sse/bridge.go
- middleware/sse/hub.go
- middleware/sse/sse_test.go
- Test_SSE_Publish_BufferFull: assert EventsDropped > 0 in addition to EventsPublished > 0. The previous assertion would pass even if the non-blocking `default:` branch in Publish regressed to blocking behavior — the dropped counter is the actual invariant this test pins.
- Test_SSE_Shutdown_Timeout: remove t.Parallel() and await hub.stopped before returning. The old form exited while run() was still inside the shutdown path (~200ms drain delay), letting the goroutine outlive the test and mutate hub.connections concurrently with other parallel tests. Also now asserts the expected ctx.Canceled error — previously the assertion was punted.

Verification: go build / go vet / go test -race / golangci-lint — clean.
|
Addressed the fifth-pass findings: vinod-morya/fiber@2b6b56f
|
|
I will check it tomorrow
Codecov Report

❌ Patch coverage is …

Additional details and impacted files:

@@ Coverage Diff @@
## main #4225 +/- ##
==========================================
- Coverage 91.15% 90.98% -0.18%
==========================================
Files 123 133 +10
Lines 12076 12766 +690
==========================================
+ Hits 11008 11615 +607
- Misses 669 723 +54
- Partials 399 428 +29
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
|
|
@vinod-morya The lint workflow is failing. The
CI was failing on two fronts after main was merged in:
- `lint / lint` — a `//nolint:gosec` directive on the bridge context
became unused on the CI linter version, and nolintlint blocked the
build.
- `codecov/patch` — 84% patch coverage with 103 uncovered lines.
Lint fix
- hub.go: replace the stored `bridgeCancel` CancelFunc with a goroutine
tied to `hub.shutdown`. `close(h.shutdown)` now fans the cancel out
alongside the run loop and watchers, and the cancel is visible at
goroutine scope so gosec G118 no longer needs suppressing. Removes
the `bridgeCancel` field and the matching nil-check in `Shutdown`.
Coverage (89.4% → 91.3% local; +7pp vs. CI base)
New targeted unit tests cover previously-untested branches:
- `Publish`: drop-during-drain path; TTL stamping of CreatedAt
- `writeRetry`: non-positive ms no-op
- `trackEventType`: empty type falling back to "message"
- `matchGroupConns`: empty-group early return
- `watchLifetime`: no-op when MaxLifetime <= 0
- `replayEvents`: nil Replayer; empty Last-Event-ID; replayer returning
an error (best-effort log+continue); full
write-and-flush success path
- `initStream`: propagates the first write error
- `sendConnectedEvent`: propagates write error
- `writeLoop`: heartbeat branch, real-event branch, and done-exit
branch all exercised via a `failingWriter` helper
`go build` / `go vet` / `go test -race` / `golangci-lint run` — clean.
|
Thanks @gaby — addressed both items in vinod-morya/fiber@43bc1fe.

Lint. The
Coverage. Added focused unit tests for previously-untested branches — overall
The @claude audit is also welcome — happy to act on anything it surfaces.
|
@claude Conduct a security audit of this implementation to identify issues such as data races and potential panics during runtime. |
Pull request overview
Introduces a new first-class middleware/sse package to provide Server-Sent Events streaming for Fiber v3, built around a Hub-based broker for routing, buffering/coalescing, and graceful shutdown.
Changes:
- Added SSE middleware core: Hub event loop, connection lifecycle, topic wildcard routing, dispatch/coalescing, adaptive throttling, and optional replay/bridge integrations.
- Added extensive unit + E2E tests and runnable examples covering wire format, sanitization, replay, bridging, and shutdown behavior.
- Updated Fiber v3 docs to include the new SSE middleware and usage examples.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| middleware/sse/sse.go | Public middleware entrypoints (New, NewWithHub) and stream setup (SendStreamWriter, Abandon, headers). |
| middleware/sse/config.go | Config surface + defaults for flush/heartbeat/lifetime/buffers, hooks, replayer and bridges. |
| middleware/sse/hub.go | Hub run loop, publish pipeline, routing (topics/groups), heartbeats, draining/shutdown, bridge lifecycle. |
| middleware/sse/connection.go | Per-connection channels, write loop, heartbeat handling, lifecycle bookkeeping. |
| middleware/sse/event.go | Event model, marshaling, SSE wire formatting, injection sanitization, pooled writes. |
| middleware/sse/dispatcher.go | Per-connection buffering/coalescing for P1/P2 delivery lanes. |
| middleware/sse/topic.go | NATS-style topic wildcard matching helpers used by routing. |
| middleware/sse/throttle.go | Adaptive throttler to modulate flush cadence based on per-connection backpressure. |
| middleware/sse/bridge.go | Subscriber bridge interface/config, retry loop, transform handling, publish into hub. |
| middleware/sse/replayer.go | Replayer interface for Last-Event-ID replay. |
| middleware/sse/stats.go | Hub stats/metrics structs + event-type counters. |
| middleware/sse/sse_test.go | Large suite of unit + E2E tests validating SSE behavior, replay, bridges, shutdown semantics. |
| middleware/sse/example_test.go | Runnable examples demonstrating basic usage, priorities, and wildcards. |
| docs/whats_new.md | Adds SSE section to v3 “What’s New” and TOC entry. |
| docs/middleware/sse.md | New middleware documentation page with usage patterns and config reference. |
|

	if len(conn.Topics) == 0 {
		return c.Status(fiber.StatusBadRequest).SendString("no topics subscribed")
	}
Only Metadata is defensive-copied (“frozen”) after OnConnect, but conn.Topics is left as the caller-provided slice. If an OnConnect implementation retains conn and mutates Topics from another goroutine, the hub will read it concurrently (e.g., add/removeConnection, match logic), causing a data race. Consider copying conn.Topics (and optionally validating non-empty topic strings) before registering the connection, similar to the Metadata freeze.
Suggested change:

	// Freeze topics — defensive copy to prevent concurrent mutation
	// after the connection is registered with the hub.
	topics := make([]string, len(conn.Topics))
	copy(topics, conn.Topics)
	conn.Topics = topics
	if len(conn.Topics) == 0 {
		return c.Status(fiber.StatusBadRequest).SendString("no topics subscribed")
	}
	for _, topic := range conn.Topics {
		if topic == "" {
			return c.Status(fiber.StatusBadRequest).SendString("empty topic subscribed")
		}
	}
	// Give the hub time to register the connection before publishing.
	time.Sleep(50 * time.Millisecond)
These E2E tests rely on fixed time.Sleep delays to wait for hub registration/dispatch, which can be flaky under load or slower CI runners. Prefer a synchronization signal (e.g., a channel closed from OnConnect/OnDisconnect) or require.Eventually polling a deterministic condition instead of sleeping a hard-coded duration.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
grivera64
left a comment
Thanks for making the PR, I've left a partial review on the changes below.
I also want to ask, would it be better to limit the SSE middleware to only the required features necessary for Server-Side Events? Features like topics, priority, and tags are more application specific rather than SSE protocol specific. Since this would be a core SSE middleware, we should avoid forcing every user to have the overhead and complexity of a Pub/Sub engine, even if they only need a simple stream.
To make these features optional, we could add the extra logic as independent add-ons. For example, a Hub (or event stream) can be optionally added to sse.Config{} to take advantage of hub.Publish(). Same thing goes for Dispatcher, Replayer, and other optional features.
By separating the core SSE features from extra features, the SSE middleware stays simple while also providing an interface for more complex use cases.
func Test_SSE_E2E_HeadersAndConnectedFrame(t *testing.T) {
	t.Parallel()

	handler, hub := NewWithHub(Config{
		MaxLifetime:       500 * time.Millisecond,
		HeartbeatInterval: 100 * time.Millisecond,
		FlushInterval:     50 * time.Millisecond,
		OnConnect: func(_ fiber.Ctx, conn *Connection) error {
			conn.Topics = []string{"updates"}
			return nil
		},
	})
	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		require.NoError(t, hub.Shutdown(ctx))
	}()

	base, cleanup := startSSEServer(t, handler)
	defer cleanup()

	req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, base+"/events", http.NoBody)
	require.NoError(t, err)

	resp, err := (&http.Client{Timeout: 5 * time.Second}).Do(req)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, resp.Body.Close()) })

	// RFC 8895 + W3C SSE: Content-Type must be text/event-stream.
	require.Equal(t, "text/event-stream", resp.Header.Get("Content-Type"))
	require.Equal(t, "no-cache", resp.Header.Get("Cache-Control"))
	require.Equal(t, "keep-alive", resp.Header.Get("Connection"))
	require.Equal(t, "no", resp.Header.Get("X-Accel-Buffering"))
	require.Equal(t, http.StatusOK, resp.StatusCode)

	br := bufio.NewReader(resp.Body)

	// First: retry directive frame from writeRetry.
	retryFrame := readSSEFrame(t, br)
	require.Contains(t, retryFrame, "retry: 3000")

	// Second: connected event with connection_id and topics.
	connectedFrame := readSSEFrame(t, br)
	require.Contains(t, connectedFrame, "event: connected")
	require.Contains(t, connectedFrame, "connection_id")
	require.Contains(t, connectedFrame, `"topics":["updates"]`)
}
Can you rewrite these tests using our standard app.Test() method? And please use stricter checks to ensure the middleware's output is exactly as expected, to avoid any regressions in the future.
For example:
```go
func Test_SSE_E2E_HeadersAndConnectedFrame(t *testing.T) {
	t.Parallel()

	app := fiber.New()
	app.Get("/events", New(Config{
		MaxLifetime:       500 * time.Millisecond,
		HeartbeatInterval: 100 * time.Millisecond,
		FlushInterval:     50 * time.Millisecond,
		OnConnect: func(_ fiber.Ctx, conn *Connection) error {
			conn.Topics = []string{"updates"}
			// Manual ID for deterministic testing
			conn.ID = "fiber"
			return nil
		},
	}))

	req, err := http.NewRequest("GET", "/events", nil)
	require.NoError(t, err)

	resp, err := app.Test(req, fiber.TestConfig{
		Timeout: 5 * time.Second,
	})
	require.NoError(t, err)

	// W3C/WHATWG SSE spec: Content-Type must be text/event-stream.
	require.Equal(t, "text/event-stream", resp.Header.Get("Content-Type"))
	require.Equal(t, "no-cache", resp.Header.Get("Cache-Control"))
	require.Equal(t, "keep-alive", resp.Header.Get("Connection"))
	require.Equal(t, "no", resp.Header.Get("X-Accel-Buffering"))
	require.Equal(t, http.StatusOK, resp.StatusCode)

	body, err := io.ReadAll(resp.Body)
	defer resp.Body.Close()
	require.NoError(t, err)

	frames := strings.Split(string(body), "\n\n")
	require.Equal(t, "retry: 3000", frames[0])
	require.Equal(t, "id: evt_1\nevent: connected\ndata: {\"connection_id\":\"fiber\",\"topics\":[\"updates\"]}", frames[1])
}
```

```go
Type:     "order-created",
Data:     `{"id":"ord_123","total":99}`,
Topics:   []string{"orders"},
Priority: PriorityInstant,
```
In addition to the previous comment, can you add checks that verify the Priority field works as expected? As the test is written, any priority value here would leave its outcome unchanged.
```go
hub.Publish(Event{
	Type:     "log",
	Data:     "line1\nline2\nline3",
	Topics:   []string{"logs"},
	Priority: PriorityInstant,
})
```
Similar to the priority field, the type and topics should be strictly checked here.
```go
require.Equal(t, 1, idLines, "id: injection must be sanitized")
require.Equal(t, 1, eventLines, "event: injection must be sanitized")
```
Since the injection is sanitized, we should also have a check for what it should have sanitized to.
```go
// RetryMS is the reconnection interval hint sent to clients via the
// retry: directive on connect.
//
// Optional. Default: 3000
RetryMS int
```
Let's also use type time.Duration here, and rename to ClientRetryInterval or something similar to be consistent with the other configs here.
```go
// matchWildcardConns adds wildcard-subscribed connections that match the event topics.
func (h *Hub) matchWildcardConns(event *Event, seen map[string]struct{}) {
	for connID := range h.wildcardConns {
```
Same as my above comment, I don't think we need to run these matches on each publish, but rather just subscribe connections to events on creation.
```go
if len(event.Group) == 0 {
	return
}
for connID, conn := range h.connections {
```

```go
conns := make([]*Connection, 0, len(h.connections))
for _, conn := range h.connections {
	if !conn.IsClosed() && !conn.paused.Load() {
		conns = append(conns, conn)
	}
}
```
Is creating a slice here necessary? This adds extra memory allocations on each flush, but it is only used in the for loop and GC'ed afterwards.
```go
// watchLifetime starts a goroutine that closes the connection after
// MaxLifetime has elapsed.
func (h *Hub) watchLifetime(conn *Connection) {
	if h.cfg.MaxLifetime <= 0 {
		return
	}
	go func() {
		timer := time.NewTimer(h.cfg.MaxLifetime)
		defer timer.Stop()
		select {
		case <-timer.C:
			conn.Close()
		case <-conn.done:
		}
	}()
}
```
Do we really need a separate goroutine to track lifetime per connection? In c.SendStreamWriter(), no extra close logic is necessary by default, and spawning too many goroutines adds overhead.
The error returned by a failed write or flush should be enough to tell that a connection has closed.
We should remove this example_test.go in the middleware folder. This could be added separately in the docs/middleware/sse.md file, or as a recipe in the future.
Agree, this should be removed and added as a recipe
replaced by #4239
Context
Opening this as a fresh PR per @grivera64's request on #4196 so the review UI can anchor feedback to specific commits and lines. #4196 will be closed and linked back here for history.
Scope
Slim, focused SSE middleware built natively for Fiber v3's fasthttp architecture. Everything that wasn't strictly SSE mechanics (JWT, Prometheus, invalidation helpers, domain-event wrappers, REST invalidation routes) has been removed and kept in the standalone downstream library. What remains is the core hub + connection lifecycle + wire format.
What's included
- `Hub` — single goroutine event loop, topic-based routing
- Three priority lanes: `PriorityInstant` / `PriorityBatched` / `PriorityCoalesced`
- NATS-style topic wildcards (`orders.*`, `analytics.>`)
- Graceful drain via `Hub.Shutdown(ctx)`
- `Replayer` interface (`Last-Event-ID` replay, in-memory impl included)
- `SubscriberBridge` — auto-started bridges from external pub/sub (Redis, NATS, …) declared in `Config.Bridges`, canceled/awaited on shutdown
What was deliberately left out (lives downstream)
- JWT/ticket auth, Prometheus metrics, invalidation helpers, domain-event wrappers, REST invalidation routes
Review history
This PR represents the v1.1 refactor addressing all 6 points from @grivera64's review on #4196 (#4196 (comment)) and aligning with Fiber's internal middleware patterns (session, cache, limiter, keyauth).
Summary of the v1.1 changes vs. the earlier slim push:
- Removed `Next` from `Config`
- `coalescer` → `Dispatcher` with `AddEvent` / `AddState` / `WriteTo`
- `FanOut()` → `Config.Bridges []BridgeConfig` + `SubscriberBridge`
- `MarshaledEvent.WriteTo` via `bytebufferpool`: 0 B/op, 0 allocs (was 381 B / 8 allocs) at 26 ns/op
- Wire-format tests: `Content-Type`, `retry`, `connected`, multi-line `data`, `id`/`event` injection sanitisation
- `sse.go` 671 → 154 lines; all Hub methods in `hub.go`; bridges in `bridge.go`
Plus:
- `c.Abandon()` before `SendStreamWriter` (prevents `fiber.Ctx` pool reuse mid-stream)
- `t.Parallel()` on every test per `AGENTS.md`
Status
- `go build`, `go vet`, `go test -race` — all clean
- `golangci-lint run` — 0 issues
Notes
Happy to split the diff into smaller commits on this branch if that would make review easier — just say the word and I'll layer it in without force-pushing. Also happy to adjust scope either direction (further shrink or grow) based on what fits best for Fiber's middleware collection.
Thanks again @grivera64 and @gaby for the patient review — the design here owes a lot to your feedback.
Closes #4196