Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 40 additions & 12 deletions cmd/unified-server/handlers_tracks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bufio"
"bytes"
"context"
"encoding/binary"
Expand All @@ -13,6 +14,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/vmihailenco/msgpack/v5"
"safecast-new-map/pkg/database"
Expand Down Expand Up @@ -668,18 +670,44 @@ func streamMarkersHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
}
w.Header().Set("Cache-Control", "no-cache")
// Disable nginx response buffering so chunks reach the client as they're flushed.
w.Header().Set("X-Accel-Buffering", "no")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}

// Coalesce small per-marker writes into ~64KB chunks before they hit the
// chunked HTTP encoder. Without this, each marker becomes its own chunk +
// flush, which serializes ~70k tiny writes for a single viewport.
bw := bufio.NewWriterSize(w, 64*1024)

// Batch network flushes: flush when the buffered writer fills naturally,
// or every flushEveryMarkers, or every flushEveryDur — whichever comes first.
const flushEveryMarkers = 256
const flushEveryDur = 50 * time.Millisecond
pending := 0
lastFlush := time.Now()
flush := func() {
_ = bw.Flush()
flusher.Flush()
pending = 0
lastFlush = time.Now()
}
maybeFlush := func() {
pending++
if pending >= flushEveryMarkers || time.Since(lastFlush) >= flushEveryDur {
flush()
}
}

// Helper to write a msgpack frame: 4-byte length prefix + data
writeMsgpackFrame := func(data []byte) {
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(data)))
w.Write(lenBuf)
w.Write(data)
bw.Write(lenBuf)
bw.Write(data)
}

// Emit realtime markers first when enabled.
Expand All @@ -689,10 +717,10 @@ func streamMarkersHandler(w http.ResponseWriter, r *http.Request) {
writeMsgpackFrame(b)
} else {
b, _ := json.Marshal(m)
fmt.Fprintf(w, "data: %s\n\n", b)
fmt.Fprintf(bw, "data: %s\n\n", b)
}
}
flusher.Flush()
flush()

for {
select {
Expand All @@ -706,33 +734,33 @@ func streamMarkersHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
if useMsgpack {
// Send zero-length frame as error indicator
w.Write([]byte{0, 0, 0, 0})
bw.Write([]byte{0, 0, 0, 0})
} else {
fmt.Fprintf(w, "event: done\ndata: %v\n\n", err)
fmt.Fprintf(bw, "event: done\ndata: %v\n\n", err)
}
flusher.Flush()
flush()
return
}
errCh = nil
case m, ok := <-agg:
if !ok {
if useMsgpack {
// Send zero-length frame as end marker
w.Write([]byte{0, 0, 0, 0})
bw.Write([]byte{0, 0, 0, 0})
} else {
fmt.Fprint(w, "event: done\ndata: end\n\n")
fmt.Fprint(bw, "event: done\ndata: end\n\n")
}
flusher.Flush()
flush()
return
}
if useMsgpack {
b, _ := msgpack.Marshal(m)
writeMsgpackFrame(b)
} else {
b, _ := json.Marshal(m)
fmt.Fprintf(w, "data: %s\n\n", b)
fmt.Fprintf(bw, "data: %s\n\n", b)
}
flusher.Flush()
maybeFlush()
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/httpapi/static_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ type StaticRoutesConfig struct {
}

// RegisterStaticRoutes attaches static asset routes to mux.
// /static serves files from embedded assets and /js serves files from a
// filesystem directory for runtime JS worker compatibility.
// Both /static/ and /js/ serve from the embedded StaticFS so they work
// regardless of the process working directory in production.
func RegisterStaticRoutes(mux *http.ServeMux, cfg StaticRoutesConfig) {
if mux == nil {
return
}
if cfg.StaticFS != nil {
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.FS(cfg.StaticFS))))
}
if cfg.JSDir != "" {
fileServer := http.FileServer(http.FS(cfg.StaticFS))
mux.Handle("/static/", http.StripPrefix("/static/", fileServer))
// /js/foo.js → js/foo.js inside StaticFS (i.e. public_html/js/foo.js).
mux.Handle("/js/", fileServer)
} else if cfg.JSDir != "" {
mux.Handle("/js/", http.StripPrefix("/js/", http.FileServer(http.Dir(cfg.JSDir))))
}
}
Loading