From 0a9e276a9e00775df9734cf03dad67664fa99ac9 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Sat, 6 Jun 2026 18:00:56 -0600 Subject: [PATCH] unbounded: keep consumer Source on close so globe + counter balance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit broflake's OnConnectionChange delivers the consumer addr on accept but a nil addr on close (the WebRTC session is already torn down, so the remote IP is gone). The close ConnectionEvent therefore carried an empty Source, and downstream consumers couldn't pair it with its accept: the Flutter globe matches the arc to remove by source IP, and the "people helped" counter decrements the same way. So closes were dropped — arcs orphaned and accumulated, and the live counter only ever grew (it equalled the lifetime total). Track each consumer slot's addr on accept (connSources, keyed by broflake's workerIdx — stable across a connection's accept->close) and restore it on close, so every -1 carries the same Source its +1 did. No event-shape change; downstream consumers already key off Source. Co-Authored-By: Claude Opus 4.8 (1M context) --- unbounded/conn_sources_test.go | 56 ++++++++++++++++++++++++++++++++++ unbounded/unbounded.go | 56 ++++++++++++++++++++++++++++++++-- unbounded/unbounded_test.go | 16 ++++++---- 3 files changed, 119 insertions(+), 9 deletions(-) create mode 100644 unbounded/conn_sources_test.go diff --git a/unbounded/conn_sources_test.go b/unbounded/conn_sources_test.go new file mode 100644 index 00000000..00e08071 --- /dev/null +++ b/unbounded/conn_sources_test.go @@ -0,0 +1,56 @@ +package unbounded + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestConnSources_resolve pins the accept→close source backfill. broflake +// delivers the consumer addr on accept but a nil (empty) addr on close, so +// resolve must restore the accept's addr onto the close — otherwise the close +// event carries an empty Source and downstream consumers (the Flutter globe +// arc + the "people helped" counter) can't match it to the accept, so the +// arc orphans and the counter never decrements. +func TestConnSources_resolve(t *testing.T) { + c := newConnSources() + + // Accept echoes its own addr and remembers it for the slot. + assert.Equal(t, "1.2.3.4", c.resolve(1, 7, "1.2.3.4"), + "accept returns its own source") + + // Close arrives with an empty addr (broflake's nil) — restore the + // accept's addr so the -1 can be matched to its +1. + assert.Equal(t, "1.2.3.4", c.resolve(-1, 7, ""), + "close restores the accept's source") + + // The slot was freed; a stale/duplicate close has nothing to restore. + assert.Equal(t, "", c.resolve(-1, 7, ""), + "close after the slot is freed restores nothing") + + // Slots are tracked independently. + c.resolve(1, 8, "5.6.7.8") + c.resolve(1, 9, "1.1.1.1") + assert.Equal(t, "5.6.7.8", c.resolve(-1, 8, ""), "slot 8 restores 8's addr") + assert.Equal(t, "1.1.1.1", c.resolve(-1, 9, ""), "slot 9 unaffected by slot 8") + + // An accept with no addr (broflake couldn't surface the consumer IP) + // stays empty through close — neither end is counted, which is correct: + // with no source there's nothing to match or draw. + assert.Equal(t, "", c.resolve(1, 10, ""), "accept with empty addr stays empty") + assert.Equal(t, "", c.resolve(-1, 10, ""), "its close stays empty too") + + // A close that already carries a real addr is passed through unchanged + // (don't clobber a good value) and still frees the slot. + c.resolve(1, 11, "9.9.9.9") + assert.Equal(t, "9.9.9.9", c.resolve(-1, 11, "9.9.9.9"), + "close with a real addr is passed through") + assert.Equal(t, "", c.resolve(-1, 11, ""), "slot 11 freed after its close") + + // Slot reuse: broflake recycles a workerIdx; a fresh accept overwrites + // the prior addr even without an intervening close. + c.resolve(1, 12, "2.2.2.2") + c.resolve(1, 12, "3.3.3.3") + assert.Equal(t, "3.3.3.3", c.resolve(-1, 12, ""), + "reused slot restores the latest accept's addr") +} diff --git a/unbounded/unbounded.go b/unbounded/unbounded.go index 2e12ef12..3d7c5d96 100644 --- a/unbounded/unbounded.go +++ b/unbounded/unbounded.go @@ -43,9 +43,9 @@ import ( // being routed through this widget proxy) connects or disconnects via // the broflake mesh. // -// State +1 on accept, -1 on close -// Source consumer's IP if broflake exposes it, otherwise empty -// Timestamp emit time in Unix milliseconds +// State +1 on accept, -1 on close +// Source consumer's IP if broflake exposes it, otherwise empty +// Timestamp emit time in Unix milliseconds // // JSON shape is identical to peer.ConnectionEvent so a consumer // reading both SSE streams can deserialize each frame with the @@ -65,6 +65,45 @@ type ConnectionEvent struct { Timestamp int64 `json:"timestamp"` } +// connSources tracks the source addr of each live consumer slot so a close +// event — which broflake delivers with a nil addr — can be re-tagged with the +// addr its accept carried. Keyed by broflake's workerIdx (the consumer slot), +// stable across a single connection's accept→close. Without this, a close +// carries an empty Source, downstream consumers (the Flutter globe + helped +// counter) can't match it to the accept, and the connection's arc/count leaks. +// Concurrency-safe: broflake fires connection-change callbacks from per-worker +// goroutines. +type connSources struct { + mu sync.Mutex + addrs map[int]string +} + +func newConnSources() *connSources { + return &connSources{addrs: make(map[int]string)} +} + +// resolve records the addr on accept (state > 0) or restores it on close +// (state < 0, where broflake's addr is nil), returning the Source the +// ConnectionEvent should carry. An accept with an empty addr is left +// untracked, so its close stays empty too — neither is counted, which is the +// right behavior when broflake can't surface the consumer IP at all. +func (c *connSources) resolve(state, workerIdx int, addrStr string) string { + c.mu.Lock() + defer c.mu.Unlock() + switch { + case state > 0: + if addrStr != "" { + c.addrs[workerIdx] = addrStr + } + case state < 0: + if addrStr == "" { + addrStr = c.addrs[workerIdx] + } + delete(c.addrs, workerIdx) + } + return addrStr +} + var manager = &unboundedManager{} // widget is the minimum interface the manager needs from a running @@ -490,6 +529,16 @@ func (m *unboundedManager) start() { // off. broflake exposes no registration point we could // disarm directly (the callback IS the registration), so the // inline ctx check is the next-best place. + // broflake hands us the consumer's addr on accept but a nil addr on + // close (the WebRTC session is already torn down, so the remote IP is + // gone). Consumers of ConnectionEvent identify a connection by its + // Source: the Flutter globe matches a close to the arc it should + // remove by source IP, and decrements its "people helped" counter the + // same way. A close with an empty Source can't be matched, so the arc + // orphans and the counter never comes back down. sources remembers + // each slot's addr on accept and restores it on close so every -1 + // carries the same Source its +1 did. + sources := newConnSources() bfOpt.OnConnectionChangeFunc = func(state int, workerIdx int, addr net.IP) { if ctx.Err() != nil { return @@ -498,6 +547,7 @@ func (m *unboundedManager) start() { if addr != nil { addrStr = addr.String() } + addrStr = sources.resolve(state, workerIdx, addrStr) slog.Debug("Unbounded: consumer connection change", "state", state, "workerIdx", workerIdx, "source", addrStr) events.Emit(ConnectionEvent{ diff --git a/unbounded/unbounded_test.go b/unbounded/unbounded_test.go index 9b0ff737..9f7fd09b 100644 --- a/unbounded/unbounded_test.go +++ b/unbounded/unbounded_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - C "github.com/getlantern/common" "github.com/getlantern/broflake/clientcore" + C "github.com/getlantern/common" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -635,10 +635,13 @@ func TestStopCtx_TimesOut(t *testing.T) { // TestConnectionEventBridge pins the observable API this package // adds: broflake's OnConnectionChangeFunc callback must translate // (state, workerIdx, addr) into a ConnectionEvent with the matching -// State, the addr.String() Source (empty when addr is nil), and a -// freshly-stamped Timestamp. Capture the callback that start -// installs on bfOpt via a fake newWidget, invoke it with both nil -// and non-nil addrs, then assert the events arriving via +// State, the consumer Source, and a freshly-stamped Timestamp. +// broflake hands the callback a nil addr on close, so the close's +// Source is backfilled from the addr its accept carried (see +// connSources) — otherwise downstream consumers (the Flutter globe + +// helped counter) couldn't match the close to its accept. Capture the +// callback that start installs on bfOpt via a fake newWidget, invoke +// accept-then-close on one slot, then assert the events arriving via // events.Subscribe. func TestConnectionEventBridge(t *testing.T) { var captured atomic.Pointer[clientcore.ConnectionChangeFunc] @@ -690,7 +693,8 @@ func TestConnectionEventBridge(t *testing.T) { require.Contains(t, byState, 1, "expected an accept event (State=+1)") require.Contains(t, byState, -1, "expected a close event (State=-1)") assert.Equal(t, "198.51.100.42", byState[1].Source, "accept Source") - assert.Equal(t, "", byState[-1].Source, "close Source (nil addr -> empty string)") + assert.Equal(t, "198.51.100.42", byState[-1].Source, + "close Source is backfilled from the accept (broflake delivers a nil addr on close)") for state, evt := range byState { assert.GreaterOrEqual(t, evt.Timestamp, before, "State=%d Timestamp not before emit", state) assert.LessOrEqual(t, evt.Timestamp, after, "State=%d Timestamp not after emit", state)