From aea5c7d8971cb069f72f34a2212c7fac2d68b718 Mon Sep 17 00:00:00 2001 From: Ilya Yakelzon Date: Thu, 21 May 2026 18:57:24 +0200 Subject: [PATCH 1/2] banditcallback: emit per-device first-seen to lantern-cloud MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The lantern-cloud bandit catalog now selects http-proxy arms for legacy clients via the unified action space, but those arms had no EXP3 reward signal because clients on the lantern-http-proxy backend don't make the callback at URL-test completion the way sing-box clients do natively. Without a signal, EXP3 weights for http-proxy arms stayed flat at their cold-start prior — explored via gamma but never reinforced. This change has the http-proxy daemon emit the callback on behalf of those clients. On the first request from a device-id within a TTL window, the new banditcallback.Emitter fires an async best-effort GET to the API's /v1/bandit/callback?token=&did= endpoint. The arm-token is plumbed at provisioning via two new INI keys (banditcallbacktoken / banditcallbackurl); the API discriminates arm-tokens from per-probe tokens by the `arm-` prefix and writes a flat-reward update straight to EXP3 + per-route signals. A device's first connection within the TTL window triggers one emit; subsequent connections from the same device are suppressed in-memory until the window rolls over. Map sweep is opportunistic on the same TTL cadence, so worst-case memory is ~2× unique devices per window without a dedicated reaper goroutine. The API does its own SET-NX-based dedup as defense against a restarted/multi-replica daemon losing the LRU and re-firing within the window. The filter installs after tokenfilter (auth) but before devicefilter (throttling) — devicefilter skips when ReportingRedisClient is nil for pro proxies, but the bandit still wants signal for pro arms. OnFirstOnly because we only need the device-id header once per connection. No-op when Token/URL are empty, so non-bandit-eligible tracks carrying the same binary stay silent. Co-Authored-By: Claude Opus 4.7 (1M context) --- banditcallback/banditcallback.go | 192 ++++++++++++++++++++++++++ banditcallback/banditcallback_test.go | 159 +++++++++++++++++++++ http-proxy/main.go | 11 ++ http_proxy.go | 32 +++++ 4 files changed, 394 insertions(+) create mode 100644 banditcallback/banditcallback.go create mode 100644 banditcallback/banditcallback_test.go diff --git a/banditcallback/banditcallback.go b/banditcallback/banditcallback.go new file mode 100644 index 00000000..9161f7e2 --- /dev/null +++ b/banditcallback/banditcallback.go @@ -0,0 +1,192 @@ +// Package banditcallback emits a best-effort per-device first-seen +// callback to the lantern-cloud bandit API so http-proxy arms get a +// reward signal without the client having to make the callback itself. +// +// The emitter holds a TTL-bounded map of device-ids it has already +// reported within the window. A device's first connection within the +// window triggers an async GET to the configured callback URL with the +// per-arm token (plumbed at provisioning) and the device-id; subsequent +// connections from the same device within TTL are suppressed. After the +// window, the next connection re-fires the callback. This matches the +// API-side dedup TTL so legitimate "device returns after a gap" events +// land, while a misbehaving client hammering the proxy can't pump the +// arm's reward signal. +package banditcallback + +import ( + "context" + "net/http" + "net/url" + "sync" + "time" + + "github.com/getlantern/golog" + "github.com/getlantern/proxy/v3/filters" +) + +var log = golog.LoggerFor("banditcallback") + +// Emitter fires per-device first-seen callbacks to the lantern-cloud +// API. Disabled when CallbackURL or Token is empty — the right +// default for non-bandit-eligible tracks. +type Emitter struct { + token string + callbackURL string + ttl time.Duration + client *http.Client + + mu sync.Mutex + seen map[string]time.Time + lastSweep time.Time + + // Counters exposed for the proxy's metrics layer; readable via + // the accessor methods. Bumped under mu since the proxy reads + // them rarely (periodic flush) and bumps them on a hot path + // (every device-id request); atomic.AddUint64 would add a CPU + // barrier on every Apply for no observable win. + emitted uint64 + suppressed uint64 +} + +// New returns an emitter. token and callbackURL come from the daemon's +// INI (banditcallbacktoken / banditcallbackurl). ttl is the per-device +// dedup window — typically matches the API-side ProbeTTLForPollInterval +// at the daemon's expected poll. Zero ttl uses 10m as a sensible default. +func New(token, callbackURL string, ttl time.Duration) *Emitter { + if ttl <= 0 { + ttl = 10 * time.Minute + } + return &Emitter{ + token: token, + callbackURL: callbackURL, + ttl: ttl, + client: &http.Client{ + Timeout: 5 * time.Second, + }, + seen: make(map[string]time.Time), + } +} + +// Enabled reports whether the emitter will actually fire callbacks. +// Callers can use this to skip computing the device-id when the daemon +// wasn't provisioned with a token. +func (e *Emitter) Enabled() bool { + return e != nil && e.token != "" && e.callbackURL != "" +} + +// EmitIfFirstSeen records the device-id and, if it hasn't been seen +// within the TTL window, fires an async best-effort GET to the +// configured callback URL. Returns immediately — the HTTP request runs +// in its own goroutine and any failure is logged at debug only (the +// callback is a hint to the bandit, not a correctness signal). +func (e *Emitter) EmitIfFirstSeen(ctx context.Context, deviceID string) { + if !e.Enabled() || deviceID == "" { + return + } + + now := time.Now() + first := e.checkAndRecord(deviceID, now) + if !first { + return + } + + go e.fire(ctx, deviceID) +} + +// checkAndRecord returns true if the deviceID is first-seen within the +// TTL window. The map is swept opportunistically whenever the gap +// since the last sweep exceeds the TTL — bounding worst-case map size +// at ~2× the unique device-ids seen in one TTL window without a +// dedicated reaper goroutine. +func (e *Emitter) checkAndRecord(deviceID string, now time.Time) bool { + e.mu.Lock() + defer e.mu.Unlock() + + if now.Sub(e.lastSweep) > e.ttl { + for k, t := range e.seen { + if now.Sub(t) > e.ttl { + delete(e.seen, k) + } + } + e.lastSweep = now + } + + if prev, ok := e.seen[deviceID]; ok && now.Sub(prev) <= e.ttl { + e.suppressed++ + return false + } + e.seen[deviceID] = now + e.emitted++ + return true +} + +func (e *Emitter) fire(ctx context.Context, deviceID string) { + // Detach from the request context so a closing client connection + // doesn't cancel the outbound HTTP request. The callback is for + // the bandit's benefit, not the client's; we want it to complete + // even if the client just hung up. + fireCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + u, err := url.Parse(e.callbackURL) + if err != nil { + log.Debugf("banditcallback: invalid callback URL %q: %v", e.callbackURL, err) + return + } + q := u.Query() + q.Set("token", e.token) + q.Set("did", deviceID) + u.RawQuery = q.Encode() + + req, err := http.NewRequestWithContext(fireCtx, http.MethodGet, u.String(), nil) + if err != nil { + log.Debugf("banditcallback: build request: %v", err) + return + } + + resp, err := e.client.Do(req) + if err != nil { + log.Debugf("banditcallback: request failed: %v", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + log.Debugf("banditcallback: API returned %d for did %s", resp.StatusCode, deviceID) + } +} + +// Stats returns (emitted, suppressed) counter values. Cheap; takes the +// same mu the hot path uses but the hot path holds it for microseconds. +func (e *Emitter) Stats() (emitted, suppressed uint64) { + e.mu.Lock() + defer e.mu.Unlock() + return e.emitted, e.suppressed +} + +// Filter is a proxy filter that calls EmitIfFirstSeen on the device-id +// header. Wraps Emitter so the proxy can install it in its filter chain +// independently of devicefilter (which only runs for non-pro tracks). +// The filter is a near-noop hot path: under TTL, just a map read + +// counter bump under a brief mutex; misses kick off an async goroutine. +type Filter struct { + deviceIDHeader string + emitter *Emitter +} + +// NewFilter returns a proxy filter that drives the emitter from the +// request's device-id header. headerName is typically common.DeviceIdHeader +// so the filter package itself doesn't depend on the http-proxy-lantern +// common package. +func NewFilter(headerName string, emitter *Emitter) *Filter { + return &Filter{deviceIDHeader: headerName, emitter: emitter} +} + +// Apply implements filters.Filter. Forwards unconditionally (the +// emitter is a side-effect; failures are non-fatal). +func (f *Filter) Apply(cs *filters.ConnectionState, req *http.Request, next filters.Next) (*http.Response, *filters.ConnectionState, error) { + if f.emitter != nil && f.emitter.Enabled() { + f.emitter.EmitIfFirstSeen(req.Context(), req.Header.Get(f.deviceIDHeader)) + } + return next(cs, req) +} diff --git a/banditcallback/banditcallback_test.go b/banditcallback/banditcallback_test.go new file mode 100644 index 00000000..d9f386f4 --- /dev/null +++ b/banditcallback/banditcallback_test.go @@ -0,0 +1,159 @@ +package banditcallback + +import ( + "context" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestEmitter_DisabledWhenUnconfigured(t *testing.T) { + // Empty token disables emission entirely so non-bandit-eligible + // tracks can carry the same daemon binary without firing + // callbacks. Verify Enabled and that EmitIfFirstSeen is a no-op. + cases := []struct { + name string + token string + callbackURL string + }{ + {"empty token", "", "https://api.example/v1/bandit/callback"}, + {"empty url", "arm-xyz", ""}, + {"both empty", "", ""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + e := New(tc.token, tc.callbackURL, time.Minute) + if e.Enabled() { + t.Fatal("expected disabled") + } + e.EmitIfFirstSeen(context.Background(), "did-1") + emitted, _ := e.Stats() + if emitted != 0 { + t.Fatalf("expected 0 emits, got %d", emitted) + } + }) + } +} + +func TestEmitter_FirstSeenFires(t *testing.T) { + var hits int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&hits, 1) + if got := r.URL.Query().Get("token"); got != "arm-test" { + t.Errorf("token mismatch: %q", got) + } + if got := r.URL.Query().Get("did"); got == "" { + t.Error("missing did") + } + w.WriteHeader(http.StatusNoContent) + })) + defer srv.Close() + + e := New("arm-test", srv.URL, time.Minute) + e.EmitIfFirstSeen(context.Background(), "device-a") + + // Emission is async; wait for the goroutine. 1s ceiling is generous. + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + if atomic.LoadInt32(&hits) == 1 { + break + } + time.Sleep(5 * time.Millisecond) + } + if got := atomic.LoadInt32(&hits); got != 1 { + t.Fatalf("expected 1 callback hit, got %d", got) + } + + emitted, suppressed := e.Stats() + if emitted != 1 || suppressed != 0 { + t.Fatalf("counters: emitted=%d suppressed=%d", emitted, suppressed) + } +} + +func TestEmitter_DedupSuppressesRepeat(t *testing.T) { + var hits int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&hits, 1) + w.WriteHeader(http.StatusNoContent) + })) + defer srv.Close() + + e := New("arm-test", srv.URL, time.Minute) + for i := 0; i < 10; i++ { + e.EmitIfFirstSeen(context.Background(), "device-a") + } + + // One emit, nine suppressed. Wait for async emit to complete. + deadline := time.Now().Add(time.Second) + for time.Now().Before(deadline) { + emitted, suppressed := e.Stats() + if emitted == 1 && suppressed == 9 { + break + } + time.Sleep(5 * time.Millisecond) + } + emitted, suppressed := e.Stats() + if emitted != 1 || suppressed != 9 { + t.Fatalf("counters: emitted=%d suppressed=%d", emitted, suppressed) + } + // Confirm the server only received one hit. + time.Sleep(50 * time.Millisecond) + if got := atomic.LoadInt32(&hits); got != 1 { + t.Fatalf("expected 1 callback hit, got %d", got) + } +} + +func TestEmitter_ConcurrentFirstSeenIsSingleFire(t *testing.T) { + // 100 goroutines racing on the same device-id must yield exactly + // one outbound call. This is the contention case the mu lock is + // designed for; a TOCTOU bug here would explode reward signal. + var hits int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&hits, 1) + w.WriteHeader(http.StatusNoContent) + })) + defer srv.Close() + + e := New("arm-test", srv.URL, time.Minute) + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + e.EmitIfFirstSeen(context.Background(), "race-device") + }() + } + wg.Wait() + + // Wait for async send. + time.Sleep(200 * time.Millisecond) + if got := atomic.LoadInt32(&hits); got != 1 { + t.Fatalf("expected exactly 1 hit under contention, got %d", got) + } +} + +func TestEmitter_ReEmitsAfterTTL(t *testing.T) { + // After the TTL window, a returning device should re-emit. Use a + // tiny TTL to keep the test fast. + var hits int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&hits, 1) + w.WriteHeader(http.StatusNoContent) + })) + defer srv.Close() + + e := New("arm-test", srv.URL, 50*time.Millisecond) + e.EmitIfFirstSeen(context.Background(), "ttl-device") + time.Sleep(20 * time.Millisecond) // within TTL — suppressed + e.EmitIfFirstSeen(context.Background(), "ttl-device") + time.Sleep(80 * time.Millisecond) // past TTL — fires again + e.EmitIfFirstSeen(context.Background(), "ttl-device") + + time.Sleep(100 * time.Millisecond) + if got := atomic.LoadInt32(&hits); got != 2 { + t.Fatalf("expected 2 hits across TTL window, got %d", got) + } +} diff --git a/http-proxy/main.go b/http-proxy/main.go index 36c7d771..9c82b633 100644 --- a/http-proxy/main.go +++ b/http-proxy/main.go @@ -89,6 +89,14 @@ var ( cfgSvrAuthToken = flag.String("cfgsvrauthtoken", "", "Token attached to config-server requests, not attaching if empty") connectOKWaitsForUpstream = flag.Bool("connect-ok-waits-for-upstream", false, "Set to true to wait for upstream connection before responding OK to CONNECT requests") + // Per-arm bandit callback. Plumbed at provisioning by the + // lantern-cloud VPS provisioner. Empty token disables emission, so + // non-bandit-eligible tracks (or older provisioner builds) carry + // the same binary without firing any callbacks. + banditCallbackToken = flag.String("banditcallbacktoken", "", "Per-arm bandit callback token plumbed by the provisioner") + banditCallbackURL = flag.String("banditcallbackurl", "", "Full URL of the /v1/bandit/callback endpoint") + banditCallbackTTL = flag.Duration("banditcallbackttl", 10*time.Minute, "Per-device dedup window for bandit callback emission") + throttleRefreshInterval = flag.Duration("throttlerefresh", throttle.DefaultRefreshInterval, "Specifies how frequently to refresh throttling configuration from redis. Defaults to 5 minutes.") enableMultipath = flag.Bool("enablemultipath", false, "Enable multipath. Only clients support multipath can communicate with it.") @@ -395,6 +403,9 @@ func main() { KeyFile: *keyfile, CfgSvrAuthToken: *cfgSvrAuthToken, ConnectOKWaitsForUpstream: *connectOKWaitsForUpstream, + BanditCallbackToken: *banditCallbackToken, + BanditCallbackURL: *banditCallbackURL, + BanditCallbackTTL: *banditCallbackTTL, EnableMultipath: *enableMultipath, ThrottleRefreshInterval: *throttleRefreshInterval, TracesSampleRate: *tracesSampleRate, diff --git a/http_proxy.go b/http_proxy.go index 48492126..b166ad32 100644 --- a/http_proxy.go +++ b/http_proxy.go @@ -28,7 +28,9 @@ import ( "github.com/getlantern/gonat" "github.com/getlantern/kcpwrapper" + "github.com/getlantern/http-proxy-lantern/v2/banditcallback" "github.com/getlantern/http-proxy-lantern/v2/broflake" + "github.com/getlantern/http-proxy-lantern/v2/common" "github.com/getlantern/http-proxy-lantern/v2/opsfilter" "github.com/getlantern/http-proxy-lantern/v2/otel" "github.com/getlantern/http-proxy-lantern/v2/shadowsocks" @@ -177,6 +179,15 @@ type Proxy struct { CountryLookup geo.CountryLookup ISPLookup geo.ISPLookup + // Per-arm bandit callback. Plumbed at provisioning by the + // lantern-cloud VPS provisioner; empty BanditCallbackToken + // disables emission entirely. The emitter is constructed once in + // ListenAndServe and then referenced by createFilterChain. + BanditCallbackToken string + BanditCallbackURL string + BanditCallbackTTL time.Duration + banditCallbackEmitter *banditcallback.Emitter + MultiplexProtocol string SmuxVersion int SmuxMaxFrameSize int @@ -236,6 +247,14 @@ func (p *Proxy) ListenAndServe(ctx context.Context) error { return errors.New("Unable to configure instrumentation: %v", err) } + // Per-arm bandit callback emitter. New is cheap and safe to call + // with empty token/URL — Enabled() reports false and EmitIfFirstSeen + // becomes a no-op, so the filter installs but stays silent on + // non-bandit-eligible builds. + p.banditCallbackEmitter = banditcallback.New( + p.BanditCallbackToken, p.BanditCallbackURL, p.BanditCallbackTTL, + ) + var onServerError func(conn net.Conn, err error) if err := p.setupPacketForward(); err != nil { log.Errorf("Unable to set up packet forwarding, will continue to start up: %v", err) @@ -523,6 +542,19 @@ func (p *Proxy) createFilterChain(bl *blacklist.Blacklist) (filters.Chain, proxy filterChain = filterChain.Append(proxy.OnFirstOnly(tokenfilter.New(p.Token, p.instrument))) } + // Per-arm bandit callback emitter. Sits after auth so we don't + // fire on unauthenticated noise, but before devicefilter so it + // runs for pro tracks too (devicefilter skips when + // ReportingRedisClient is nil for pro proxies, but the bandit + // still wants signal for those arms). OnFirstOnly because we only + // need the header once per connection — same as the other + // auth-adjacent filters. No-op when Token/URL are empty. + if p.banditCallbackEmitter != nil && p.banditCallbackEmitter.Enabled() { + filterChain = filterChain.Append( + proxy.OnFirstOnly(banditcallback.NewFilter(common.DeviceIdHeader, p.banditCallbackEmitter)), + ) + } + if p.ReportingRedisClient == nil { log.Debug("Not enabling bandwidth limiting") } else { From 7370830e9462d464ab4c2900388137d1db59925e Mon Sep 17 00:00:00 2001 From: Ilya Yakelzon Date: Fri, 22 May 2026 11:01:17 +0200 Subject: [PATCH 2/2] banditcallback: clarify filter wiring comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Copilot review feedback on #668: the two comments around the banditcallback filter implied the filter "installs but stays silent" when token/URL are empty, but createFilterChain actually skips the append entirely when the emitter is disabled. Update both comments to match the real behavior: - banditcallback.New is still cheap to call with empty inputs (the emitter's Enabled() reports false), but the filter is never installed in that case — zero per-request work on non-bandit-eligible builds. - Note the benchmark-mode caveat (no tokenfilter), since the "after auth" placement only holds in the production path. No behavior change; comments only. Co-Authored-By: Claude Opus 4.7 (1M context) --- http_proxy.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/http_proxy.go b/http_proxy.go index b166ad32..c6e9403c 100644 --- a/http_proxy.go +++ b/http_proxy.go @@ -248,9 +248,9 @@ func (p *Proxy) ListenAndServe(ctx context.Context) error { } // Per-arm bandit callback emitter. New is cheap and safe to call - // with empty token/URL — Enabled() reports false and EmitIfFirstSeen - // becomes a no-op, so the filter installs but stays silent on - // non-bandit-eligible builds. + // with empty token/URL — Enabled() reports false in that case and + // createFilterChain skips appending the filter entirely, so a + // non-bandit-eligible build runs no extra per-request code. p.banditCallbackEmitter = banditcallback.New( p.BanditCallbackToken, p.BanditCallbackURL, p.BanditCallbackTTL, ) @@ -542,13 +542,19 @@ func (p *Proxy) createFilterChain(bl *blacklist.Blacklist) (filters.Chain, proxy filterChain = filterChain.Append(proxy.OnFirstOnly(tokenfilter.New(p.Token, p.instrument))) } - // Per-arm bandit callback emitter. Sits after auth so we don't - // fire on unauthenticated noise, but before devicefilter so it - // runs for pro tracks too (devicefilter skips when - // ReportingRedisClient is nil for pro proxies, but the bandit - // still wants signal for those arms). OnFirstOnly because we only - // need the header once per connection — same as the other - // auth-adjacent filters. No-op when Token/URL are empty. + // Per-arm bandit callback emitter. Appended only when the + // emitter is enabled (non-empty token + URL); on a + // non-bandit-eligible build the filter is never installed and + // adds zero per-request work. Placement: after tokenfilter so we + // don't fire on unauthenticated noise (note: benchmark mode + // skips tokenfilter entirely, so this filter would see + // unauthenticated traffic there — fine because bench mode is a + // local-only test setup), and before devicefilter so the + // emitter still runs for pro tracks (devicefilter is gated on + // ReportingRedisClient, which pro proxies don't set, but the + // bandit still wants signal for pro arms). OnFirstOnly because + // the device-id header only needs to be read once per + // connection — matches the other auth-adjacent filters. if p.banditCallbackEmitter != nil && p.banditCallbackEmitter.Enabled() { filterChain = filterChain.Append( proxy.OnFirstOnly(banditcallback.NewFilter(common.DeviceIdHeader, p.banditCallbackEmitter)),