Skip to content
119 changes: 119 additions & 0 deletions docs/middleware/sse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
---
id: sse
---

# SSE

The SSE middleware provides the transport pieces for Server-Sent Events: response headers, event formatting, flushing, heartbeat comments, and disconnect detection through `Flush` errors.

It intentionally does not include a hub, topics, authentication, replay storage, metrics, or external pub/sub bridges. Those are application concerns that can be composed around the stream handler.

## Signatures

```go
func New(config ...Config) fiber.Handler
```

## Examples

Import the middleware package:

```go
import (
"time"

"github.com/gofiber/fiber/v3"
"github.com/gofiber/fiber/v3/middleware/sse"
)
```

Once your Fiber app is initialized, mount an SSE endpoint like this:

```go
app.Get("/events", sse.New(sse.Config{
Retry: 5 * time.Second,
Handler: func(c fiber.Ctx, stream *sse.Stream) error {
return stream.Event(sse.Event{
Name: "message",
Data: fiber.Map{"message": "hello"},
})
},
}))
```

For long-running streams, wait on your own event source and stop when the client disconnects:

```go
events := make(chan string)

app.Get("/events", sse.New(sse.Config{
Handler: func(c fiber.Ctx, stream *sse.Stream) error {
for {
select {
case msg := <-events:
if err := stream.Event(sse.Event{Name: "message", Data: msg}); err != nil {
return err
}
case <-stream.Done():
return stream.Err()
}
}
},
}))
```

`stream.Context()` is canceled when the stream ends or a write fails, which makes it convenient to pass into database, broker, or gRPC calls:

```go
app.Get("/events", sse.New(sse.Config{
Handler: func(c fiber.Ctx, stream *sse.Stream) error {
rows, err := db.QueryContext(stream.Context(), "SELECT id FROM jobs")
if err != nil {
return err
}
defer rows.Close()

return stream.Comment("connected")
},
}))
```

## Config

| Property | Type | Description | Default |
|:------------------|:-----------------------------|:----------------------------------------------|:--------------------|
| Next | `func(fiber.Ctx) bool` | Skip when the function returns `true`. | `nil` |
| Handler | `sse.Handler` | Writes events to the stream. | `nil` |
| OnClose | `func(fiber.Ctx, error)` | Called when the stream ends, with `nil` when the handler returned successfully and no stream write failed. | `nil` |
| Retry | `time.Duration` | Initial EventSource reconnect delay. | `0` |
| HeartbeatInterval | `time.Duration` | Interval for SSE comment heartbeats. | `15 * time.Second` |
| DisableHeartbeat | `bool` | Disable automatic heartbeat comments. | `false` |

## Default Config

```go
var ConfigDefault = Config{
Next: nil,
Handler: nil,
OnClose: nil,
Retry: 0,
HeartbeatInterval: 15 * time.Second,
DisableHeartbeat: false,
}
```

## Stream

```go
func (s *Stream) Event(event Event) error
func (s *Stream) Comment(comment string) error
func (s *Stream) Retry(retry time.Duration) error
func (s *Stream) Context() context.Context
func (s *Stream) Done() <-chan struct{}
func (s *Stream) Err() error
func (s *Stream) LastEventID() string
```

Every write is flushed. A failed flush closes `Done`, stores the error returned by `Err`, and lets the handler stop without relying on `fasthttp.RequestCtx.Done`, which is not a per-client disconnect signal. After a normal handler return, `Done` and `Context()` are closed while `Err()` remains `nil`; writes after that return `sse: stream closed`.
Comment thread
ReneWerner87 marked this conversation as resolved.
Outdated

