Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions unbounded/conn_sources_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package unbounded

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestConnSources_resolve pins the accept→close source backfill. broflake
// delivers the consumer addr on accept but a nil (empty) addr on close, so
// resolve must restore the accept's addr onto the close — otherwise the close
// event carries an empty Source and downstream consumers (the Flutter globe
// arc + the "people helped" counter) can't match it to the accept, so the
// arc orphans and the counter never decrements.
func TestConnSources_resolve(t *testing.T) {
c := newConnSources()

// Accept echoes its own addr and remembers it for the slot.
assert.Equal(t, "1.2.3.4", c.resolve(1, 7, "1.2.3.4"),
"accept returns its own source")

// Close arrives with an empty addr (broflake's nil) — restore the
// accept's addr so the -1 can be matched to its +1.
assert.Equal(t, "1.2.3.4", c.resolve(-1, 7, ""),
"close restores the accept's source")

// The slot was freed; a stale/duplicate close has nothing to restore.
assert.Equal(t, "", c.resolve(-1, 7, ""),
"close after the slot is freed restores nothing")

// Slots are tracked independently.
c.resolve(1, 8, "5.6.7.8")
c.resolve(1, 9, "1.1.1.1")
assert.Equal(t, "5.6.7.8", c.resolve(-1, 8, ""), "slot 8 restores 8's addr")
assert.Equal(t, "1.1.1.1", c.resolve(-1, 9, ""), "slot 9 unaffected by slot 8")

// An accept with no addr (broflake couldn't surface the consumer IP)
// stays empty through close — neither end is counted, which is correct:
// with no source there's nothing to match or draw.
assert.Equal(t, "", c.resolve(1, 10, ""), "accept with empty addr stays empty")
assert.Equal(t, "", c.resolve(-1, 10, ""), "its close stays empty too")

// A close that already carries a real addr is passed through unchanged
// (don't clobber a good value) and still frees the slot.
c.resolve(1, 11, "9.9.9.9")
assert.Equal(t, "9.9.9.9", c.resolve(-1, 11, "9.9.9.9"),
"close with a real addr is passed through")
assert.Equal(t, "", c.resolve(-1, 11, ""), "slot 11 freed after its close")

// Slot reuse: broflake recycles a workerIdx; a fresh accept overwrites
// the prior addr even without an intervening close.
c.resolve(1, 12, "2.2.2.2")
c.resolve(1, 12, "3.3.3.3")
assert.Equal(t, "3.3.3.3", c.resolve(-1, 12, ""),
"reused slot restores the latest accept's addr")
Comment on lines +50 to +55
}
56 changes: 53 additions & 3 deletions unbounded/unbounded.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ import (
// being routed through this widget proxy) connects or disconnects via
// the broflake mesh.
//
// State +1 on accept, -1 on close
// Source consumer's IP if broflake exposes it, otherwise empty
// Timestamp emit time in Unix milliseconds
// State +1 on accept, -1 on close
// Source consumer's IP if broflake exposes it, otherwise empty
// Timestamp emit time in Unix milliseconds
//
// JSON shape is identical to peer.ConnectionEvent so a consumer
// reading both SSE streams can deserialize each frame with the
Expand All @@ -65,6 +65,45 @@ type ConnectionEvent struct {
Timestamp int64 `json:"timestamp"`
}

// connSources tracks the source addr of each live consumer slot so a close
// event — which broflake delivers with a nil addr — can be re-tagged with the
// addr its accept carried. Keyed by broflake's workerIdx (the consumer slot),
// stable across a single connection's accept→close. Without this, a close
// carries an empty Source, downstream consumers (the Flutter globe + helped
// counter) can't match it to the accept, and the connection's arc/count leaks.
// Concurrency-safe: broflake fires connection-change callbacks from per-worker
// goroutines.
type connSources struct {
mu sync.Mutex
addrs map[int]string
}

func newConnSources() *connSources {
return &connSources{addrs: make(map[int]string)}
}

// resolve records the addr on accept (state > 0) or restores it on close
// (state < 0, where broflake's addr is nil), returning the Source the
// ConnectionEvent should carry. An accept with an empty addr is left
// untracked, so its close stays empty too — neither is counted, which is the
// right behavior when broflake can't surface the consumer IP at all.
func (c *connSources) resolve(state, workerIdx int, addrStr string) string {
c.mu.Lock()
defer c.mu.Unlock()
switch {
case state > 0:
if addrStr != "" {
c.addrs[workerIdx] = addrStr
}
Comment on lines +94 to +97
case state < 0:
if addrStr == "" {
addrStr = c.addrs[workerIdx]
}
delete(c.addrs, workerIdx)
}
return addrStr
}

var manager = &unboundedManager{}

// widget is the minimum interface the manager needs from a running
Expand Down Expand Up @@ -490,6 +529,16 @@ func (m *unboundedManager) start() {
// off. broflake exposes no registration point we could
// disarm directly (the callback IS the registration), so the
// inline ctx check is the next-best place.
// broflake hands us the consumer's addr on accept but a nil addr on
// close (the WebRTC session is already torn down, so the remote IP is
// gone). Consumers of ConnectionEvent identify a connection by its
// Source: the Flutter globe matches a close to the arc it should
// remove by source IP, and decrements its "people helped" counter the
// same way. A close with an empty Source can't be matched, so the arc
// orphans and the counter never comes back down. sources remembers
// each slot's addr on accept and restores it on close so every -1
// carries the same Source its +1 did.
sources := newConnSources()
bfOpt.OnConnectionChangeFunc = func(state int, workerIdx int, addr net.IP) {
if ctx.Err() != nil {
return
Expand All @@ -498,6 +547,7 @@ func (m *unboundedManager) start() {
if addr != nil {
addrStr = addr.String()
}
addrStr = sources.resolve(state, workerIdx, addrStr)
slog.Debug("Unbounded: consumer connection change",
"state", state, "workerIdx", workerIdx, "source", addrStr)
events.Emit(ConnectionEvent{
Expand Down
16 changes: 10 additions & 6 deletions unbounded/unbounded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"testing"
"time"

C "github.com/getlantern/common"
"github.com/getlantern/broflake/clientcore"
C "github.com/getlantern/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -635,10 +635,13 @@ func TestStopCtx_TimesOut(t *testing.T) {
// TestConnectionEventBridge pins the observable API this package
// adds: broflake's OnConnectionChangeFunc callback must translate
// (state, workerIdx, addr) into a ConnectionEvent with the matching
// State, the addr.String() Source (empty when addr is nil), and a
// freshly-stamped Timestamp. Capture the callback that start
// installs on bfOpt via a fake newWidget, invoke it with both nil
// and non-nil addrs, then assert the events arriving via
// State, the consumer Source, and a freshly-stamped Timestamp.
// broflake hands the callback a nil addr on close, so the close's
// Source is backfilled from the addr its accept carried (see
// connSources) — otherwise downstream consumers (the Flutter globe +
// helped counter) couldn't match the close to its accept. Capture the
// callback that start installs on bfOpt via a fake newWidget, invoke
// accept-then-close on one slot, then assert the events arriving via
// events.Subscribe.
func TestConnectionEventBridge(t *testing.T) {
var captured atomic.Pointer[clientcore.ConnectionChangeFunc]
Expand Down Expand Up @@ -690,7 +693,8 @@ func TestConnectionEventBridge(t *testing.T) {
require.Contains(t, byState, 1, "expected an accept event (State=+1)")
require.Contains(t, byState, -1, "expected a close event (State=-1)")
assert.Equal(t, "198.51.100.42", byState[1].Source, "accept Source")
assert.Equal(t, "", byState[-1].Source, "close Source (nil addr -> empty string)")
assert.Equal(t, "198.51.100.42", byState[-1].Source,
"close Source is backfilled from the accept (broflake delivers a nil addr on close)")
for state, evt := range byState {
assert.GreaterOrEqual(t, evt.Timestamp, before, "State=%d Timestamp not before emit", state)
assert.LessOrEqual(t, evt.Timestamp, after, "State=%d Timestamp not after emit", state)
Expand Down
Loading