Skip to content
9 changes: 7 additions & 2 deletions ctx_interface_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 119 additions & 0 deletions docs/middleware/sse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
---
id: sse
---

# SSE

The SSE middleware provides the transport pieces for Server-Sent Events: response headers, event formatting, flushing, heartbeat comments, and disconnect detection through `Flush` errors.

It intentionally does not include a hub, topics, authentication, replay storage, metrics, or external pub/sub bridges. Those are application concerns that can be composed around the stream handler.

## Signatures

```go
func New(config ...Config) fiber.Handler
```

## Examples

Import the middleware package:

```go
import (
"time"

"github.com/gofiber/fiber/v3"
"github.com/gofiber/fiber/v3/middleware/sse"
)
```

Once your Fiber app is initialized, mount an SSE endpoint like this:

```go
app.Get("/events", sse.New(sse.Config{
Retry: 5 * time.Second,
Handler: func(c fiber.Ctx, stream *sse.Stream) error {
return stream.Event(sse.Event{
Name: "message",
Data: fiber.Map{"message": "hello"},
})
},
}))
```

For long-running streams, wait on your own event source and stop when the client disconnects:

```go
events := make(chan string)

app.Get("/events", sse.New(sse.Config{
Handler: func(c fiber.Ctx, stream *sse.Stream) error {
for {
select {
case msg := <-events:
if err := stream.Event(sse.Event{Name: "message", Data: msg}); err != nil {
return err
}
case <-stream.Done():
return stream.Err()
}
}
},
}))
```

`stream.Context()` is canceled when the stream ends or a write fails, which makes it convenient to pass into database, broker, or gRPC calls:

```go
app.Get("/events", sse.New(sse.Config{
Handler: func(c fiber.Ctx, stream *sse.Stream) error {
rows, err := db.QueryContext(stream.Context(), "SELECT id FROM jobs")
if err != nil {
return err
}
defer rows.Close()

return stream.Comment("connected")
},
}))
```

## Config

| Property | Type | Description | Default |
|:------------------|:-----------------------------|:----------------------------------------------|:--------------------|
| Next | `func(fiber.Ctx) bool` | Skip when the function returns `true`. | `nil` |
| Handler | `sse.Handler` | Writes events to the stream. | `nil` |
| OnClose | `func(fiber.Ctx, error)` | Called when the stream ends, with `nil` when the handler returned successfully and no stream write failed. | `nil` |
| Retry | `time.Duration` | Initial EventSource reconnect delay. | `0` |
| HeartbeatInterval | `time.Duration` | Interval for SSE comment heartbeats. | `15 * time.Second` |
| DisableHeartbeat | `bool` | Disable automatic heartbeat comments. | `false` |

## Default Config

```go
var ConfigDefault = Config{
Next: nil,
Handler: nil,
OnClose: nil,
Retry: 0,
HeartbeatInterval: 15 * time.Second,
DisableHeartbeat: false,
}
```

## Stream

```go
func (s *Stream) Event(event Event) error
func (s *Stream) Comment(comment string) error
func (s *Stream) Retry(retry time.Duration) error
func (s *Stream) Context() context.Context
func (s *Stream) Done() <-chan struct{}
func (s *Stream) Err() error
func (s *Stream) LastEventID() string
```

Every write is flushed. A failed flush closes `Done`, stores the error returned by `Err`, and lets the handler stop without relying on `fasthttp.RequestCtx.Done`, which is not a per-client disconnect signal. After a normal handler return, `Done` is closed and `Context()` is canceled while `Err()` remains `nil`; writes after that return `sse: stream closed`.

`Config.Retry` sends the initial reconnect delay when the stream opens. `Event.Retry` changes the reconnect delay for a specific event, following the SSE wire format.
8 changes: 8 additions & 0 deletions docs/whats_new.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Here's a quick overview of the changes in Fiber `v3`:
- [Proxy](#proxy)
- [Recover](#recover)
- [Session](#session)
- [SSE](#sse)
- [🔌 Addons](#-addons)
- [📋 Migration guide](#-migration-guide)

Expand Down Expand Up @@ -1680,6 +1681,13 @@ The session middleware has undergone significant improvements in v3, focusing on

For more details on these changes and migration instructions, check the [Session Middleware Migration Guide](./middleware/session.md#migration-guide).

### SSE

Fiber now includes a small [SSE middleware](./middleware/sse.md) for Server-Sent Events. It handles native
Comment thread
ReneWerner87 marked this conversation as resolved.
Outdated
`SendStreamWriter` setup, SSE response headers, event formatting, flushing, heartbeat comments, and
disconnect detection through flush errors while leaving application-level hubs, topics, replay stores, and
pub/sub bridges to user code or recipes.

### Timeout

The timeout middleware is now configurable. A new `Config` struct allows customizing the timeout duration, defining a handler that runs when a timeout occurs, and specifying errors to treat as timeouts. The `New` function now accepts a `Config` value instead of a duration.
Expand Down
2 changes: 1 addition & 1 deletion middleware/limiter/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ func Test_Limiter_Sliding_Window_RecalculatesAfterHandlerDelay(t *testing.T) {
require.Equal(t, fiber.StatusOK, resp.StatusCode)
}

time.Sleep(time.Second + 100*time.Millisecond)
time.Sleep(2*time.Second + 100*time.Millisecond)

resp, err := app.Test(httptest.NewRequest(fiber.MethodGet, "/", http.NoBody))
require.NoError(t, err)
Expand Down
70 changes: 70 additions & 0 deletions middleware/sse/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package sse

import (
"time"

"github.com/gofiber/fiber/v3"
)

// Handler writes events to a single SSE stream.
type Handler func(c fiber.Ctx, stream *Stream) error

// Config defines the config for middleware.
type Config struct {
// Next defines a function to skip this middleware when returned true.
//
// Optional. Default: nil
Next func(c fiber.Ctx) bool
Comment on lines +14 to +17
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SSE is a terminal middleware in nature, since c.SendStreamWriter() doesn't run until after all handlers in the call chain return.

In this case, I think Next() would convey that you can use fiber.Ctx after the middleware when it would lead to undefined behavior.

To avoid confusion, I think we should remove this from the SSE config.


// Handler writes events to the stream.
//
// Required.
Handler Handler

// OnClose is called after the stream handler returns or the client disconnects.
//
// Optional. Default: nil
OnClose func(c fiber.Ctx, err error)

// Retry controls the reconnection delay sent to clients.
// Values less than or equal to zero disable the initial retry field.
//
// Optional. Default: 0
Retry time.Duration

// HeartbeatInterval controls comment heartbeats used to keep intermediaries
// from closing idle streams and to detect disconnected clients.
// When DisableHeartbeat is false, values less than or equal to zero are
// replaced by the default interval.
//
// Optional. Default: 15 * time.Second
HeartbeatInterval time.Duration

// DisableHeartbeat disables automatic comment heartbeats.
//
// Optional. Default: false
DisableHeartbeat bool
}

// ConfigDefault is the default config.
var ConfigDefault = Config{
Next: nil,
Handler: nil,
OnClose: nil,
Retry: 0,
HeartbeatInterval: 15 * time.Second,
DisableHeartbeat: false,
}

// Helper function to set default values.
func configDefault(config ...Config) Config {
if len(config) < 1 {
return ConfigDefault
}

cfg := config[0]
if !cfg.DisableHeartbeat && cfg.HeartbeatInterval <= 0 {
cfg.HeartbeatInterval = ConfigDefault.HeartbeatInterval
}
return cfg
}
Loading
Loading