-
-
Notifications
You must be signed in to change notification settings - Fork 2k
🔥 feat: add lightweight SSE middleware #4239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
5a8a4f1
1aacd75
cfe7659
a039e4a
301ca87
1ff8f6d
dcd817d
7228535
8333370
e71b08f
f0c3543
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| --- | ||
| id: sse | ||
| --- | ||
|
|
||
| # SSE | ||
|
|
||
| The SSE middleware provides the transport pieces for Server-Sent Events: response headers, event formatting, flushing, heartbeat comments, and disconnect detection through `Flush` errors. | ||
|
|
||
| It intentionally does not include a hub, topics, authentication, replay storage, metrics, or external pub/sub bridges. Those are application concerns that can be composed around the stream handler. | ||
|
|
||
| ## Signatures | ||
|
|
||
| ```go | ||
| func New(config ...Config) fiber.Handler | ||
| ``` | ||
|
|
||
| ## Examples | ||
|
|
||
| Import the middleware package: | ||
|
|
||
| ```go | ||
| import ( | ||
| "context" | ||
| "time" | ||
|
|
||
| "github.com/gofiber/fiber/v3" | ||
| "github.com/gofiber/fiber/v3/middleware/sse" | ||
| ) | ||
| ``` | ||
|
|
||
| Once your Fiber app is initialized, mount an SSE endpoint like this: | ||
|
|
||
| ```go | ||
| app.Get("/events", sse.New(sse.Config{ | ||
| Retry: 5 * time.Second, | ||
| Handler: func(c fiber.Ctx, stream *sse.Stream) error { | ||
| return stream.Event(sse.Event{ | ||
| Name: "message", | ||
| Data: fiber.Map{"message": "hello"}, | ||
| }) | ||
| }, | ||
| })) | ||
| ``` | ||
|
|
||
| For long-running streams, subscribe each client to its own event channel and stop when the client disconnects. | ||
| A single shared channel load-balances messages across clients; use a fan-out source when every client must receive every event: | ||
|
|
||
| ```go | ||
| type Broker interface { | ||
| Subscribe(ctx context.Context) (<-chan string, error) | ||
| } | ||
|
|
||
| app.Get("/events", sse.New(sse.Config{ | ||
| Handler: func(c fiber.Ctx, stream *sse.Stream) error { | ||
| events, err := broker.Subscribe(stream.Context()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for { | ||
| select { | ||
| case msg, ok := <-events: | ||
| if !ok { | ||
| return nil | ||
| } | ||
| if err := stream.Event(sse.Event{Name: "message", Data: msg}); err != nil { | ||
| return err | ||
| } | ||
| case <-stream.Done(): | ||
| return stream.Err() | ||
| } | ||
| } | ||
| }, | ||
| })) | ||
| ``` | ||
|
|
||
| `stream.Context()` is canceled when the stream ends or a write fails, which makes it convenient to pass into database, broker, or gRPC calls: | ||
|
|
||
| ```go | ||
| app.Get("/events", sse.New(sse.Config{ | ||
| Handler: func(c fiber.Ctx, stream *sse.Stream) error { | ||
| rows, err := db.QueryContext(stream.Context(), "SELECT id FROM jobs") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer rows.Close() | ||
|
|
||
| return stream.Comment("connected") | ||
| }, | ||
| })) | ||
| ``` | ||
|
|
||
| ## Config | ||
|
|
||
| | Property | Type | Description | Default | | ||
| |:------------------|:-----------------------------|:----------------------------------------------|:--------------------| | ||
| | Next | `func(fiber.Ctx) bool` | Skip when the function returns `true`. | `nil` | | ||
| | Handler | `sse.Handler` | Writes events to the stream. | `nil` | | ||
| | OnClose | `func(fiber.Ctx, error)` | Called when the stream ends, with `nil` when the handler returned successfully and no stream write failed. | `nil` | | ||
| | Retry | `time.Duration` | Initial EventSource reconnect delay. | `0` | | ||
| | HeartbeatInterval | `time.Duration` | Interval for SSE comment heartbeats. | `15 * time.Second` | | ||
| | DisableHeartbeat | `bool` | Disable automatic heartbeat comments. | `false` | | ||
|
|
||
| ## Default Config | ||
|
|
||
| ```go | ||
| var ConfigDefault = Config{ | ||
| Next: nil, | ||
| Handler: nil, | ||
| OnClose: nil, | ||
| Retry: 0, | ||
| HeartbeatInterval: 15 * time.Second, | ||
| DisableHeartbeat: false, | ||
| } | ||
| ``` | ||
|
|
||
| ## Stream | ||
|
|
||
| ```go | ||
| func (s *Stream) Event(event Event) error | ||
| func (s *Stream) Comment(comment string) error | ||
| func (s *Stream) Retry(retry time.Duration) error | ||
| func (s *Stream) Context() context.Context | ||
| func (s *Stream) Done() <-chan struct{} | ||
| func (s *Stream) Err() error | ||
| func (s *Stream) LastEventID() string | ||
| ``` | ||
|
|
||
| Every write is flushed. A failed flush closes `Done`, stores the error returned by `Err`, and lets the handler stop without relying on `fasthttp.RequestCtx.Done`, which is not a per-client disconnect signal. After a normal handler return, `Done` is closed and `Context()` is canceled while `Err()` remains `nil`; writes after that return `sse: stream closed`. | ||
|
|
||
| `Config.Retry` sends the initial reconnect delay when the stream opens. `Event.Retry` changes the reconnect delay for a specific event, following the SSE wire format. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| package sse | ||
|
|
||
| import ( | ||
| "time" | ||
|
|
||
| "github.com/gofiber/fiber/v3" | ||
| ) | ||
|
|
||
| // Handler writes events to a single SSE stream. | ||
| type Handler func(c fiber.Ctx, stream *Stream) error | ||
|
|
||
| // Config defines the config for middleware. | ||
| type Config struct { | ||
| // Next defines a function to skip this middleware when returned true. | ||
| // | ||
| // Optional. Default: nil | ||
| Next func(c fiber.Ctx) bool | ||
|
Comment on lines
+14
to
+17
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SSE is a terminal middleware in nature, since In this case, I think To avoid confusion, I think we should remove this from the SSE config. |
||
|
|
||
| // Handler writes events to the stream. | ||
| // | ||
| // Required. | ||
| Handler Handler | ||
|
|
||
| // OnClose is called after the stream handler returns or the client disconnects. | ||
| // | ||
| // Optional. Default: nil | ||
| OnClose func(c fiber.Ctx, err error) | ||
|
|
||
| // Retry controls the reconnection delay sent to clients. | ||
| // Values less than or equal to zero disable the initial retry field. | ||
| // | ||
| // Optional. Default: 0 | ||
| Retry time.Duration | ||
|
|
||
| // HeartbeatInterval controls comment heartbeats used to keep intermediaries | ||
| // from closing idle streams and to detect disconnected clients. | ||
| // When DisableHeartbeat is false, values less than or equal to zero are | ||
| // replaced by the default interval. | ||
| // | ||
| // Optional. Default: 15 * time.Second | ||
| HeartbeatInterval time.Duration | ||
|
|
||
| // DisableHeartbeat disables automatic comment heartbeats. | ||
| // | ||
| // Optional. Default: false | ||
| DisableHeartbeat bool | ||
| } | ||
|
|
||
| // ConfigDefault is the default config. | ||
| var ConfigDefault = Config{ | ||
| Next: nil, | ||
| Handler: nil, | ||
| OnClose: nil, | ||
| Retry: 0, | ||
| HeartbeatInterval: 15 * time.Second, | ||
| DisableHeartbeat: false, | ||
| } | ||
|
|
||
| // Helper function to set default values. | ||
| func configDefault(config ...Config) Config { | ||
| if len(config) < 1 { | ||
| return ConfigDefault | ||
| } | ||
|
|
||
| cfg := config[0] | ||
| if !cfg.DisableHeartbeat && cfg.HeartbeatInterval <= 0 { | ||
| cfg.HeartbeatInterval = ConfigDefault.HeartbeatInterval | ||
| } | ||
| return cfg | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.