diff --git a/Makefile b/Makefile
index fe835ac..00f1f8f 100644
--- a/Makefile
+++ b/Makefile
@@ -10,7 +10,7 @@ doc:
lint:
go tool golangci-lint run
- go tool govulncheck -scan=module
+ go tool govulncheck ./...
bench:
go test -run=^$$ -bench=. -benchmem ./...
diff --git a/README.md b/README.md
index 70a508a..95a506b 100755
--- a/README.md
+++ b/README.md
@@ -114,6 +114,14 @@ For full documentation, visit https://docs.coldbrew.cloud
- [type CBStopper](<#CBStopper>)
- [type CBWorkerProvider](<#CBWorkerProvider>)
- [type OTLPConfig](<#OTLPConfig>)
+- [type SSEMarshaler](<#SSEMarshaler>)
+ - [func \(\*SSEMarshaler\) ContentType\(\_ any\) string](<#SSEMarshaler.ContentType>)
+ - [func \(\*SSEMarshaler\) Delimiter\(\) \[\]byte](<#SSEMarshaler.Delimiter>)
+ - [func \(s \*SSEMarshaler\) Marshal\(v any\) \(\[\]byte, error\)](<#SSEMarshaler.Marshal>)
+ - [func \(\*SSEMarshaler\) NewDecoder\(\_ io.Reader\) runtime.Decoder](<#SSEMarshaler.NewDecoder>)
+ - [func \(s \*SSEMarshaler\) NewEncoder\(w io.Writer\) runtime.Encoder](<#SSEMarshaler.NewEncoder>)
+ - [func \(\*SSEMarshaler\) StreamContentType\(\_ any\) string](<#SSEMarshaler.StreamContentType>)
+ - [func \(\*SSEMarshaler\) Unmarshal\(\_ \[\]byte, \_ any\) error](<#SSEMarshaler.Unmarshal>)
## Constants
@@ -183,7 +191,7 @@ RegisterServeMuxOption appends a runtime.ServeMuxOption that initHTTP passes to
Must be called before core.Run\(\) \(typically from a service's PreStart hook\). Not safe for concurrent registration.
-## func [SetOTELGRPCClientOptions]()
+## func [SetOTELGRPCClientOptions]()
```go
func SetOTELGRPCClientOptions(opts ...otelgrpc.Option)
@@ -192,7 +200,7 @@ func SetOTELGRPCClientOptions(opts ...otelgrpc.Option)
Deprecated: Use SetOTELOptions instead. Only applies when OTEL\_USE\_LEGACY\_INSTRUMENTATION=true.
-## func [SetOTELGRPCServerOptions]()
+## func [SetOTELGRPCServerOptions]()
```go
func SetOTELGRPCServerOptions(opts ...otelgrpc.Option)
@@ -201,7 +209,7 @@ func SetOTELGRPCServerOptions(opts ...otelgrpc.Option)
Deprecated: Use SetOTELOptions instead. Only applies when OTEL\_USE\_LEGACY\_INSTRUMENTATION=true.
-## func [SetOTELOptions]()
+## func [SetOTELOptions]()
```go
func SetOTELOptions(opts grpcotel.Options)
@@ -360,7 +368,7 @@ type CB interface {
```
-### func [New]()
+### func [New]()
```go
func New(c config.Config) CB
@@ -506,4 +514,100 @@ type OTLPConfig struct {
}
```
+
+## type [SSEMarshaler]()
+
+SSEMarshaler is a runtime.Marshaler that emits Server\-Sent Events \(text/event\-stream\) frames for server\-streaming gateway RPCs. It lets browser EventSource clients consume streaming RPCs directly — useful for AI/LLM token streaming and other long\-running progressive responses.
+
+Each Marshal call returns "data: \" with no trailing newline; the Delimiter \("\\n\\n"\) terminates each SSE frame per the SSE spec. The JSON payload uses protojson via the embedded runtime.JSONPb, so field naming matches the gateway's default JSON responses.
+
+Wire it up from a service's PreStart hook:
+
+```
+core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})
+```
+
+Clients then opt in by sending Accept: text/event\-stream on the gateway URL. The newHTTPCompressionWrapper excludes text/event\-stream from gzip/zstd compression so frames reach the client in real time \(compressed SSE is buffered by many HTTP intermediaries\).
+
+SSE is server\-to\-client only: Unmarshal and NewDecoder return an error.
+
+Per\-field protojson options \(EmitUnpopulated, UseProtoNames, etc.\) can be set by initializing the embedded JSONPb directly:
+
+```
+&core.SSEMarshaler{JSONPb: runtime.JSONPb{
+ MarshalOptions: protojson.MarshalOptions{EmitUnpopulated: true},
+}}
+```
+
+```go
+type SSEMarshaler struct {
+ runtime.JSONPb
+}
+```
+
+
+### func \(\*SSEMarshaler\) [ContentType]()
+
+```go
+func (*SSEMarshaler) ContentType(_ any) string
+```
+
+ContentType always returns "text/event\-stream".
+
+
+### func \(\*SSEMarshaler\) [Delimiter]()
+
+```go
+func (*SSEMarshaler) Delimiter() []byte
+```
+
+Delimiter returns "\\n\\n", which terminates one SSE frame. A fresh slice is returned per call so callers cannot mutate the framing for other SSEMarshaler instances.
+
+
+### func \(\*SSEMarshaler\) [Marshal]()
+
+```go
+func (s *SSEMarshaler) Marshal(v any) ([]byte, error)
+```
+
+Marshal returns "data: \" with no trailing newline. Frame termination is supplied by Delimiter; the gateway writes Marshal output followed by Delimiter for each streamed message.
+
+Newlines inside the JSON payload \(when the embedded runtime.JSONPb is configured with MarshalOptions.Multiline or Indent\) are turned into continuation lines: each line of the payload starts with "data: " as the SSE spec requires, otherwise EventSource truncates the frame after the first line.
+
+
+### func \(\*SSEMarshaler\) [NewDecoder]()
+
+```go
+func (*SSEMarshaler) NewDecoder(_ io.Reader) runtime.Decoder
+```
+
+NewDecoder returns a decoder that always errors, for the same reason as Unmarshal.
+
+
+### func \(\*SSEMarshaler\) [NewEncoder]()
+
+```go
+func (s *SSEMarshaler) NewEncoder(w io.Writer) runtime.Encoder
+```
+
+NewEncoder returns an encoder that writes "data: \\\n\\n" per Encode call.
+
+
+### func \(\*SSEMarshaler\) [StreamContentType]()
+
+```go
+func (*SSEMarshaler) StreamContentType(_ any) string
+```
+
+StreamContentType matches ContentType so server\-streaming responses also advertise text/event\-stream. Gateway prefers this over ContentType when implemented \(see runtime.ForwardResponseStream\).
+
+
+### func \(\*SSEMarshaler\) [Unmarshal]()
+
+```go
+func (*SSEMarshaler) Unmarshal(_ []byte, _ any) error
+```
+
+Unmarshal returns an error: SSE is a server\-to\-client format and the gateway never reads SSE bodies from inbound requests.
+
Generated by [gomarkdoc]()
diff --git a/compression.go b/compression.go
index 7039ae2..fd08e52 100644
--- a/compression.go
+++ b/compression.go
@@ -1,19 +1,38 @@
package core
import (
+ "mime"
"net/http"
"github.com/go-coldbrew/core/config"
"github.com/klauspost/compress/gzhttp"
)
+// sseMediaType is the Content-Type advertised by SSEMarshaler and excluded
+// from HTTP compression.
+const sseMediaType = "text/event-stream"
+
// newHTTPCompressionWrapper builds the gzhttp wrapper used by initHTTP. It
// negotiates gzip and (unless disabled) zstd from Accept-Encoding. Pulled out
// so it can be tested without standing up the full gateway.
+//
+// text/event-stream is excluded via excludeSSEContentTypeFilter — proxies
+// and CDNs buffer compressed SSE responses, defeating real-time delivery.
func newHTTPCompressionWrapper(cfg config.Config) (func(http.Handler) http.HandlerFunc, error) {
return gzhttp.NewWrapper(
gzhttp.MinSize(cfg.HTTPCompressionMinSize),
gzhttp.EnableZstd(!cfg.DisableZstdCompression),
gzhttp.PreferZstd(!cfg.DisableZstdCompression && cfg.PreferZstd),
+ gzhttp.ContentTypeFilter(excludeSSEContentTypeFilter),
)
}
+
+// excludeSSEContentTypeFilter wraps gzhttp.DefaultContentTypeFilter to also
+// exclude text/event-stream, so SSE frames are delivered uncompressed and
+// reach the client without intermediary buffering.
+func excludeSSEContentTypeFilter(ct string) bool {
+ if mediaType, _, err := mime.ParseMediaType(ct); err == nil && mediaType == sseMediaType {
+ return false
+ }
+ return gzhttp.DefaultContentTypeFilter(ct)
+}
diff --git a/compression_test.go b/compression_test.go
index 5b07c9a..63cc854 100644
--- a/compression_test.go
+++ b/compression_test.go
@@ -78,6 +78,52 @@ func TestNewHTTPCompressionWrapper_NegotiatesEncoding(t *testing.T) {
}
}
+func TestNewHTTPCompressionWrapper_ExcludesEventStream(t *testing.T) {
+ // SSE responses must never be compressed: intermediaries (proxies, CDNs)
+ // buffer compressed event streams, which defeats real-time delivery for
+ // EventSource clients consuming streaming gateway RPCs.
+ cfg := config.Config{HTTPCompressionMinSize: 256, PreferZstd: true}
+ wrapper, err := newHTTPCompressionWrapper(cfg)
+ if err != nil {
+ t.Fatalf("newHTTPCompressionWrapper: %v", err)
+ }
+
+ body := strings.Repeat("data: payload\n\n", 256) // well above MinSize
+
+ cases := []struct {
+ name string
+ contentType string
+ wantEncoded bool
+ }{
+ {"plain-text-still-compresses", "text/plain", true},
+ {"sse-bare", "text/event-stream", false},
+ {"sse-with-charset", "text/event-stream; charset=utf-8", false},
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+ w.Header().Set("Content-Type", tc.contentType)
+ _, _ = w.Write([]byte(body))
+ })
+ wrapped := wrapper(handler)
+
+ req := httptest.NewRequest(http.MethodGet, "/", nil)
+ req.Header.Set("Accept-Encoding", "gzip, zstd")
+ rec := httptest.NewRecorder()
+ wrapped.ServeHTTP(rec, req)
+
+ encoding := rec.Header().Get("Content-Encoding")
+ if tc.wantEncoded && encoding == "" {
+ t.Fatalf("Content-Type %q: expected compression, got none", tc.contentType)
+ }
+ if !tc.wantEncoded && encoding != "" {
+ t.Fatalf("Content-Type %q: expected no compression, got %q", tc.contentType, encoding)
+ }
+ })
+ }
+}
+
func TestNewHTTPCompressionWrapper_BelowMinSize(t *testing.T) {
cfg := config.Config{HTTPCompressionMinSize: 256, PreferZstd: true}
wrapper, err := newHTTPCompressionWrapper(cfg)
diff --git a/config/README.md b/config/README.md
index c320edb..58b0e7c 100755
--- a/config/README.md
+++ b/config/README.md
@@ -67,7 +67,7 @@ import "github.com/go-coldbrew/core/config"
-## type [Config]()
+## type [Config]()
Config is the configuration for the Coldbrew server. It is populated from environment variables and has sensible defaults for all fields so that you can just use it as is without any configuration. The following environment variables are supported and can be used to override the defaults for the fields.
@@ -152,6 +152,13 @@ type Config struct {
UseJSONBuiltinMarshaller bool `envconfig:"USE_JSON_BUILTIN_MARSHALLER" env:"USE_JSON_BUILTIN_MARSHALLER" default:"false"`
// JSONBuiltinMarshallerMime specifies the Content-Type/Accept header for use by the json builtin marshaler
JSONBuiltinMarshallerMime string `envconfig:"JSON_BUILTIN_MARSHALLER_MIME" env:"JSON_BUILTIN_MARSHALLER_MIME" default:"application/json"`
+ // DisableSSEMarshaler opts out of the auto-registered text/event-stream
+ // marshaler. By default, server-streaming gateway RPCs are consumable as
+ // Server-Sent Events when the client sends Accept: text/event-stream — the
+ // natural transport for browser EventSource clients and AI/LLM token
+ // streams. Set true to suppress the registration (e.g. if the service
+ // registers a custom SSE marshaler via core.RegisterHTTPMarshaler).
+ DisableSSEMarshaler bool `envconfig:"DISABLE_SSE_MARSHALER" env:"DISABLE_SSE_MARSHALER" default:"false"`
// MaxConnectionIdle is a duration for the amount of time after which an
// idle connection would be closed by sending a GoAway. Idleness duration is
// defined since the most recent time the number of outstanding RPCs became
@@ -274,7 +281,7 @@ type Config struct {
```
-### func \(Config\) [Validate]()
+### func \(Config\) [Validate]()
```go
func (c Config) Validate() []string
@@ -283,7 +290,7 @@ func (c Config) Validate() []string
Validate checks the configuration for common misconfigurations and returns a list of warning messages. It does not return an error to avoid breaking existing services — warnings are meant to be logged at startup.
-### func \(Config\) [ValidateStrict]()
+### func \(Config\) [ValidateStrict]()
```go
func (c Config) ValidateStrict() []error
diff --git a/config/config.go b/config/config.go
index 904d2f4..7f91712 100644
--- a/config/config.go
+++ b/config/config.go
@@ -93,6 +93,13 @@ type Config struct {
UseJSONBuiltinMarshaller bool `envconfig:"USE_JSON_BUILTIN_MARSHALLER" env:"USE_JSON_BUILTIN_MARSHALLER" default:"false"`
// JSONBuiltinMarshallerMime specifies the Content-Type/Accept header for use by the json builtin marshaler
JSONBuiltinMarshallerMime string `envconfig:"JSON_BUILTIN_MARSHALLER_MIME" env:"JSON_BUILTIN_MARSHALLER_MIME" default:"application/json"`
+ // DisableSSEMarshaler opts out of the auto-registered text/event-stream
+ // marshaler. By default, server-streaming gateway RPCs are consumable as
+ // Server-Sent Events when the client sends Accept: text/event-stream — the
+ // natural transport for browser EventSource clients and AI/LLM token
+ // streams. Set true to suppress the registration (e.g. if the service
+ // registers a custom SSE marshaler via core.RegisterHTTPMarshaler).
+ DisableSSEMarshaler bool `envconfig:"DISABLE_SSE_MARSHALER" env:"DISABLE_SSE_MARSHALER" default:"false"`
// MaxConnectionIdle is a duration for the amount of time after which an
// idle connection would be closed by sending a GoAway. Idleness duration is
// defined since the most recent time the number of outstanding RPCs became
diff --git a/core.go b/core.go
index 65c18d4..99e215e 100644
--- a/core.go
+++ b/core.go
@@ -452,23 +452,10 @@ func (c *cb) initHTTP(ctx context.Context) (*http.Server, error) {
allowedHttpHeaderPrefixes = []string{c.config.HTTPHeaderPrefix}
}
- muxOpts := []runtime.ServeMuxOption{
- runtime.WithIncomingHeaderMatcher(
- getCustomHeaderMatcher(allowedHttpHeaderPrefixes, c.config.TraceHeaderName, c.config.DebugLogHeaderName),
- ),
- runtime.WithMarshalerOption("application/proto", pMar),
- runtime.WithMarshalerOption("application/protobuf", pMar),
- runtime.WithMiddlewares(spanRouteMiddleware),
- }
-
- if c.config.UseJSONBuiltinMarshaller {
- muxOpts = append(
- muxOpts,
- runtime.WithMarshalerOption(c.config.JSONBuiltinMarshallerMime, &runtime.JSONBuiltin{}),
- )
- }
-
- muxOpts = append(muxOpts, registeredServeMuxOptions()...)
+ muxOpts := append(
+ buildHTTPMuxOptions(c.config, allowedHttpHeaderPrefixes, pMar),
+ registeredServeMuxOptions()...,
+ )
mux := runtime.NewServeMux(muxOpts...)
diff --git a/go.mod b/go.mod
index 9036dde..d833fb0 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
module github.com/go-coldbrew/core
-go 1.26.2
+go 1.26.3
require (
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
@@ -279,17 +279,17 @@ require (
go.uber.org/zap v1.27.1 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
- golang.org/x/crypto v0.49.0 // indirect
+ golang.org/x/crypto v0.51.0 // indirect
golang.org/x/exp v0.0.0-20250813145105-42675adae3e6 // indirect
golang.org/x/exp/typeparams v0.0.0-20260209203927-2842357ff358 // indirect
- golang.org/x/mod v0.34.0 // indirect
- golang.org/x/net v0.52.0 // indirect
- golang.org/x/sys v0.42.0 // indirect
- golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect
- golang.org/x/term v0.41.0 // indirect
- golang.org/x/text v0.35.0 // indirect
+ golang.org/x/mod v0.35.0 // indirect
+ golang.org/x/net v0.55.0 // indirect
+ golang.org/x/sys v0.45.0 // indirect
+ golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect
+ golang.org/x/term v0.43.0 // indirect
+ golang.org/x/text v0.37.0 // indirect
golang.org/x/time v0.15.0 // indirect
- golang.org/x/tools v0.43.0 // indirect
+ golang.org/x/tools v0.44.0 // indirect
golang.org/x/vuln v1.1.4 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
diff --git a/go.sum b/go.sum
index 2a964aa..2ce3e87 100644
--- a/go.sum
+++ b/go.sum
@@ -700,8 +700,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
-golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
-golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
+golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
+golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/exp v0.0.0-20250813145105-42675adae3e6 h1:SbTAbRFnd5kjQXbczszQ0hdk3ctwYf3qBNH9jIsGclE=
golang.org/x/exp v0.0.0-20250813145105-42675adae3e6/go.mod h1:4QTo5u+SEIbbKW1RacMZq1YEfOBqeXa19JeshGi+zc4=
golang.org/x/exp/typeparams v0.0.0-20220428152302-39d4317da171/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk=
@@ -717,8 +717,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
-golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI=
-golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY=
+golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM=
+golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -733,8 +733,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
-golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
-golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
+golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8=
+golang.org/x/net v0.55.0/go.mod h1:L5U2KuzuOe1lY7Z+aWVIKK6qEeJXnXV9yzGA+WCHJww=
golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ=
golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -776,18 +776,18 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
-golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
-golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c h1:6a8FdnNk6bTXBjR4AGKFgUKuo+7GnR3FX5L7CbveeZc=
-golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c/go.mod h1:TpUTTEp9frx7rTdLpC9gFG9kdI7zVLFTFFlqaH2Cncw=
+golang.org/x/sys v0.45.0 h1:dO4czNzziLiiXplLQgBCEpCvXQ3dnkn0SdaZSYdQ+FY=
+golang.org/x/sys v0.45.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
+golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa h1:efT73AJZfAAUV7SOip6pWGkwJDzIGiKBZGVzHYa+ve4=
+golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa/go.mod h1:kHjTxDEnAu6/Nl9lDkzjWpR+bmKfxeiRuSDlsMb70gE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
-golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU=
-golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A=
+golang.org/x/term v0.43.0 h1:S4RLU2sB31O/NCl+zFN9Aru9A/Cq2aqKpTZJ6B+DwT4=
+golang.org/x/term v0.43.0/go.mod h1:lrhlHNdQJHO+1qVYiHfFKVuVioJIheAc3fBSMFYEIsk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -796,8 +796,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
-golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
-golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
+golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
+golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -812,8 +812,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
-golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s=
-golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0=
+golang.org/x/tools v0.44.0 h1:UP4ajHPIcuMjT1GqzDWRlalUEoY+uzoZKnhOjbIPD2c=
+golang.org/x/tools v0.44.0/go.mod h1:KA0AfVErSdxRZIsOVipbv3rQhVXTnlU6UhKxHd1seDI=
golang.org/x/tools/go/expect v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM=
golang.org/x/tools/go/expect v0.1.1-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY=
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated h1:1h2MnaIAIXISqTFKdENegdpAgUXz6NrPEsbIeWaBRvM=
diff --git a/marshaler_sse.go b/marshaler_sse.go
new file mode 100644
index 0000000..a86a924
--- /dev/null
+++ b/marshaler_sse.go
@@ -0,0 +1,144 @@
+package core
+
+import (
+ "bytes"
+ "errors"
+ "io"
+
+ "github.com/go-coldbrew/core/config"
+ "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
+)
+
+// buildHTTPMuxOptions assembles the base runtime.ServeMuxOptions applied to
+// the HTTP gateway, in the order grpc-gateway applies them: defaults first,
+// then config-toggled options (SSE marshaler, JSON builtin), then
+// service-registered options on top (appended by the caller). Service options
+// win on the same MIME so they can override SSEMarshaler with a custom
+// variant if needed.
+func buildHTTPMuxOptions(cfg config.Config, allowedHeaderPrefixes []string, protoMarshaler runtime.Marshaler) []runtime.ServeMuxOption {
+ opts := []runtime.ServeMuxOption{
+ runtime.WithIncomingHeaderMatcher(
+ getCustomHeaderMatcher(allowedHeaderPrefixes, cfg.TraceHeaderName, cfg.DebugLogHeaderName),
+ ),
+ runtime.WithMarshalerOption("application/proto", protoMarshaler),
+ runtime.WithMarshalerOption("application/protobuf", protoMarshaler),
+ runtime.WithMiddlewares(spanRouteMiddleware),
+ }
+ if !cfg.DisableSSEMarshaler {
+ opts = append(opts, runtime.WithMarshalerOption(sseMediaType, &SSEMarshaler{}))
+ }
+ if cfg.UseJSONBuiltinMarshaller {
+ opts = append(opts, runtime.WithMarshalerOption(cfg.JSONBuiltinMarshallerMime, &runtime.JSONBuiltin{}))
+ }
+ return opts
+}
+
+// SSEMarshaler is a runtime.Marshaler that emits Server-Sent Events
+// (text/event-stream) frames for server-streaming gateway RPCs. It lets
+// browser EventSource clients consume streaming RPCs directly — useful for
+// AI/LLM token streaming and other long-running progressive responses.
+//
+// Each Marshal call returns "data: " with no trailing newline; the
+// Delimiter ("\n\n") terminates each SSE frame per the SSE spec. The JSON
+// payload uses protojson via the embedded runtime.JSONPb, so field naming
+// matches the gateway's default JSON responses.
+//
+// Wire it up from a service's PreStart hook:
+//
+// core.RegisterHTTPMarshaler("text/event-stream", &core.SSEMarshaler{})
+//
+// Clients then opt in by sending Accept: text/event-stream on the gateway
+// URL. The newHTTPCompressionWrapper excludes text/event-stream from
+// gzip/zstd compression so frames reach the client in real time (compressed
+// SSE is buffered by many HTTP intermediaries).
+//
+// SSE is server-to-client only: Unmarshal and NewDecoder return an error.
+//
+// Per-field protojson options (EmitUnpopulated, UseProtoNames, etc.) can be
+// set by initializing the embedded JSONPb directly:
+//
+// &core.SSEMarshaler{JSONPb: runtime.JSONPb{
+// MarshalOptions: protojson.MarshalOptions{EmitUnpopulated: true},
+// }}
+type SSEMarshaler struct {
+ runtime.JSONPb
+}
+
+var (
+ ssePrefix = []byte("data: ")
+ sseLineContinuation = []byte("\ndata: ")
+ errSSEReadNotSupported = errors.New("core: SSEMarshaler does not support reading; Server-Sent Events is a server-to-client format")
+)
+
+// ContentType always returns "text/event-stream".
+func (*SSEMarshaler) ContentType(_ any) string {
+ return sseMediaType
+}
+
+// StreamContentType matches ContentType so server-streaming responses also
+// advertise text/event-stream. Gateway prefers this over ContentType when
+// implemented (see runtime.ForwardResponseStream).
+func (*SSEMarshaler) StreamContentType(_ any) string {
+ return sseMediaType
+}
+
+// Marshal returns "data: " with no trailing newline. Frame
+// termination is supplied by Delimiter; the gateway writes Marshal output
+// followed by Delimiter for each streamed message.
+//
+// Newlines inside the JSON payload (when the embedded runtime.JSONPb is
+// configured with MarshalOptions.Multiline or Indent) are turned into
+// continuation lines: each line of the payload starts with "data: " as the
+// SSE spec requires, otherwise EventSource truncates the frame after the
+// first line.
+func (s *SSEMarshaler) Marshal(v any) ([]byte, error) {
+ body, err := s.JSONPb.Marshal(v)
+ if err != nil {
+ return nil, err
+ }
+ // Each subsequent line of a multiline JSON payload must also be
+ // "data: "-prefixed per the SSE spec. ReplaceAll is a no-op when the
+ // body contains no newlines (single-line JSON, the default).
+ body = bytes.ReplaceAll(body, []byte("\n"), sseLineContinuation)
+ out := make([]byte, 0, len(ssePrefix)+len(body))
+ out = append(out, ssePrefix...)
+ out = append(out, body...)
+ return out, nil
+}
+
+// Delimiter returns "\n\n", which terminates one SSE frame. A fresh slice
+// is returned per call so callers cannot mutate the framing for other
+// SSEMarshaler instances.
+func (*SSEMarshaler) Delimiter() []byte {
+ return []byte("\n\n")
+}
+
+// Unmarshal returns an error: SSE is a server-to-client format and the
+// gateway never reads SSE bodies from inbound requests.
+func (*SSEMarshaler) Unmarshal(_ []byte, _ any) error {
+ return errSSEReadNotSupported
+}
+
+// NewDecoder returns a decoder that always errors, for the same reason as
+// Unmarshal.
+func (*SSEMarshaler) NewDecoder(_ io.Reader) runtime.Decoder {
+ return runtime.DecoderFunc(func(_ any) error {
+ return errSSEReadNotSupported
+ })
+}
+
+// NewEncoder returns an encoder that writes "data: \n\n" per Encode
+// call.
+func (s *SSEMarshaler) NewEncoder(w io.Writer) runtime.Encoder {
+ return runtime.EncoderFunc(func(v any) error {
+ body, err := s.Marshal(v)
+ if err != nil {
+ return err
+ }
+ if _, err := w.Write(body); err != nil {
+ return err
+ }
+ _, err = w.Write(s.Delimiter())
+ return err
+ })
+}
diff --git a/marshaler_sse_test.go b/marshaler_sse_test.go
new file mode 100644
index 0000000..bb04032
--- /dev/null
+++ b/marshaler_sse_test.go
@@ -0,0 +1,213 @@
+package core
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "io"
+ "net/http"
+ "strings"
+ "testing"
+
+ "github.com/go-coldbrew/core/config"
+ "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/types/known/structpb"
+)
+
+func TestSSEMarshaler_ContentType(t *testing.T) {
+ m := &SSEMarshaler{}
+ if got := m.ContentType(nil); got != "text/event-stream" {
+ t.Fatalf("ContentType() = %q, want text/event-stream", got)
+ }
+ if got := m.StreamContentType(nil); got != "text/event-stream" {
+ t.Fatalf("StreamContentType() = %q, want text/event-stream", got)
+ }
+}
+
+func TestSSEMarshaler_Delimiter(t *testing.T) {
+ m := &SSEMarshaler{}
+ if got := string(m.Delimiter()); got != "\n\n" {
+ t.Fatalf("Delimiter() = %q, want %q", got, "\n\n")
+ }
+}
+
+// TestSSEMarshaler_DelimiterReturnsFreshSlice guards against accidental
+// sharing of the underlying array: mutating the returned slice must not
+// change the framing the next caller sees.
+func TestSSEMarshaler_DelimiterReturnsFreshSlice(t *testing.T) {
+ m := &SSEMarshaler{}
+ first := m.Delimiter()
+ first[0] = 'X'
+ if got := string(m.Delimiter()); got != "\n\n" {
+ t.Fatalf("Delimiter mutated by previous caller: got %q, want %q", got, "\n\n")
+ }
+}
+
+func TestSSEMarshaler_MarshalPrefixesDataNoTrailingNewline(t *testing.T) {
+ m := &SSEMarshaler{}
+ out, err := m.Marshal(map[string]any{"token": "hello"})
+ if err != nil {
+ t.Fatalf("Marshal: %v", err)
+ }
+ s := string(out)
+ if !strings.HasPrefix(s, "data: ") {
+ t.Fatalf("missing 'data: ' prefix: %q", s)
+ }
+ if strings.HasSuffix(s, "\n") {
+ t.Fatalf("Marshal must not append a newline (delimiter supplies framing): %q", s)
+ }
+ // The JSON after the prefix must be valid JSON with the original field.
+ var got map[string]any
+ if err := json.Unmarshal([]byte(strings.TrimPrefix(s, "data: ")), &got); err != nil {
+ t.Fatalf("payload after prefix is not valid JSON: %v (%q)", err, s)
+ }
+ if got["token"] != "hello" {
+ t.Fatalf("expected token=hello, got %v", got)
+ }
+}
+
+// TestSSEMarshaler_MultilinePayloadPrefixesEveryLine guards the SSE
+// continuation behavior: callers that opt into multiline JSON (via
+// protojson.MarshalOptions.Multiline/Indent on the embedded JSONPb) get a
+// frame where every payload line starts with "data: ". Without this,
+// EventSource truncates the frame at the first newline.
+func TestSSEMarshaler_MultilinePayloadPrefixesEveryLine(t *testing.T) {
+ m := &SSEMarshaler{
+ JSONPb: runtime.JSONPb{
+ MarshalOptions: protojson.MarshalOptions{Multiline: true, Indent: " "},
+ },
+ }
+ msg, err := structpb.NewStruct(map[string]any{"token": "hello", "index": 0})
+ if err != nil {
+ t.Fatalf("structpb.NewStruct: %v", err)
+ }
+
+ out, err := m.Marshal(msg)
+ if err != nil {
+ t.Fatalf("Marshal: %v", err)
+ }
+
+ s := string(out)
+ if !strings.HasPrefix(s, "data: ") {
+ t.Fatalf("missing 'data: ' prefix: %q", s)
+ }
+ // Every internal newline in the payload must be followed by a "data: "
+ // continuation prefix. If we strip the leading prefix, no bare newline
+ // should be left unprefixed.
+ body := strings.TrimPrefix(s, "data: ")
+ for i := 0; i < len(body); {
+ idx := strings.IndexByte(body[i:], '\n')
+ if idx < 0 {
+ break
+ }
+ after := body[i+idx+1:]
+ if !strings.HasPrefix(after, "data: ") {
+ t.Fatalf("payload contains a bare newline not followed by 'data: ' continuation prefix\nfull output:\n%s", s)
+ }
+ i += idx + 1
+ }
+}
+
+func TestSSEMarshaler_EncoderProducesValidStream(t *testing.T) {
+ m := &SSEMarshaler{}
+ var buf bytes.Buffer
+ enc := m.NewEncoder(&buf)
+ if err := enc.Encode(map[string]any{"token": "a"}); err != nil {
+ t.Fatalf("Encode #1: %v", err)
+ }
+ if err := enc.Encode(map[string]any{"token": "b"}); err != nil {
+ t.Fatalf("Encode #2: %v", err)
+ }
+
+ want := `data: {"token":"a"}` + "\n\n" + `data: {"token":"b"}` + "\n\n"
+ if got := buf.String(); got != want {
+ t.Fatalf("encoder output mismatch:\n got: %q\nwant: %q", got, want)
+ }
+}
+
+func TestSSEMarshaler_UnmarshalReturnsError(t *testing.T) {
+ m := &SSEMarshaler{}
+ err := m.Unmarshal([]byte("data: foo"), &struct{}{})
+ if err == nil {
+ t.Fatal("Unmarshal should return an error: SSE is a server-to-client format")
+ }
+ if !errors.Is(err, errSSEReadNotSupported) {
+ t.Fatalf("Unmarshal err = %v, want errSSEReadNotSupported", err)
+ }
+}
+
+func TestSSEMarshaler_DecoderReturnsError(t *testing.T) {
+ m := &SSEMarshaler{}
+ dec := m.NewDecoder(strings.NewReader("data: foo"))
+ err := dec.Decode(&struct{}{})
+ if err == nil {
+ t.Fatal("Decoder should return an error: SSE is a server-to-client format")
+ }
+ if !errors.Is(err, errSSEReadNotSupported) {
+ t.Fatalf("Decode err = %v, want errSSEReadNotSupported", err)
+ }
+}
+
+// TestSSEMarshaler_SatisfiesGatewayInterfaces guards against a future
+// refactor that drops one of the interfaces ForwardResponseStream relies on
+// (Delimited for frame separators, StreamContentType for streams).
+func TestSSEMarshaler_SatisfiesGatewayInterfaces(t *testing.T) {
+ var (
+ _ runtime.Marshaler = (*SSEMarshaler)(nil)
+ _ runtime.Delimited = (*SSEMarshaler)(nil)
+ _ runtime.StreamContentType = (*SSEMarshaler)(nil)
+ )
+}
+
+// TestSSEMarshaler_NewEncoderPropagatesWriteError ensures stream cancellation
+// (a broken pipe partway through encoding) surfaces to the handler rather
+// than being silently swallowed.
+func TestSSEMarshaler_NewEncoderPropagatesWriteError(t *testing.T) {
+ m := &SSEMarshaler{}
+ enc := m.NewEncoder(errWriter{})
+ if err := enc.Encode(map[string]any{"token": "x"}); err == nil {
+ t.Fatal("expected Encode to return write error")
+ }
+}
+
+type errWriter struct{}
+
+func (errWriter) Write(_ []byte) (int, error) { return 0, io.ErrClosedPipe }
+
+// TestBuildHTTPMuxOptions_SSERegisteredByDefault confirms that an HTTP gateway
+// built with default config selects SSEMarshaler when the client sends
+// Accept: text/event-stream. Without this, SSE clients would silently fall
+// back to the JSON marshaler and never receive event-stream framing.
+func TestBuildHTTPMuxOptions_SSERegisteredByDefault(t *testing.T) {
+ mux := runtime.NewServeMux(
+ buildHTTPMuxOptions(config.Config{}, nil, &runtime.ProtoMarshaller{})...,
+ )
+
+ req, _ := http.NewRequest(http.MethodGet, "/anything", nil)
+ req.Header.Set("Accept", "text/event-stream")
+
+ _, outbound := runtime.MarshalerForRequest(mux, req)
+ if _, ok := outbound.(*SSEMarshaler); !ok {
+ t.Fatalf("expected outbound marshaler to be *SSEMarshaler for Accept: text/event-stream, got %T", outbound)
+ }
+}
+
+// TestBuildHTTPMuxOptions_SSEDisabled confirms that setting
+// DisableSSEMarshaler suppresses the auto-registration so the gateway falls
+// back to the default JSON marshaler. Important so services that explicitly
+// don't want SSE — or want to register a custom SSE marshaler — get a clean
+// slate.
+func TestBuildHTTPMuxOptions_SSEDisabled(t *testing.T) {
+ mux := runtime.NewServeMux(
+ buildHTTPMuxOptions(config.Config{DisableSSEMarshaler: true}, nil, &runtime.ProtoMarshaller{})...,
+ )
+
+ req, _ := http.NewRequest(http.MethodGet, "/anything", nil)
+ req.Header.Set("Accept", "text/event-stream")
+
+ _, outbound := runtime.MarshalerForRequest(mux, req)
+ if _, ok := outbound.(*SSEMarshaler); ok {
+ t.Fatal("expected SSEMarshaler not to be registered when DisableSSEMarshaler is true")
+ }
+}