Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 108 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ For full documentation, visit https://docs.coldbrew.cloud
- [type CBStopper](<#CBStopper>)
- [type CBWorkerProvider](<#CBWorkerProvider>)
- [type OTLPConfig](<#OTLPConfig>)
- [type SSEMarshaler](<#SSEMarshaler>)
- [func \(\*SSEMarshaler\) ContentType\(\_ any\) string](<#SSEMarshaler.ContentType>)
- [func \(\*SSEMarshaler\) Delimiter\(\) \[\]byte](<#SSEMarshaler.Delimiter>)
- [func \(s \*SSEMarshaler\) Marshal\(v any\) \(\[\]byte, error\)](<#SSEMarshaler.Marshal>)
- [func \(\*SSEMarshaler\) NewDecoder\(\_ io.Reader\) runtime.Decoder](<#SSEMarshaler.NewDecoder>)
- [func \(s \*SSEMarshaler\) NewEncoder\(w io.Writer\) runtime.Encoder](<#SSEMarshaler.NewEncoder>)
- [func \(\*SSEMarshaler\) StreamContentType\(\_ any\) string](<#SSEMarshaler.StreamContentType>)
- [func \(\*SSEMarshaler\) Unmarshal\(\_ \[\]byte, \_ any\) error](<#SSEMarshaler.Unmarshal>)


## Constants
Expand Down Expand Up @@ -183,7 +191,7 @@ RegisterServeMuxOption appends a runtime.ServeMuxOption that initHTTP passes to
Must be called before core.Run\(\) \(typically from a service's PreStart hook\). Not safe for concurrent registration.

<a name="SetOTELGRPCClientOptions"></a>
## func [SetOTELGRPCClientOptions](<https://github.com/go-coldbrew/core/blob/main/core.go#L624>)
## func [SetOTELGRPCClientOptions](<https://github.com/go-coldbrew/core/blob/main/core.go#L611>)

```go
func SetOTELGRPCClientOptions(opts ...otelgrpc.Option)
Expand All @@ -192,7 +200,7 @@ func SetOTELGRPCClientOptions(opts ...otelgrpc.Option)
Deprecated: Use SetOTELOptions instead. Only applies when OTEL\_USE\_LEGACY\_INSTRUMENTATION=true.

<a name="SetOTELGRPCServerOptions"></a>
## func [SetOTELGRPCServerOptions](<https://github.com/go-coldbrew/core/blob/main/core.go#L618>)
## func [SetOTELGRPCServerOptions](<https://github.com/go-coldbrew/core/blob/main/core.go#L605>)

```go
func SetOTELGRPCServerOptions(opts ...otelgrpc.Option)
Expand All @@ -201,7 +209,7 @@ func SetOTELGRPCServerOptions(opts ...otelgrpc.Option)
Deprecated: Use SetOTELOptions instead. Only applies when OTEL\_USE\_LEGACY\_INSTRUMENTATION=true.

<a name="SetOTELOptions"></a>
## func [SetOTELOptions](<https://github.com/go-coldbrew/core/blob/main/core.go#L631>)
## func [SetOTELOptions](<https://github.com/go-coldbrew/core/blob/main/core.go#L618>)

```go
func SetOTELOptions(opts grpcotel.Options)
Expand Down Expand Up @@ -360,7 +368,7 @@ type CB interface {
```

<a name="New"></a>
### func [New](<https://github.com/go-coldbrew/core/blob/main/core.go#L1031>)
### func [New](<https://github.com/go-coldbrew/core/blob/main/core.go#L1018>)

```go
func New(c config.Config) CB
Expand Down Expand Up @@ -506,4 +514,100 @@ type OTLPConfig struct {
}
```

<a name="SSEMarshaler"></a>
## type [SSEMarshaler](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L63-L65>)

SSEMarshaler is a runtime.Marshaler that emits Server\-Sent Events \(text/event\-stream\) frames for server\-streaming gateway RPCs. It lets browser EventSource clients consume streaming RPCs directly — useful for AI/LLM token streaming and other long\-running progressive responses.

Each Marshal call returns "data: \<json\>" with no trailing newline; the Delimiter \("\\n\\n"\) terminates each SSE frame per the SSE spec. The JSON payload uses protojson via the embedded runtime.JSONPb, so field naming matches the gateway's default JSON responses.

Wire it up from a service's PreStart hook:

```
core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})
```
Comment on lines +526 to +528

@coderabbitai coderabbitai Bot May 17, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add language identifiers to fenced code blocks.

The two new fenced blocks are missing language tags (MD040) and can fail markdown lint.

Suggested fix
-```
+```go
 core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})

- +go
&core.SSEMarshaler{JSONPb: runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{EmitUnpopulated: true},
}}

</details>


Also applies to: 536-540

<details>
<summary>🧰 Tools</summary>

<details>
<summary>🪛 markdownlint-cli2 (0.22.1)</summary>

[warning] 526-526: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

</details>

</details>

<details>
<summary>🤖 Prompt for AI Agents</summary>

Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @README.md around lines 526 - 528, The README's fenced code blocks around
examples that include core.RegisterHTTPMarshaler, &core.SSEMarshaler{}, and the
&core.SSEMarshaler{JSONPb: runtime.JSONPb{...}} snippet lack language
identifiers and trigger MD040; update each of those triple-backtick blocks to
include the Go language tag (```go) so the examples for
core.RegisterHTTPMarshaler and the SSEMarshaler/JSONPb snippet are fenced as Go
code.


</details>

<!-- fingerprinting:phantom:triton:hawk -->

<!-- This is an auto-generated reply by CodeRabbit -->

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skipping for this PR. The README is auto-generated by gomarkdoc from the Go doc comments — those code samples render without language tags because gomarkdoc reformats them. MD040 is a lint warning, not a CI failure, and the right fix is upstream in gomarkdoc rather than hand-editing the regenerated README (which would just get clobbered on the next make doc).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ankurs, understood — makes complete sense. Patching a generated file would just create noise for the next make doc run, and the fix belongs in the gomarkdoc template/tooling rather than here.


✏️ Learnings added
Learnt from: ankurs
Repo: go-coldbrew/core PR: 92
File: README.md:526-528
Timestamp: 2026-05-28T16:52:53.133Z
Learning: In the go-coldbrew/core repository, `README.md` (and `config/README.md`) are auto-generated files produced by `gomarkdoc` from Go doc comments via `make doc`. They should **not** be hand-edited, as any manual changes will be clobbered on the next `make doc` invocation. Issues with the generated output (e.g., missing language tags on fenced code blocks / MD040 warnings) need to be fixed upstream in the gomarkdoc configuration or Go doc comment formatting, not directly in the README.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


Clients then opt in by sending Accept: text/event\-stream on the gateway URL. The newHTTPCompressionWrapper excludes text/event\-stream from gzip/zstd compression so frames reach the client in real time \(compressed SSE is buffered by many HTTP intermediaries\).

SSE is server\-to\-client only: Unmarshal and NewDecoder return an error.

Per\-field protojson options \(EmitUnpopulated, UseProtoNames, etc.\) can be set by initializing the embedded JSONPb directly:

```
&core.SSEMarshaler{JSONPb: runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{EmitUnpopulated: true},
}}
```

```go
type SSEMarshaler struct {
runtime.JSONPb
}
```

<a name="SSEMarshaler.ContentType"></a>
### func \(\*SSEMarshaler\) [ContentType](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L74>)

```go
func (*SSEMarshaler) ContentType(_ any) string
```

ContentType always returns "text/event\-stream".

<a name="SSEMarshaler.Delimiter"></a>
### func \(\*SSEMarshaler\) [Delimiter](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L112>)

```go
func (*SSEMarshaler) Delimiter() []byte
```

Delimiter returns "\\n\\n", which terminates one SSE frame. A fresh slice is returned per call so callers cannot mutate the framing for other SSEMarshaler instances.

<a name="SSEMarshaler.Marshal"></a>
### func \(\*SSEMarshaler\) [Marshal](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L94>)

```go
func (s *SSEMarshaler) Marshal(v any) ([]byte, error)
```

Marshal returns "data: \<json\>" with no trailing newline. Frame termination is supplied by Delimiter; the gateway writes Marshal output followed by Delimiter for each streamed message.

Newlines inside the JSON payload \(when the embedded runtime.JSONPb is configured with MarshalOptions.Multiline or Indent\) are turned into continuation lines: each line of the payload starts with "data: " as the SSE spec requires, otherwise EventSource truncates the frame after the first line.

<a name="SSEMarshaler.NewDecoder"></a>
### func \(\*SSEMarshaler\) [NewDecoder](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L124>)

```go
func (*SSEMarshaler) NewDecoder(_ io.Reader) runtime.Decoder
```

NewDecoder returns a decoder that always errors, for the same reason as Unmarshal.

<a name="SSEMarshaler.NewEncoder"></a>
### func \(\*SSEMarshaler\) [NewEncoder](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L132>)

```go
func (s *SSEMarshaler) NewEncoder(w io.Writer) runtime.Encoder
```

NewEncoder returns an encoder that writes "data: \<json\>\\n\\n" per Encode call.

<a name="SSEMarshaler.StreamContentType"></a>
### func \(\*SSEMarshaler\) [StreamContentType](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L81>)

```go
func (*SSEMarshaler) StreamContentType(_ any) string
```

StreamContentType matches ContentType so server\-streaming responses also advertise text/event\-stream. Gateway prefers this over ContentType when implemented \(see runtime.ForwardResponseStream\).

<a name="SSEMarshaler.Unmarshal"></a>
### func \(\*SSEMarshaler\) [Unmarshal](<https://github.com/go-coldbrew/core/blob/main/marshaler_sse.go#L118>)

```go
func (*SSEMarshaler) Unmarshal(_ []byte, _ any) error
```

Unmarshal returns an error: SSE is a server\-to\-client format and the gateway never reads SSE bodies from inbound requests.

Generated by [gomarkdoc](<https://github.com/princjef/gomarkdoc>)
19 changes: 19 additions & 0 deletions compression.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
package core

import (
"mime"
"net/http"

"github.com/go-coldbrew/core/config"
"github.com/klauspost/compress/gzhttp"
)

// sseMediaType is the Content-Type advertised by SSEMarshaler and excluded
// from HTTP compression.
const sseMediaType = "text/event-stream"

// newHTTPCompressionWrapper builds the gzhttp wrapper used by initHTTP. It
// negotiates gzip and (unless disabled) zstd from Accept-Encoding. Pulled out
// so it can be tested without standing up the full gateway.
//
// text/event-stream is excluded via excludeSSEContentTypeFilter — proxies
// and CDNs buffer compressed SSE responses, defeating real-time delivery.
func newHTTPCompressionWrapper(cfg config.Config) (func(http.Handler) http.HandlerFunc, error) {
return gzhttp.NewWrapper(
gzhttp.MinSize(cfg.HTTPCompressionMinSize),
gzhttp.EnableZstd(!cfg.DisableZstdCompression),
gzhttp.PreferZstd(!cfg.DisableZstdCompression && cfg.PreferZstd),
gzhttp.ContentTypeFilter(excludeSSEContentTypeFilter),
)
}

// excludeSSEContentTypeFilter wraps gzhttp.DefaultContentTypeFilter to also
// exclude text/event-stream, so SSE frames are delivered uncompressed and
// reach the client without intermediary buffering.
func excludeSSEContentTypeFilter(ct string) bool {
if mediaType, _, err := mime.ParseMediaType(ct); err == nil && mediaType == sseMediaType {
return false
}
return gzhttp.DefaultContentTypeFilter(ct)
}
46 changes: 46 additions & 0 deletions compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,52 @@ func TestNewHTTPCompressionWrapper_NegotiatesEncoding(t *testing.T) {
}
}

func TestNewHTTPCompressionWrapper_ExcludesEventStream(t *testing.T) {
// SSE responses must never be compressed: intermediaries (proxies, CDNs)
// buffer compressed event streams, which defeats real-time delivery for
// EventSource clients consuming streaming gateway RPCs.
cfg := config.Config{HTTPCompressionMinSize: 256, PreferZstd: true}
wrapper, err := newHTTPCompressionWrapper(cfg)
if err != nil {
t.Fatalf("newHTTPCompressionWrapper: %v", err)
}

body := strings.Repeat("data: payload\n\n", 256) // well above MinSize

cases := []struct {
name string
contentType string
wantEncoded bool
}{
{"plain-text-still-compresses", "text/plain", true},
{"sse-bare", "text/event-stream", false},
{"sse-with-charset", "text/event-stream; charset=utf-8", false},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", tc.contentType)
_, _ = w.Write([]byte(body))
})
wrapped := wrapper(handler)

req := httptest.NewRequest(http.MethodGet, "/", nil)
req.Header.Set("Accept-Encoding", "gzip, zstd")
rec := httptest.NewRecorder()
wrapped.ServeHTTP(rec, req)

encoding := rec.Header().Get("Content-Encoding")
if tc.wantEncoded && encoding == "" {
t.Fatalf("Content-Type %q: expected compression, got none", tc.contentType)
}
if !tc.wantEncoded && encoding != "" {
t.Fatalf("Content-Type %q: expected no compression, got %q", tc.contentType, encoding)
}
})
}
}

func TestNewHTTPCompressionWrapper_BelowMinSize(t *testing.T) {
cfg := config.Config{HTTPCompressionMinSize: 256, PreferZstd: true}
wrapper, err := newHTTPCompressionWrapper(cfg)
Expand Down
13 changes: 10 additions & 3 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ import "github.com/go-coldbrew/core/config"


<a name="Config"></a>
## type [Config](<https://github.com/go-coldbrew/core/blob/main/config/config.go#L21-L219>)
## type [Config](<https://github.com/go-coldbrew/core/blob/main/config/config.go#L21-L226>)

Config is the configuration for the Coldbrew server. It is populated from environment variables and has sensible defaults for all fields so that you can just use it as is without any configuration. The following environment variables are supported and can be used to override the defaults for the fields.

Expand Down Expand Up @@ -152,6 +152,13 @@ type Config struct {
UseJSONBuiltinMarshaller bool `envconfig:"USE_JSON_BUILTIN_MARSHALLER" env:"USE_JSON_BUILTIN_MARSHALLER" default:"false"`
// JSONBuiltinMarshallerMime specifies the Content-Type/Accept header for use by the json builtin marshaler
JSONBuiltinMarshallerMime string `envconfig:"JSON_BUILTIN_MARSHALLER_MIME" env:"JSON_BUILTIN_MARSHALLER_MIME" default:"application/json"`
// DisableSSEMarshaler opts out of the auto-registered text/event-stream
// marshaler. By default, server-streaming gateway RPCs are consumable as
// Server-Sent Events when the client sends Accept: text/event-stream — the
// natural transport for browser EventSource clients and AI/LLM token
// streams. Set true to suppress the registration (e.g. if the service
// registers a custom SSE marshaler via core.RegisterHTTPMarshaler).
DisableSSEMarshaler bool `envconfig:"DISABLE_SSE_MARSHALER" env:"DISABLE_SSE_MARSHALER" default:"false"`
// MaxConnectionIdle is a duration for the amount of time after which an
// idle connection would be closed by sending a GoAway. Idleness duration is
// defined since the most recent time the number of outstanding RPCs became
Expand Down Expand Up @@ -274,7 +281,7 @@ type Config struct {
```

<a name="Config.Validate"></a>
### func \(Config\) [Validate](<https://github.com/go-coldbrew/core/blob/main/config/config.go#L224>)
### func \(Config\) [Validate](<https://github.com/go-coldbrew/core/blob/main/config/config.go#L231>)

```go
func (c Config) Validate() []string
Expand All @@ -283,7 +290,7 @@ func (c Config) Validate() []string
Validate checks the configuration for common misconfigurations and returns a list of warning messages. It does not return an error to avoid breaking existing services — warnings are meant to be logged at startup.

<a name="Config.ValidateStrict"></a>
### func \(Config\) [ValidateStrict](<https://github.com/go-coldbrew/core/blob/main/config/config.go#L315>)
### func \(Config\) [ValidateStrict](<https://github.com/go-coldbrew/core/blob/main/config/config.go#L322>)

```go
func (c Config) ValidateStrict() []error
Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ type Config struct {
UseJSONBuiltinMarshaller bool `envconfig:"USE_JSON_BUILTIN_MARSHALLER" env:"USE_JSON_BUILTIN_MARSHALLER" default:"false"`
// JSONBuiltinMarshallerMime specifies the Content-Type/Accept header for use by the json builtin marshaler
JSONBuiltinMarshallerMime string `envconfig:"JSON_BUILTIN_MARSHALLER_MIME" env:"JSON_BUILTIN_MARSHALLER_MIME" default:"application/json"`
// DisableSSEMarshaler opts out of the auto-registered text/event-stream
// marshaler. By default, server-streaming gateway RPCs are consumable as
// Server-Sent Events when the client sends Accept: text/event-stream — the
// natural transport for browser EventSource clients and AI/LLM token
// streams. Set true to suppress the registration (e.g. if the service
// registers a custom SSE marshaler via core.RegisterHTTPMarshaler).
DisableSSEMarshaler bool `envconfig:"DISABLE_SSE_MARSHALER" env:"DISABLE_SSE_MARSHALER" default:"false"`
// MaxConnectionIdle is a duration for the amount of time after which an
// idle connection would be closed by sending a GoAway. Idleness duration is
// defined since the most recent time the number of outstanding RPCs became
Expand Down
21 changes: 4 additions & 17 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,23 +452,10 @@ func (c *cb) initHTTP(ctx context.Context) (*http.Server, error) {
allowedHttpHeaderPrefixes = []string{c.config.HTTPHeaderPrefix}
}

muxOpts := []runtime.ServeMuxOption{
runtime.WithIncomingHeaderMatcher(
getCustomHeaderMatcher(allowedHttpHeaderPrefixes, c.config.TraceHeaderName, c.config.DebugLogHeaderName),
),
runtime.WithMarshalerOption("application/proto", pMar),
runtime.WithMarshalerOption("application/protobuf", pMar),
runtime.WithMiddlewares(spanRouteMiddleware),
}

if c.config.UseJSONBuiltinMarshaller {
muxOpts = append(
muxOpts,
runtime.WithMarshalerOption(c.config.JSONBuiltinMarshallerMime, &runtime.JSONBuiltin{}),
)
}

muxOpts = append(muxOpts, registeredServeMuxOptions()...)
muxOpts := append(
buildHTTPMuxOptions(c.config, allowedHttpHeaderPrefixes, pMar),
registeredServeMuxOptions()...,
)

mux := runtime.NewServeMux(muxOpts...)

Expand Down
Loading
Loading