Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ For full documentation, visit https://docs.coldbrew.cloud
- [type CBStopper](<#CBStopper>)
- [type CBWorkerProvider](<#CBWorkerProvider>)
- [type OTLPConfig](<#OTLPConfig>)
- [type SSEMarshaler](<#SSEMarshaler>)
- [func \(\*SSEMarshaler\) ContentType\(\_ any\) string](<#SSEMarshaler.ContentType>)
- [func \(\*SSEMarshaler\) Delimiter\(\) \[\]byte](<#SSEMarshaler.Delimiter>)
- [func \(s \*SSEMarshaler\) Marshal\(v any\) \(\[\]byte, error\)](<#SSEMarshaler.Marshal>)
- [func \(\*SSEMarshaler\) NewDecoder\(\_ io.Reader\) runtime.Decoder](<#SSEMarshaler.NewDecoder>)
- [func \(s \*SSEMarshaler\) NewEncoder\(w io.Writer\) runtime.Encoder](<#SSEMarshaler.NewEncoder>)
- [func \(\*SSEMarshaler\) StreamContentType\(\_ any\) string](<#SSEMarshaler.StreamContentType>)
- [func \(\*SSEMarshaler\) Unmarshal\(\_ \[\]byte, \_ any\) error](<#SSEMarshaler.Unmarshal>)


## Constants
Expand Down Expand Up @@ -506,4 +514,98 @@ type OTLPConfig struct {
}
```

<a name="SSEMarshaler"></a>
## type [SSEMarshaler](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L37-L39>)

SSEMarshaler is a runtime.Marshaler that emits Server\-Sent Events \(text/event\-stream\) frames for server\-streaming gateway RPCs. It lets browser EventSource clients consume streaming RPCs directly — useful for AI/LLM token streaming and other long\-running progressive responses.

Each Marshal call returns "data: \<json\>" with no trailing newline; the Delimiter \("\\n\\n"\) terminates each SSE frame per the SSE spec. The JSON payload uses protojson via the embedded runtime.JSONPb, so field naming matches the gateway's default JSON responses.

Wire it up from a service's PreStart hook:

```
core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})
```
Comment on lines +526 to +528

@coderabbitai coderabbitai Bot May 17, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add language identifiers to fenced code blocks.

The two new fenced blocks are missing language tags (MD040) and can fail markdown lint.

Suggested fix
-```
+```go
 core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})

- +go
&core.SSEMarshaler{JSONPb: runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{EmitUnpopulated: true},
}}

</details>


Also applies to: 536-540

<details>
<summary>🧰 Tools</summary>

<details>
<summary>🪛 markdownlint-cli2 (0.22.1)</summary>

[warning] 526-526: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

</details>

</details>

<details>
<summary>🤖 Prompt for AI Agents</summary>

Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @README.md around lines 526 - 528, The README's fenced code blocks around
examples that include core.RegisterHTTPMarshaler, &core.SSEMarshaler{}, and the
&core.SSEMarshaler{JSONPb: runtime.JSONPb{...}} snippet lack language
identifiers and trigger MD040; update each of those triple-backtick blocks to
include the Go language tag (```go) so the examples for
core.RegisterHTTPMarshaler and the SSEMarshaler/JSONPb snippet are fenced as Go
code.


</details>

<!-- fingerprinting:phantom:triton:hawk -->

<!-- This is an auto-generated reply by CodeRabbit -->

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skipping for this PR. The README is auto-generated by gomarkdoc from the Go doc comments — those code samples render without language tags because gomarkdoc reformats them. MD040 is a lint warning, not a CI failure, and the right fix is upstream in gomarkdoc rather than hand-editing the regenerated README (which would just get clobbered on the next make doc).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ankurs, understood — makes complete sense. Patching a generated file would just create noise for the next make doc run, and the fix belongs in the gomarkdoc template/tooling rather than here.


✏️ Learnings added
Learnt from: ankurs
Repo: go-coldbrew/core PR: 92
File: README.md:526-528
Timestamp: 2026-05-28T16:52:53.133Z
Learning: In the go-coldbrew/core repository, `README.md` (and `config/README.md`) are auto-generated files produced by `gomarkdoc` from Go doc comments via `make doc`. They should **not** be hand-edited, as any manual changes will be clobbered on the next `make doc` invocation. Issues with the generated output (e.g., missing language tags on fenced code blocks / MD040 warnings) need to be fixed upstream in the gomarkdoc configuration or Go doc comment formatting, not directly in the README.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


Clients then opt in by sending Accept: text/event\-stream on the gateway URL. The newHTTPCompressionWrapper excludes text/event\-stream from gzip/zstd compression so frames reach the client in real time \(compressed SSE is buffered by many HTTP intermediaries\).

SSE is server\-to\-client only: Unmarshal and NewDecoder return an error.

Per\-field protojson options \(EmitUnpopulated, UseProtoNames, etc.\) can be set by initializing the embedded JSONPb directly:

```
&core.SSEMarshaler{JSONPb: runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{EmitUnpopulated: true},
}}
```

```go
type SSEMarshaler struct {
runtime.JSONPb
}
```

<a name="SSEMarshaler.ContentType"></a>
### func \(\*SSEMarshaler\) [ContentType](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L48>)

```go
func (*SSEMarshaler) ContentType(_ any) string
```

ContentType always returns "text/event\-stream".

<a name="SSEMarshaler.Delimiter"></a>
### func \(\*SSEMarshaler\) [Delimiter](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L74>)

```go
func (*SSEMarshaler) Delimiter() []byte
```

Delimiter returns "\\n\\n", which terminates one SSE frame.

<a name="SSEMarshaler.Marshal"></a>
### func \(\*SSEMarshaler\) [Marshal](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L62>)

```go
func (s *SSEMarshaler) Marshal(v any) ([]byte, error)
```

Marshal returns "data: \<json\>" with no trailing newline. Frame termination is supplied by Delimiter; the gateway writes Marshal output followed by Delimiter for each streamed message.

<a name="SSEMarshaler.NewDecoder"></a>
### func \(\*SSEMarshaler\) [NewDecoder](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L86>)

```go
func (*SSEMarshaler) NewDecoder(_ io.Reader) runtime.Decoder
```

NewDecoder returns a decoder that always errors, for the same reason as Unmarshal.

<a name="SSEMarshaler.NewEncoder"></a>
### func \(\*SSEMarshaler\) [NewEncoder](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L94>)

```go
func (s *SSEMarshaler) NewEncoder(w io.Writer) runtime.Encoder
```

NewEncoder returns an encoder that writes "data: \<json\>\\n\\n" per Encode call.

<a name="SSEMarshaler.StreamContentType"></a>
### func \(\*SSEMarshaler\) [StreamContentType](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L55>)

```go
func (*SSEMarshaler) StreamContentType(_ any) string
```

StreamContentType matches ContentType so server\-streaming responses also advertise text/event\-stream. Gateway prefers this over ContentType when implemented \(see runtime.ForwardResponseStream\).

<a name="SSEMarshaler.Unmarshal"></a>
### func \(\*SSEMarshaler\) [Unmarshal](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L80>)

```go
func (*SSEMarshaler) Unmarshal(_ []byte, _ any) error
```

Unmarshal returns an error: SSE is a server\-to\-client format and the gateway never reads SSE bodies from inbound requests.

Generated by [gomarkdoc](<https://github.com/princjef/gomarkdoc>)
19 changes: 19 additions & 0 deletions compression.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
package core

import (
"mime"
"net/http"

"github.com/go-coldbrew/core/config"
"github.com/klauspost/compress/gzhttp"
)

// sseMediaType is the Content-Type advertised by SSEMarshaler and excluded
// from HTTP compression.
const sseMediaType = "text/event-stream"

// newHTTPCompressionWrapper builds the gzhttp wrapper used by initHTTP. It
// negotiates gzip and (unless disabled) zstd from Accept-Encoding. Pulled out
// so it can be tested without standing up the full gateway.
//
// text/event-stream is excluded via excludeSSEContentTypeFilter — proxies
// and CDNs buffer compressed SSE responses, defeating real-time delivery.
func newHTTPCompressionWrapper(cfg config.Config) (func(http.Handler) http.HandlerFunc, error) {
return gzhttp.NewWrapper(
gzhttp.MinSize(cfg.HTTPCompressionMinSize),
gzhttp.EnableZstd(!cfg.DisableZstdCompression),
gzhttp.PreferZstd(!cfg.DisableZstdCompression && cfg.PreferZstd),
gzhttp.ContentTypeFilter(excludeSSEContentTypeFilter),
)
}

// excludeSSEContentTypeFilter wraps gzhttp.DefaultContentTypeFilter to also
// exclude text/event-stream, so SSE frames are delivered uncompressed and
// reach the client without intermediary buffering.
func excludeSSEContentTypeFilter(ct string) bool {
if mediaType, _, err := mime.ParseMediaType(ct); err == nil && mediaType == sseMediaType {
return false
}
return gzhttp.DefaultContentTypeFilter(ct)
}
46 changes: 46 additions & 0 deletions compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,52 @@ func TestNewHTTPCompressionWrapper_NegotiatesEncoding(t *testing.T) {
}
}

func TestNewHTTPCompressionWrapper_ExcludesEventStream(t *testing.T) {
// SSE responses must never be compressed: intermediaries (proxies, CDNs)
// buffer compressed event streams, which defeats real-time delivery for
// EventSource clients consuming streaming gateway RPCs.
cfg := config.Config{HTTPCompressionMinSize: 256, PreferZstd: true}
wrapper, err := newHTTPCompressionWrapper(cfg)
if err != nil {
t.Fatalf("newHTTPCompressionWrapper: %v", err)
}

body := strings.Repeat("data: payload\n\n", 256) // well above MinSize

cases := []struct {
name string
contentType string
wantEncoded bool
}{
{"plain-text-still-compresses", "text/plain", true},
{"sse-bare", "text/event-stream", false},
{"sse-with-charset", "text/event-stream; charset=utf-8", false},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", tc.contentType)
_, _ = w.Write([]byte(body))
})
wrapped := wrapper(handler)

req := httptest.NewRequest(http.MethodGet, "/", nil)
req.Header.Set("Accept-Encoding", "gzip, zstd")
rec := httptest.NewRecorder()
wrapped.ServeHTTP(rec, req)

encoding := rec.Header().Get("Content-Encoding")
if tc.wantEncoded && encoding == "" {
t.Fatalf("Content-Type %q: expected compression, got none", tc.contentType)
}
if !tc.wantEncoded && encoding != "" {
t.Fatalf("Content-Type %q: expected no compression, got %q", tc.contentType, encoding)
}
})
}
}

func TestNewHTTPCompressionWrapper_BelowMinSize(t *testing.T) {
cfg := config.Config{HTTPCompressionMinSize: 256, PreferZstd: true}
wrapper, err := newHTTPCompressionWrapper(cfg)
Expand Down
106 changes: 106 additions & 0 deletions marshaler_sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package core

import (
"errors"
"io"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
)

// SSEMarshaler is a runtime.Marshaler that emits Server-Sent Events
// (text/event-stream) frames for server-streaming gateway RPCs. It lets
// browser EventSource clients consume streaming RPCs directly — useful for
// AI/LLM token streaming and other long-running progressive responses.
//
// Each Marshal call returns "data: <json>" with no trailing newline; the
// Delimiter ("\n\n") terminates each SSE frame per the SSE spec. The JSON
// payload uses protojson via the embedded runtime.JSONPb, so field naming
// matches the gateway's default JSON responses.
//
// Wire it up from a service's PreStart hook:
//
// core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})
//
// Clients then opt in by sending Accept: text/event-stream on the gateway
// URL. The newHTTPCompressionWrapper excludes text/event-stream from
// gzip/zstd compression so frames reach the client in real time (compressed
// SSE is buffered by many HTTP intermediaries).
//
// SSE is server-to-client only: Unmarshal and NewDecoder return an error.
//
// Per-field protojson options (EmitUnpopulated, UseProtoNames, etc.) can be
// set by initializing the embedded JSONPb directly:
//
// &core.SSEMarshaler{JSONPb: runtime.JSONPb{
// MarshalOptions: protojson.MarshalOptions{EmitUnpopulated: true},
// }}
type SSEMarshaler struct {
runtime.JSONPb
}

var (
ssePrefix = []byte("data: ")
sseDelimiter = []byte("\n\n")
errSSEReadNotSupported = errors.New("core: SSEMarshaler does not support reading; Server-Sent Events is a server-to-client format")
)

// ContentType always returns "text/event-stream".
func (*SSEMarshaler) ContentType(_ any) string {
return sseMediaType
}

// StreamContentType matches ContentType so server-streaming responses also
// advertise text/event-stream. Gateway prefers this over ContentType when
// implemented (see runtime.ForwardResponseStream).
func (*SSEMarshaler) StreamContentType(_ any) string {
return sseMediaType
}

// Marshal returns "data: <json>" with no trailing newline. Frame
// termination is supplied by Delimiter; the gateway writes Marshal output
// followed by Delimiter for each streamed message.
func (s *SSEMarshaler) Marshal(v any) ([]byte, error) {
body, err := s.JSONPb.Marshal(v)
if err != nil {
return nil, err
}
out := make([]byte, 0, len(ssePrefix)+len(body))
out = append(out, ssePrefix...)
out = append(out, body...)
Comment on lines +103 to +105

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 5d9cc5e — same change as the CodeRabbit comment above: Marshal now prefixes every line of the payload, not just the first, so multiline JSON (when JSONPb is configured with Multiline/Indent) stays SSE-spec-compliant. Default single-line JSON is unaffected.

return out, nil
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// Delimiter returns "\n\n", which terminates one SSE frame.
func (*SSEMarshaler) Delimiter() []byte {
return sseDelimiter

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 5d9cc5e: Delimiter() now returns []byte("\n\n") freshly allocated per call, so callers cannot mutate framing for other SSEMarshaler instances. New test TestSSEMarshaler_DelimiterReturnsFreshSlice mutates the returned slice and verifies the next call still returns \n\n.

}

// Unmarshal returns an error: SSE is a server-to-client format and the
// gateway never reads SSE bodies from inbound requests.
func (*SSEMarshaler) Unmarshal(_ []byte, _ any) error {
return errSSEReadNotSupported
}

// NewDecoder returns a decoder that always errors, for the same reason as
// Unmarshal.
func (*SSEMarshaler) NewDecoder(_ io.Reader) runtime.Decoder {
return runtime.DecoderFunc(func(_ any) error {
return errSSEReadNotSupported
})
}

// NewEncoder returns an encoder that writes "data: <json>\n\n" per Encode
// call.
func (s *SSEMarshaler) NewEncoder(w io.Writer) runtime.Encoder {
return runtime.EncoderFunc(func(v any) error {
body, err := s.Marshal(v)
if err != nil {
return err
}
if _, err := w.Write(body); err != nil {
return err
}
_, err = w.Write(s.Delimiter())
return err
})
}
Loading
Loading