-
-
Notifications
You must be signed in to change notification settings - Fork 2k
🔥 feat: add lightweight SSE middleware #4239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
5a8a4f1
🔥 feat: add lightweight SSE middleware
ReneWerner87 1aacd75
fix: address SSE review feedback
ReneWerner87 cfe7659
test: cover SSE edge cases
ReneWerner87 a039e4a
fix: preserve late SSE stream errors
ReneWerner87 301ca87
test: cover SSE close and writer errors
ReneWerner87 1ff8f6d
fix: address SSE coverage review
ReneWerner87 dcd817d
fix: refine SSE event field handling
ReneWerner87 7228535
Merge branch 'main' into codex/sse-middleware
ReneWerner87 8333370
Merge branch 'main' into codex/sse-middleware
ReneWerner87 e71b08f
fix: address SSE review comments
ReneWerner87 f0c3543
fix: refine SSE stream lifecycle handling
ReneWerner87 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| --- | ||
| id: sse | ||
| --- | ||
|
|
||
| # SSE | ||
|
|
||
| The SSE middleware provides the transport pieces for Server-Sent Events: response headers, event formatting, flushing, heartbeat comments, and disconnect detection through `Flush` errors. | ||
|
|
||
| It intentionally does not include a hub, topics, authentication, replay storage, metrics, or external pub/sub bridges. Those are application concerns that can be composed around the stream handler. | ||
|
|
||
| ## Signatures | ||
|
|
||
| ```go | ||
| func New(config ...Config) fiber.Handler | ||
| ``` | ||
|
|
||
| ## Examples | ||
|
|
||
| Import the middleware package: | ||
|
|
||
| ```go | ||
| import ( | ||
| "context" | ||
| "time" | ||
|
|
||
| "github.com/gofiber/fiber/v3" | ||
| "github.com/gofiber/fiber/v3/middleware/sse" | ||
| ) | ||
| ``` | ||
|
|
||
| Once your Fiber app is initialized, mount an SSE endpoint like this: | ||
|
|
||
| ```go | ||
| app.Get("/events", sse.New(sse.Config{ | ||
| Retry: 5 * time.Second, | ||
| Handler: func(c fiber.Ctx, stream *sse.Stream) error { | ||
| return stream.Event(sse.Event{ | ||
| Name: "message", | ||
| Data: fiber.Map{"message": "hello"}, | ||
| }) | ||
| }, | ||
| })) | ||
| ``` | ||
|
|
||
| For long-running streams, subscribe each client to its own event channel and stop when the client disconnects. | ||
| A single shared channel load-balances messages across clients; use a fan-out source when every client must receive every event: | ||
|
|
||
| ```go | ||
| type Broker interface { | ||
| Subscribe(ctx context.Context) (<-chan string, error) | ||
| } | ||
|
|
||
| app.Get("/events", sse.New(sse.Config{ | ||
| Handler: func(c fiber.Ctx, stream *sse.Stream) error { | ||
| events, err := broker.Subscribe(stream.Context()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for { | ||
| select { | ||
| case msg, ok := <-events: | ||
| if !ok { | ||
| return nil | ||
| } | ||
| if err := stream.Event(sse.Event{Name: "message", Data: msg}); err != nil { | ||
| return err | ||
| } | ||
| case <-stream.Done(): | ||
| return stream.Err() | ||
| } | ||
| } | ||
| }, | ||
| })) | ||
| ``` | ||
|
|
||
| `stream.Context()` is canceled when the stream ends or a write fails, which makes it convenient to pass into database, broker, or gRPC calls: | ||
|
|
||
| ```go | ||
| app.Get("/events", sse.New(sse.Config{ | ||
| Handler: func(c fiber.Ctx, stream *sse.Stream) error { | ||
| rows, err := db.QueryContext(stream.Context(), "SELECT id FROM jobs") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer rows.Close() | ||
|
|
||
| return stream.Comment("connected") | ||
| }, | ||
| })) | ||
| ``` | ||
|
|
||
| ## Config | ||
|
|
||
| | Property | Type | Description | Default | | ||
| |:------------------|:-----------------------------|:----------------------------------------------|:--------------------| | ||
| | Next | `func(fiber.Ctx) bool` | Skip when the function returns `true`. | `nil` | | ||
| | Handler | `sse.Handler` | Writes events to the stream. | `nil` | | ||
| | OnClose | `func(fiber.Ctx, error)` | Called when the stream ends, with `nil` when the handler returned successfully and no stream write failed. | `nil` | | ||
| | Retry | `time.Duration` | Initial EventSource reconnect delay. | `0` | | ||
| | HeartbeatInterval | `time.Duration` | Interval for SSE comment heartbeats. | `15 * time.Second` | | ||
| | DisableHeartbeat | `bool` | Disable automatic heartbeat comments. When disabled, disconnected clients may not be detected until the next write. | `false` | | ||
|
|
||
| ## Default Config | ||
|
|
||
| ```go | ||
| var ConfigDefault = Config{ | ||
| Next: nil, | ||
| Handler: nil, | ||
| OnClose: nil, | ||
| Retry: 0, | ||
| HeartbeatInterval: 15 * time.Second, | ||
| DisableHeartbeat: false, | ||
| } | ||
| ``` | ||
|
|
||
| ## Stream | ||
|
|
||
| ```go | ||
| func (s *Stream) Event(event Event) error | ||
| func (s *Stream) Comment(comment string) error | ||
| func (s *Stream) Retry(retry time.Duration) error | ||
| func (s *Stream) Context() context.Context | ||
| func (s *Stream) Done() <-chan struct{} | ||
| func (s *Stream) Err() error | ||
| func (s *Stream) LastEventID() string | ||
| ``` | ||
|
|
||
| Every write is flushed. A failed flush closes `Done`, stores the error returned by `Err`, and lets the handler stop without relying on `fasthttp.RequestCtx.Done`, which is not a per-client disconnect signal. After a normal handler return, `Done` is closed and `Context()` is canceled while `Err()` remains `nil`; writes after that return `sse: stream closed`. | ||
|
|
||
| Automatic heartbeat comments keep idle streams active and make silent client disconnects observable through the next flush error. If heartbeats are disabled, a handler waiting on an external source might not notice a disconnected client until it writes again. Stopping a stream waits for an in-flight heartbeat write to finish, so a very slow client can delay shutdown until the underlying write unblocks. | ||
|
|
||
| `Config.Retry` sends the initial reconnect delay when the stream opens. `Event.Retry` changes the reconnect delay for a specific event, following the SSE wire format. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| package sse | ||
|
|
||
| import ( | ||
| "time" | ||
|
|
||
| "github.com/gofiber/fiber/v3" | ||
| ) | ||
|
|
||
| // Handler writes events to a single SSE stream. | ||
| type Handler func(c fiber.Ctx, stream *Stream) error | ||
|
|
||
| // Config defines the config for middleware. | ||
| type Config struct { | ||
| // Next defines a function to skip this middleware when returned true. | ||
| // | ||
| // Optional. Default: nil | ||
| Next func(c fiber.Ctx) bool | ||
|
|
||
| // Handler writes events to the stream. | ||
| // | ||
| // Required. | ||
| Handler Handler | ||
|
|
||
| // OnClose is called after the stream handler returns or the client disconnects. | ||
| // | ||
| // Optional. Default: nil | ||
| OnClose func(c fiber.Ctx, err error) | ||
|
|
||
| // Retry controls the reconnection delay sent to clients. | ||
| // Values less than or equal to zero disable the initial retry field. | ||
| // | ||
| // Optional. Default: 0 | ||
| Retry time.Duration | ||
|
|
||
| // HeartbeatInterval controls comment heartbeats used to keep intermediaries | ||
| // from closing idle streams and to detect disconnected clients. | ||
| // When DisableHeartbeat is false, values less than or equal to zero are | ||
| // replaced by the default interval. | ||
| // | ||
| // Optional. Default: 15 * time.Second | ||
| HeartbeatInterval time.Duration | ||
|
|
||
| // DisableHeartbeat disables automatic comment heartbeats. | ||
| // | ||
| // Optional. Default: false | ||
| DisableHeartbeat bool | ||
| } | ||
|
|
||
| // ConfigDefault is the default config. | ||
| var ConfigDefault = Config{ | ||
| Next: nil, | ||
| Handler: nil, | ||
| OnClose: nil, | ||
| Retry: 0, | ||
| HeartbeatInterval: 15 * time.Second, | ||
| DisableHeartbeat: false, | ||
| } | ||
|
|
||
| // Helper function to set default values. | ||
| func configDefault(config ...Config) Config { | ||
| if len(config) < 1 { | ||
| return ConfigDefault | ||
| } | ||
|
|
||
| cfg := config[0] | ||
| if !cfg.DisableHeartbeat && cfg.HeartbeatInterval <= 0 { | ||
| cfg.HeartbeatInterval = ConfigDefault.HeartbeatInterval | ||
| } | ||
| return cfg | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| package sse | ||
|
|
||
| const mimeTextEventStream = "text/event-stream" |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SSE is a terminal middleware in nature, since
c.SendStreamWriter()doesn't run until after all handlers in the call chain return.In this case, I think
Next()would convey that you can usefiber.Ctxafter the middleware when it would lead to undefined behavior.To avoid confusion, I think we should remove this from the SSE config.