Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions docs/middleware/sse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
---
id: sse
---

# SSE

Server-Sent Events middleware for [Fiber](https://github.com/gofiber/fiber) built natively on Fiber's fasthttp architecture. It provides a Hub-based event broker with topic routing, three priority lanes (instant/batched/coalesced), NATS-style topic wildcards, adaptive per-connection throttling, connection groups, graceful drain, and pluggable Last-Event-ID replay.

The middleware is fully compatible with the standard SSE wire format β€” any client that speaks Server-Sent Events (browser `EventSource`, `curl -N`, or any HTTP client that reads `text/event-stream`) works with it.

## Signatures

```go
func New(config ...Config) fiber.Handler
func NewWithHub(config ...Config) (fiber.Handler, *Hub)
```

`New` returns just the handler; use `NewWithHub` when you need access to the hub for publishing events.

## Examples

Import the middleware package:

```go
import (
"github.com/gofiber/fiber/v3"
"github.com/gofiber/fiber/v3/middleware/sse"
)
```

Once your Fiber app is initialized, create an SSE handler and hub:

```go
// Basic usage β€” subscribe all clients to "notifications"
handler, hub := sse.NewWithHub(sse.Config{
OnConnect: func(c fiber.Ctx, conn *sse.Connection) error {
conn.Topics = []string{"notifications"}
return nil
},
})
app.Get("/events", handler)

// Publish an event from any goroutine
hub.Publish(sse.Event{
Type: "update",
Data: "hello",
Topics: []string{"notifications"},
})
```

Use NATS-style wildcards to subscribe to multiple related topics:

```go
handler, hub := sse.NewWithHub(sse.Config{
OnConnect: func(c fiber.Ctx, conn *sse.Connection) error {
// Match orders.created, orders.updated, orders.deleted
conn.Topics = []string{"orders.*"}
return nil
},
})
```

Use connection groups (metadata-based filtering) for multi-tenant isolation:

```go
handler, hub := sse.NewWithHub(sse.Config{
OnConnect: func(c fiber.Ctx, conn *sse.Connection) error {
tenantID := c.Locals("tenant_id").(string)
conn.Metadata["tenant_id"] = tenantID
conn.Topics = []string{"orders"}
return nil
},
})

// Publish only to connections in tenant "t_123"
hub.Publish(sse.Event{
Type: "order-created",
Data: orderJSON,
Topics: []string{"orders"},
Group: map[string]string{"tenant_id": "t_123"},
})
```

Use event coalescing to reduce traffic for high-frequency updates:

```go
// Coalesced: if progress goes 5%β†’8% in one flush window,
// only the latest value is sent.
for i := 1; i <= 100; i++ {
hub.Publish(sse.Event{
Type: "progress",
Data: fmt.Sprintf(`{"pct":%d}`, i),
Topics: []string{"import"},
Priority: sse.PriorityCoalesced,
CoalesceKey: "import-progress",
})
}
```

Fan out from an external pub/sub system (Redis, NATS, etc.) into the hub. Implement the `SubscriberBridge` interface and declare it on `Config.Bridges` β€” the middleware auto-starts each bridge and cancels/awaits them on `hub.Shutdown`, so there are no `CancelFunc`s for the caller to track.

```go
type redisSubscriber struct{ client *redis.Client }

func (r *redisSubscriber) Subscribe(ctx context.Context, channel string, onMessage func(string)) error {
sub := r.client.Subscribe(ctx, channel)
defer sub.Close()
for msg := range sub.Channel() {
onMessage(msg.Payload)
}
return ctx.Err()
}

handler, hub := sse.NewWithHub(sse.Config{
Bridges: []sse.BridgeConfig{{
Subscriber: &redisSubscriber{client: rdb},
Channel: "notifications",
Topic: "notifications",
EventType: "notification",
}},
})
app.Get("/events", handler)
```

Graceful shutdown with deadline:

```go
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := hub.Shutdown(ctx); err != nil {
log.Errorf("sse drain failed: %v", err)
}
```

Authentication is left to the user via `OnConnect`. Note that browser `EventSource` cannot send custom headers, so if you need token authentication, consider passing the token via a query parameter or a short-lived ticket exchanged on a separate endpoint.

## Config

| Property | Type | Description | Default |
| :---------------- | :------------------------------------------------ | :------------------------------------------------------------------------------------------------------------------- | :------------- |
| OnConnect | `func(fiber.Ctx, *Connection) error` | Called when a new client connects. Set `conn.Topics` and `conn.Metadata` here. Return error to reject (sends 403). | `nil` |
| OnDisconnect | `func(*Connection)` | Called after a client disconnects. | `nil` |
| OnPause | `func(*Connection)` | Called when a connection is paused (browser tab hidden). | `nil` |
| OnResume | `func(*Connection)` | Called when a connection is resumed (browser tab visible). | `nil` |
| Replayer | `Replayer` | Pluggable Last-Event-ID replay backend. If nil, replay is disabled. | `nil` |
| Bridges | `[]BridgeConfig` | Auto-started bridges from external pub/sub systems. Each implements `SubscriberBridge`. Canceled on `hub.Shutdown`. | `nil` |
| FlushInterval | `time.Duration` | How often batched (P1) and coalesced (P2) events are flushed to clients. Instant (P0) events bypass this. | `2s` |
| HeartbeatInterval | `time.Duration` | How often a comment is sent to idle connections to detect disconnects and prevent proxy timeouts. | `30s` |
| MaxLifetime | `time.Duration` | Maximum duration a single SSE connection can stay open. Set to -1 for unlimited. | `30m` |
| SendBufferSize | `int` | Per-connection channel buffer. If full, events are dropped. | `256` |
| RetryMS | `int` | Reconnection interval hint sent to clients via the `retry:` directive on connect. | `3000` |

The SSE middleware is **terminal** β€” the returned handler hijacks the response stream and never calls `c.Next()`. For the same reason `Config` does not include a `Next` field: placing handlers after the SSE middleware has no defined effect.

## Default Config

```go
var ConfigDefault = Config{
FlushInterval: 2 * time.Second,
SendBufferSize: 256,
HeartbeatInterval: 30 * time.Second,
MaxLifetime: 30 * time.Minute,
RetryMS: 3000,
}
```
22 changes: 22 additions & 0 deletions docs/whats_new.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Here's a quick overview of the changes in Fiber `v3`:
- [Proxy](#proxy)
- [Recover](#recover)
- [Session](#session)
- [SSE](#sse)
- [πŸ”Œ Addons](#-addons)
- [πŸ“‹ Migration guide](#-migration-guide)

Expand Down Expand Up @@ -3138,3 +3139,24 @@ app.Use(session.New(session.Config{

See the [Session Middleware Migration Guide](./middleware/session.md#migration-guide)
for complete details.

#### SSE

The new SSE middleware provides Server-Sent Events for Fiber, built natively on the fasthttp `SendStreamWriter` API. It includes a Hub-based broker with topic routing, three priority lanes (instant/batched/coalesced), NATS-style topic wildcards, connection groups for metadata-based filtering, adaptive throttling, graceful drain, and pluggable Last-Event-ID replay. Fully compatible with the standard SSE wire format and any `EventSource`-style client.

```go
handler, hub := sse.NewWithHub(sse.Config{
OnConnect: func(c fiber.Ctx, conn *sse.Connection) error {
conn.Topics = []string{"notifications"}
return nil
},
})
app.Get("/events", handler)

// Publish from any handler or worker
hub.Publish(sse.Event{
Type: "update",
Data: "hello",
Topics: []string{"notifications"},
})
```
Comment on lines +3160 to +3179
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟑 Minor

🧩 Analysis chain

🏁 Script executed:

# Check file size first
wc -l docs/whats_new.md

Repository: gofiber/fiber

Length of output: 79


🏁 Script executed:

# Read the section in question (lines 3140-3165)
sed -n '3140,3165p' docs/whats_new.md

Repository: gofiber/fiber

Length of output: 1000


🏁 Script executed:

# Check what section the SSE content is under by examining lines before 3143
sed -n '3100,3145p' docs/whats_new.md

Repository: gofiber/fiber

Length of output: 1793


🏁 Script executed:

# Find the migration guide section markers
rg -n "^## .*Migration|^### .*Middlewares|^## πŸ”Œ Addons" docs/whats_new.md | head -20

Repository: gofiber/fiber

Length of output: 125


🏁 Script executed:

# Check the TOC around line 60
sed -n '50,70p' docs/whats_new.md

Repository: gofiber/fiber

Length of output: 695


🏁 Script executed:

# Verify if make markdown target exists
test -f Makefile && grep -n "markdown" Makefile

Repository: gofiber/fiber

Length of output: 284


Move the SSE section to the main content area before the Addons section.

The SSE middleware is described as a new feature, not a migration guide, yet it is currently nested under the migration guide's ### 🧬 Middlewares section (line 2794). However, the main TOC lists it as part of the top-level Middlewares section. This mismatch will cause incorrect navigation. Move the SSE section before ## πŸ”Œ Addons (line 1695) to align with the TOC structure, or relocate it to the main Middlewares section if one exists before the Addons section. After updating, run make markdown to lint the file per coding guidelines.

πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/whats_new.md` around lines 3143 - 3162, The SSE section (the paragraph
starting with the "#### SSE" header) is misplaced under the migration guide's
"### 🧬 Middlewares" area; move that entire SSE block so it appears before the
top-level "## πŸ”Œ Addons" header (or into the main top-level Middlewares section
if one exists) so the TOC matches content, then run make markdown to lint and
fix formatting; look for the "#### SSE" header and relocate the contiguous code
fence and description until the next header, ensuring surrounding headers remain
intact.

142 changes: 142 additions & 0 deletions middleware/sse/bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package sse

import (
"context"
"time"

"github.com/gofiber/fiber/v3/log"
)

// bridgeRetryDelay is how long the hub waits before retrying a failed
// SubscriberBridge.Subscribe call.
const bridgeRetryDelay = 3 * time.Second

// SubscriberBridge adapts an external pub/sub system (Redis, NATS, Kafka,
// etc.) so incoming messages can be forwarded into the hub as SSE events.
//
// Implementations must block until ctx is canceled and return ctx.Err()
// so the hub can distinguish intentional shutdown from subscriber failure.
type SubscriberBridge interface {
// Subscribe listens on channel and invokes onMessage for each received
// payload. It must return when ctx is canceled.
Subscribe(ctx context.Context, channel string, onMessage func(payload string)) error
}

// BridgeConfig wires a SubscriberBridge into the hub. Populate one of these
// for each external channel you want to forward events from.
type BridgeConfig struct {
// Subscriber is the pub/sub implementation. Required.
Subscriber SubscriberBridge

// Transform optionally transforms the raw payload into a fully-formed
// Event. Return nil to skip the message. If Transform is nil, the
// payload is used as Event.Data with the defaults below.
Transform func(payload string) *Event

// Channel is the pub/sub channel to subscribe to. Required.
Channel string

// Topic is the SSE topic forwarded events are tagged with.
// Defaults to Channel if empty.
Topic string

// EventType is the SSE event: field set on forwarded events.
EventType string

// CoalesceKey for PriorityCoalesced events.
CoalesceKey string

// TTL for forwarded events. Zero means no expiration.
TTL time.Duration

// Priority for forwarded events. PriorityInstant (0) is the default.
Priority Priority
}

// runBridge consumes a single BridgeConfig, publishing incoming payloads
// until ctx is canceled. Retries on Subscribe errors with bridgeRetryDelay.
//
// A nil Subscriber is a programming error caught at hub startup (see
// NewWithHub) so runBridge assumes cfg.Subscriber is non-nil.
func (h *Hub) runBridge(ctx context.Context, cfg BridgeConfig) { //nolint:gocritic // hugeParam: value semantics preferred
topic := cfg.Topic
if topic == "" {
topic = cfg.Channel
}

for {
select {
case <-ctx.Done():
return
default:
}

// Wrap the callback in a recover so a panic inside the caller-
// supplied Transform can't tear down the bridge goroutine (which
// would leak h.bridges.Done() and hang Shutdown forever).
err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
defer func() {
if r := recover(); r != nil {
log.Errorf("sse: bridge transform panic, message dropped channel=%s panic=%v",
cfg.Channel, r)
}
}()
if event := h.buildBridgeEvent(&cfg, topic, payload); event != nil {
h.Publish(*event)
}
})

if ctx.Err() != nil {
return
}

// Any early return β€” error or unexpected nil from a well-behaved
// subscriber β€” is treated as retryable. Without the backoff on
// nil, a misbehaving subscriber that returns immediately would
// spin this loop hot.
if err != nil {
logBridgeError(cfg.Channel, err)
}
select {
case <-time.After(bridgeRetryDelay):
case <-ctx.Done():
return
}
}
Comment on lines +78 to +106
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Throttle unexpected nil returns from Subscribe.

If a subscriber returns nil before ctx is canceled, this loop immediately resubscribes and can spin hot on closed streams. Treat any early return as retryable and apply the same delay.

Proposed fix
 		err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
 			if event := h.buildBridgeEvent(&cfg, topic, payload); event != nil {
 				h.Publish(*event)
 			}
 		})
 
-		if err != nil && ctx.Err() == nil {
-			logBridgeError(cfg.Channel, err)
+		if ctx.Err() == nil {
+			if err != nil {
+				logBridgeError(cfg.Channel, err)
+			} else {
+				logBridgeError(cfg.Channel, context.Canceled)
+			}
 			select {
 			case <-time.After(bridgeRetryDelay):
 			case <-ctx.Done():
 				return
 			}
 		}
πŸ“ Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
if event := h.buildBridgeEvent(&cfg, topic, payload); event != nil {
h.Publish(*event)
}
})
if err != nil && ctx.Err() == nil {
logBridgeError(cfg.Channel, err)
select {
case <-time.After(bridgeRetryDelay):
case <-ctx.Done():
return
}
}
}
err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
if event := h.buildBridgeEvent(&cfg, topic, payload); event != nil {
h.Publish(*event)
}
})
if ctx.Err() == nil {
if err != nil {
logBridgeError(cfg.Channel, err)
} else {
logBridgeError(cfg.Channel, context.Canceled)
}
select {
case <-time.After(bridgeRetryDelay):
case <-ctx.Done():
return
}
}
}
πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/bridge.go` around lines 71 - 85, The loop currently resumes
immediately when cfg.Subscriber.Subscribe returns nil (err == nil) before ctx
cancellation, which can spin; change the logic around the Subscribe return
handling so that an early nil return is treated as retryable: if err != nil OR
(err == nil && ctx.Err() == nil) then log a warning (use logBridgeError or an
appropriate logger referencing cfg.Channel) and perform the same select { case
<-time.After(bridgeRetryDelay): case <-ctx.Done(): return } as done for non-nil
errors; keep the same references to cfg.Subscriber.Subscribe,
h.buildBridgeEvent, logBridgeError, bridgeRetryDelay and ctx so the retry path
is applied for both error and unexpected-nil cases.

}

// buildBridgeEvent creates an Event from a raw pub/sub payload.
// When Transform is set, the transform function controls all event fields;
// only missing Topics and Type are filled from config defaults.
// When Transform is not set, the event is built entirely from config defaults.
func (*Hub) buildBridgeEvent(cfg *BridgeConfig, topic, payload string) *Event {
if cfg.Transform != nil {
transformed := cfg.Transform(payload)
if transformed == nil {
return nil
}
event := *transformed
if len(event.Topics) == 0 {
event.Topics = []string{topic}
}
if event.Type == "" {
event.Type = cfg.EventType
}
return &event
}

return &Event{
Type: cfg.EventType,
Data: payload,
Topics: []string{topic},
Priority: cfg.Priority,
CoalesceKey: cfg.CoalesceKey,
TTL: cfg.TTL,
}
}

// logBridgeError logs a bridge subscriber error. Retries continue after
// bridgeRetryDelay regardless of error type.
func logBridgeError(channel string, err error) {
log.Warnf("sse: bridge subscriber error, retrying channel=%s error=%v", channel, err)
}
Loading