-
-
Notifications
You must be signed in to change notification settings - Fork 2k
π₯ feat: Add SSE middleware #4225
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1ca6cb3
fa71e85
2ef3803
d6e5c73
0c1aeb4
ea972cf
9fc8e2a
5b3665e
2b6b56f
05f91b1
43bc1fe
8f6554b
b3862e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,165 @@ | ||
| --- | ||
| id: sse | ||
| --- | ||
|
|
||
| # SSE | ||
|
|
||
| Server-Sent Events middleware for [Fiber](https://github.com/gofiber/fiber) built natively on Fiber's fasthttp architecture. It provides a Hub-based event broker with topic routing, three priority lanes (instant/batched/coalesced), NATS-style topic wildcards, adaptive per-connection throttling, connection groups, graceful drain, and pluggable Last-Event-ID replay. | ||
|
|
||
| The middleware is fully compatible with the standard SSE wire format β any client that speaks Server-Sent Events (browser `EventSource`, `curl -N`, or any HTTP client that reads `text/event-stream`) works with it. | ||
|
|
||
| ## Signatures | ||
|
|
||
| ```go | ||
| func New(config ...Config) fiber.Handler | ||
| func NewWithHub(config ...Config) (fiber.Handler, *Hub) | ||
| ``` | ||
|
|
||
| `New` returns just the handler; use `NewWithHub` when you need access to the hub for publishing events. | ||
|
|
||
| ## Examples | ||
|
|
||
| Import the middleware package: | ||
|
|
||
| ```go | ||
| import ( | ||
| "github.com/gofiber/fiber/v3" | ||
| "github.com/gofiber/fiber/v3/middleware/sse" | ||
| ) | ||
| ``` | ||
|
|
||
| Once your Fiber app is initialized, create an SSE handler and hub: | ||
|
|
||
| ```go | ||
| // Basic usage β subscribe all clients to "notifications" | ||
| handler, hub := sse.NewWithHub(sse.Config{ | ||
| OnConnect: func(c fiber.Ctx, conn *sse.Connection) error { | ||
| conn.Topics = []string{"notifications"} | ||
| return nil | ||
| }, | ||
| }) | ||
| app.Get("/events", handler) | ||
|
|
||
| // Publish an event from any goroutine | ||
| hub.Publish(sse.Event{ | ||
| Type: "update", | ||
| Data: "hello", | ||
| Topics: []string{"notifications"}, | ||
| }) | ||
| ``` | ||
|
|
||
| Use NATS-style wildcards to subscribe to multiple related topics: | ||
|
|
||
| ```go | ||
| handler, hub := sse.NewWithHub(sse.Config{ | ||
| OnConnect: func(c fiber.Ctx, conn *sse.Connection) error { | ||
| // Match orders.created, orders.updated, orders.deleted | ||
| conn.Topics = []string{"orders.*"} | ||
| return nil | ||
| }, | ||
| }) | ||
| ``` | ||
|
|
||
| Use connection groups (metadata-based filtering) for multi-tenant isolation: | ||
|
|
||
| ```go | ||
| handler, hub := sse.NewWithHub(sse.Config{ | ||
| OnConnect: func(c fiber.Ctx, conn *sse.Connection) error { | ||
| tenantID := c.Locals("tenant_id").(string) | ||
| conn.Metadata["tenant_id"] = tenantID | ||
| conn.Topics = []string{"orders"} | ||
| return nil | ||
| }, | ||
| }) | ||
|
|
||
| // Publish only to connections in tenant "t_123" | ||
| hub.Publish(sse.Event{ | ||
| Type: "order-created", | ||
| Data: orderJSON, | ||
| Topics: []string{"orders"}, | ||
| Group: map[string]string{"tenant_id": "t_123"}, | ||
| }) | ||
| ``` | ||
|
|
||
| Use event coalescing to reduce traffic for high-frequency updates: | ||
|
|
||
| ```go | ||
| // Coalesced: if progress goes 5%β8% in one flush window, | ||
| // only the latest value is sent. | ||
| for i := 1; i <= 100; i++ { | ||
| hub.Publish(sse.Event{ | ||
| Type: "progress", | ||
| Data: fmt.Sprintf(`{"pct":%d}`, i), | ||
| Topics: []string{"import"}, | ||
| Priority: sse.PriorityCoalesced, | ||
| CoalesceKey: "import-progress", | ||
| }) | ||
| } | ||
| ``` | ||
|
|
||
| Fan out from an external pub/sub system (Redis, NATS, etc.) into the hub. Implement the `SubscriberBridge` interface and declare it on `Config.Bridges` β the middleware auto-starts each bridge and cancels/awaits them on `hub.Shutdown`, so there are no `CancelFunc`s for the caller to track. | ||
|
|
||
| ```go | ||
| type redisSubscriber struct{ client *redis.Client } | ||
|
|
||
| func (r *redisSubscriber) Subscribe(ctx context.Context, channel string, onMessage func(string)) error { | ||
| sub := r.client.Subscribe(ctx, channel) | ||
| defer sub.Close() | ||
| for msg := range sub.Channel() { | ||
| onMessage(msg.Payload) | ||
| } | ||
| return ctx.Err() | ||
| } | ||
|
|
||
| handler, hub := sse.NewWithHub(sse.Config{ | ||
| Bridges: []sse.BridgeConfig{{ | ||
| Subscriber: &redisSubscriber{client: rdb}, | ||
| Channel: "notifications", | ||
| Topic: "notifications", | ||
| EventType: "notification", | ||
| }}, | ||
| }) | ||
| app.Get("/events", handler) | ||
| ``` | ||
|
|
||
| Graceful shutdown with deadline: | ||
|
|
||
| ```go | ||
| ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
| defer cancel() | ||
| if err := hub.Shutdown(ctx); err != nil { | ||
| log.Errorf("sse drain failed: %v", err) | ||
| } | ||
| ``` | ||
|
|
||
| Authentication is left to the user via `OnConnect`. Note that browser `EventSource` cannot send custom headers, so if you need token authentication, consider passing the token via a query parameter or a short-lived ticket exchanged on a separate endpoint. | ||
|
|
||
| ## Config | ||
|
|
||
| | Property | Type | Description | Default | | ||
| | :---------------- | :------------------------------------------------ | :------------------------------------------------------------------------------------------------------------------- | :------------- | | ||
| | OnConnect | `func(fiber.Ctx, *Connection) error` | Called when a new client connects. Set `conn.Topics` and `conn.Metadata` here. Return error to reject (sends 403). | `nil` | | ||
| | OnDisconnect | `func(*Connection)` | Called after a client disconnects. | `nil` | | ||
| | OnPause | `func(*Connection)` | Called when a connection is paused (browser tab hidden). | `nil` | | ||
| | OnResume | `func(*Connection)` | Called when a connection is resumed (browser tab visible). | `nil` | | ||
| | Replayer | `Replayer` | Pluggable Last-Event-ID replay backend. If nil, replay is disabled. | `nil` | | ||
| | Bridges | `[]BridgeConfig` | Auto-started bridges from external pub/sub systems. Each implements `SubscriberBridge`. Canceled on `hub.Shutdown`. | `nil` | | ||
| | FlushInterval | `time.Duration` | How often batched (P1) and coalesced (P2) events are flushed to clients. Instant (P0) events bypass this. | `2s` | | ||
| | HeartbeatInterval | `time.Duration` | How often a comment is sent to idle connections to detect disconnects and prevent proxy timeouts. | `30s` | | ||
| | MaxLifetime | `time.Duration` | Maximum duration a single SSE connection can stay open. Set to -1 for unlimited. | `30m` | | ||
| | SendBufferSize | `int` | Per-connection channel buffer. If full, events are dropped. | `256` | | ||
| | RetryMS | `int` | Reconnection interval hint sent to clients via the `retry:` directive on connect. | `3000` | | ||
|
|
||
| The SSE middleware is **terminal** β the returned handler hijacks the response stream and never calls `c.Next()`. For the same reason `Config` does not include a `Next` field: placing handlers after the SSE middleware has no defined effect. | ||
|
|
||
| ## Default Config | ||
|
|
||
| ```go | ||
| var ConfigDefault = Config{ | ||
| FlushInterval: 2 * time.Second, | ||
| SendBufferSize: 256, | ||
| HeartbeatInterval: 30 * time.Second, | ||
| MaxLifetime: 30 * time.Minute, | ||
| RetryMS: 3000, | ||
| } | ||
| ``` |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,143 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| package sse | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/gofiber/fiber/v3/log" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // bridgeRetryDelay is how long the hub waits before retrying a failed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // SubscriberBridge.Subscribe call. Package-level var (not const) so tests | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // can shorten it to observe retry behavior deterministically. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var bridgeRetryDelay = 3 * time.Second | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // SubscriberBridge adapts an external pub/sub system (Redis, NATS, Kafka, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // etc.) so incoming messages can be forwarded into the hub as SSE events. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Implementations must block until ctx is canceled and return ctx.Err() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // so the hub can distinguish intentional shutdown from subscriber failure. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type SubscriberBridge interface { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Subscribe listens on channel and invokes onMessage for each received | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // payload. It must return when ctx is canceled. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Subscribe(ctx context.Context, channel string, onMessage func(payload string)) error | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // BridgeConfig wires a SubscriberBridge into the hub. Populate one of these | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // for each external channel you want to forward events from. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type BridgeConfig struct { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Subscriber is the pub/sub implementation. Required. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Subscriber SubscriberBridge | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Transform optionally transforms the raw payload into a fully-formed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Event. Return nil to skip the message. If Transform is nil, the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // payload is used as Event.Data with the defaults below. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Transform func(payload string) *Event | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Channel is the pub/sub channel to subscribe to. Required. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Channel string | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Topic is the SSE topic forwarded events are tagged with. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Defaults to Channel if empty. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Topic string | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // EventType is the SSE event: field set on forwarded events. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| EventType string | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // CoalesceKey for PriorityCoalesced events. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| CoalesceKey string | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TTL for forwarded events. Zero means no expiration. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TTL time.Duration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Priority for forwarded events. PriorityInstant (0) is the default. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Priority Priority | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // runBridge consumes a single BridgeConfig, publishing incoming payloads | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // until ctx is canceled. Retries on Subscribe errors with bridgeRetryDelay. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // A nil Subscriber is a programming error caught at hub startup (see | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // NewWithHub) so runBridge assumes cfg.Subscriber is non-nil. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (h *Hub) runBridge(ctx context.Context, cfg BridgeConfig) { //nolint:gocritic // hugeParam: value semantics preferred | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic := cfg.Topic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if topic == "" { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic = cfg.Channel | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Wrap the callback in a recover so a panic inside the caller- | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // supplied Transform can't tear down the bridge goroutine (which | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // would leak h.bridges.Done() and hang Shutdown forever). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| defer func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if r := recover(); r != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Errorf("sse: bridge transform panic, message dropped channel=%s panic=%v", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg.Channel, r) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if event := h.buildBridgeEvent(&cfg, topic, payload); event != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| h.Publish(*event) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if ctx.Err() != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Any early return β error or unexpected nil from a well-behaved | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // subscriber β is treated as retryable. Without the backoff on | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // nil, a misbehaving subscriber that returns immediately would | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // spin this loop hot. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logBridgeError(cfg.Channel, err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-time.After(bridgeRetryDelay): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+78
to
+106
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Throttle unexpected nil returns from If a subscriber returns Proposed fix err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
if event := h.buildBridgeEvent(&cfg, topic, payload); event != nil {
h.Publish(*event)
}
})
- if err != nil && ctx.Err() == nil {
- logBridgeError(cfg.Channel, err)
+ if ctx.Err() == nil {
+ if err != nil {
+ logBridgeError(cfg.Channel, err)
+ } else {
+ logBridgeError(cfg.Channel, context.Canceled)
+ }
select {
case <-time.After(bridgeRetryDelay):
case <-ctx.Done():
return
}
}π Committable suggestion
Suggested change
π€ Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // buildBridgeEvent creates an Event from a raw pub/sub payload. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // When Transform is set, the transform function controls all event fields; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // only missing Topics and Type are filled from config defaults. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // When Transform is not set, the event is built entirely from config defaults. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (*Hub) buildBridgeEvent(cfg *BridgeConfig, topic, payload string) *Event { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if cfg.Transform != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| transformed := cfg.Transform(payload) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if transformed == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event := *transformed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if len(event.Topics) == 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event.Topics = []string{topic} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if event.Type == "" { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event.Type = cfg.EventType | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return &event | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return &Event{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Type: cfg.EventType, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Data: payload, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Topics: []string{topic}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Priority: cfg.Priority, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| CoalesceKey: cfg.CoalesceKey, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TTL: cfg.TTL, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // logBridgeError logs a bridge subscriber error. Retries continue after | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // bridgeRetryDelay regardless of error type. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func logBridgeError(channel string, err error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Warnf("sse: bridge subscriber error, retrying channel=%s error=%v", channel, err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
π§© Analysis chain
π Script executed:
# Check file size first wc -l docs/whats_new.mdRepository: gofiber/fiber
Length of output: 79
π Script executed:
Repository: gofiber/fiber
Length of output: 1000
π Script executed:
Repository: gofiber/fiber
Length of output: 1793
π Script executed:
Repository: gofiber/fiber
Length of output: 125
π Script executed:
Repository: gofiber/fiber
Length of output: 695
π Script executed:
Repository: gofiber/fiber
Length of output: 284
Move the SSE section to the main content area before the Addons section.
The SSE middleware is described as a new feature, not a migration guide, yet it is currently nested under the migration guide's
### 𧬠Middlewaressection (line 2794). However, the main TOC lists it as part of the top-level Middlewares section. This mismatch will cause incorrect navigation. Move the SSE section before## π Addons(line 1695) to align with the TOC structure, or relocate it to the main Middlewares section if one exists before the Addons section. After updating, runmake markdownto lint the file per coding guidelines.π€ Prompt for AI Agents