`Config.Retry` sends the initial reconnect delay when the stream opens. `Event.Retry` changes the reconnect delay for a specific event, following the SSE wire format.
8 changes: 8 additions & 0 deletions docs/whats_new.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Here's a quick overview of the changes in Fiber `v3`:
- [Proxy](#proxy)
- [Recover](#recover)
- [Session](#session)
- [SSE](#sse)
- [🔌 Addons](#-addons)
- [📋 Migration guide](#-migration-guide)

Expand Down Expand Up @@ -1680,6 +1681,13 @@ The session middleware has undergone significant improvements in v3, focusing on

For more details on these changes and migration instructions, check the [Session Middleware Migration Guide](./middleware/session.md#migration-guide).

### SSE

Fiber now includes a small [SSE middleware](./middleware/sse.md) for Server-Sent Events. It handles native
Comment thread
ReneWerner87 marked this conversation as resolved.
Outdated
`SendStreamWriter` setup, SSE response headers, event formatting, flushing, heartbeat comments, and
disconnect detection through flush errors while leaving application-level hubs, topics, replay stores, and
pub/sub bridges to user code or recipes.

### Timeout

The timeout middleware is now configurable. A new `Config` struct allows customizing the timeout duration, defining a handler that runs when a timeout occurs, and specifying errors to treat as timeouts. The `New` function now accepts a `Config` value instead of a duration.
Expand Down
70 changes: 70 additions & 0 deletions middleware/sse/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package sse

import (
"time"

"github.com/gofiber/fiber/v3"
)

// Handler writes events to a single SSE stream.
type Handler func(c fiber.Ctx, stream *Stream) error

// Config defines the config for middleware.
type Config struct {
// Next defines a function to skip this middleware when returned true.
//
// Optional. Default: nil
Next func(c fiber.Ctx) bool
Comment on lines +14 to +17
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SSE is a terminal middleware in nature, since c.SendStreamWriter() doesn't run until after all handlers in the call chain return.

In this case, I think Next() would convey that you can use fiber.Ctx after the middleware when it would lead to undefined behavior.

To avoid confusion, I think we should remove this from the SSE config.


// Handler writes events to the stream.
//
// Required.
Handler Handler

// OnClose is called after the stream handler returns or the client disconnects.
//
// Optional. Default: nil
OnClose func(c fiber.Ctx, err error)

// Retry controls the reconnection delay sent to clients.
// Values less than or equal to zero disable the initial retry field.
//
// Optional. Default: 0
Retry time.Duration

// HeartbeatInterval controls comment heartbeats used to keep intermediaries
// from closing idle streams and to detect disconnected clients.
// When DisableHeartbeat is false, values less than or equal to zero are
// replaced by the default interval.
//
// Optional. Default: 15 * time.Second
HeartbeatInterval time.Duration

// DisableHeartbeat disables automatic comment heartbeats.
//
// Optional. Default: false
DisableHeartbeat bool
}

// ConfigDefault is the default config.
var ConfigDefault = Config{
Next: nil,
Handler: nil,
OnClose: nil,
Retry: 0,
HeartbeatInterval: 15 * time.Second,
DisableHeartbeat: false,
}

// Helper function to set default values.
func configDefault(config ...Config) Config {
if len(config) < 1 {
return ConfigDefault
}

cfg := config[0]
if !cfg.DisableHeartbeat && cfg.HeartbeatInterval <= 0 {
cfg.HeartbeatInterval = ConfigDefault.HeartbeatInterval
}
return cfg
}
137 changes: 137 additions & 0 deletions middleware/sse/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package sse

import (
"bufio"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/gofiber/utils/v2"
)

var errInvalidField = errors.New("field must not contain CR or LF")

// Event defines a single Server-Sent Event frame.
type Event struct {
// Data is written as one or more data fields. Strings and byte slices are
// written as-is; other values are JSON encoded.
Data any

// ID sets the SSE id field.
ID string

// Name sets the SSE event field.
Name string

// Retry sets the SSE retry field for this event.
Retry time.Duration
}

func writeEvent(w *bufio.Writer, event Event) error {
if event.ID != "" {
id, err := sanitizeField(event.ID)
if err != nil {
return fmt.Errorf("sse: invalid id: %w", err)
}
if _, err := fmt.Fprintf(w, "id: %s\n", id); err != nil {
return fmt.Errorf("sse: write id: %w", err)
}
}
if event.Name != "" {
name, err := sanitizeField(event.Name)
if err != nil {
return fmt.Errorf("sse: invalid event: %w", err)
}
if _, err := fmt.Fprintf(w, "event: %s\n", name); err != nil {
return fmt.Errorf("sse: write event: %w", err)
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if event.Retry > 0 {
if _, err := fmt.Fprintf(w, "retry: %d\n", event.Retry.Milliseconds()); err != nil {
return fmt.Errorf("sse: write retry: %w", err)
}
}

data, err := eventData(event.Data)
if err != nil {
return err
}
if err := writeData(w, data); err != nil {
return err
}
if _, err := w.WriteString("\n"); err != nil {
return fmt.Errorf("sse: finish event: %w", err)
}
return nil
}

func writeComment(w *bufio.Writer, comment string) error {
comment = sanitizeComment(comment)
if comment == "" {
if _, err := w.WriteString(":\n\n"); err != nil {
return fmt.Errorf("sse: write heartbeat: %w", err)
}
return nil
}
for line := range strings.SplitSeq(comment, "\n") {
if _, err := fmt.Fprintf(w, ": %s\n", line); err != nil {
return fmt.Errorf("sse: write comment: %w", err)
}
}
if _, err := w.WriteString("\n"); err != nil {
return fmt.Errorf("sse: finish comment: %w", err)
}
return nil
}

func eventData(data any) (string, error) {
switch value := data.(type) {
case nil:
return "", nil
case string:
return value, nil
case []byte:
return string(value), nil
case json.RawMessage:
return string(value), nil
default:
encoded, err := json.Marshal(value)
Comment thread
ReneWerner87 marked this conversation as resolved.
Outdated
if err != nil {
return "", fmt.Errorf("sse: marshal data: %w", err)
}
return string(encoded), nil
}
}

func writeData(w *bufio.Writer, data string) error {
data = normalizeNewlines(data)
for line := range strings.SplitSeq(data, "\n") {
if _, err := fmt.Fprintf(w, "data: %s\n", line); err != nil {
return fmt.Errorf("sse: write data: %w", err)
}
}
return nil
}

func sanitizeField(value string) (string, error) {
if strings.ContainsAny(value, "\r\n") {
return "", errInvalidField
}
return utils.Trim(value, ' '), nil
}

func sanitizeComment(value string) string {
value = normalizeNewlines(value)
lines := make([]string, 0, strings.Count(value, "\n")+1)
for line := range strings.SplitSeq(value, "\n") {
lines = append(lines, utils.Trim(line, ' '))
}
return strings.Join(lines, "\n")
}

func normalizeNewlines(value string) string {
value = strings.ReplaceAll(value, "\r\n", "\n")
return strings.ReplaceAll(value, "\r", "\n")
}
Loading
Loading