Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,88 @@ func (s *Server) ListConnectionInfo() (result []*ConnInfo) {
return
}

// prometheusEscapeLabelValue escapes s so it can be placed between the double
// quotes of a label value: a backslash becomes \\, a line feed becomes \n and
// a double quote becomes \". Every other byte is copied through unchanged.
func prometheusEscapeLabelValue(s string) string {
	var escaped strings.Builder
	escaped.Grow(len(s))
	// All three special characters are single-byte ASCII, so walking the
	// string byte by byte cannot split a multi-byte UTF-8 sequence.
	for i := 0; i < len(s); i++ {
		switch c := s[i]; c {
		case '\\':
			escaped.WriteString(`\\`)
		case '\n':
			escaped.WriteString(`\n`)
		case '"':
			escaped.WriteString(`\"`)
		default:
			escaped.WriteByte(c)
		}
	}
	return escaped.String()
}

// prometheusLabelValueOrUnknown returns s unchanged when it is non-empty and
// the placeholder "unknown" when it is empty.
func prometheusLabelValueOrUnknown(s string) string {
	if len(s) > 0 {
		return s
	}
	return "unknown"
}

// prometheusLabels builds the comma-separated index/module/upstream label
// pairs for one connection, without the surrounding braces.
//
// An empty module or upstream is first replaced through
// prometheusLabelValueOrUnknown; both values are then escaped with
// prometheusEscapeLabelValue before being quoted.
func prometheusLabels(index uint32, module, upstream string) string {
	moduleValue := prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(module))
	upstreamValue := prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(upstream))
	return fmt.Sprintf(`index="%d",module="%s",upstream="%s"`, index, moduleValue, upstreamValue)
}

func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) {
connections := s.ListConnectionInfo()

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_active_connections Current active rsync proxy connections.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_active_connections gauge")
_, _ = fmt.Fprintf(w, "rsync_proxy_active_connections %d\n", s.GetActiveConnectionCount())

connectionCounts := make(map[string]int)
for _, conn := range connections {
module := prometheusLabelValueOrUnknown(conn.Module)
upstream := prometheusLabelValueOrUnknown(conn.UpstreamAddr)
key := module + "\xff" + upstream
connectionCounts[key]++
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e617a49. ConnInfo now protects mutable metadata with an RW mutex, and metrics/status serialization read a consistent snapshot instead of directly reading fields that relay updates concurrently. I also verified the active metrics path with go test -race.

}

keys := make([]string, 0, len(connectionCounts))
for key := range connectionCounts {
keys = append(keys, key)
}
sort.Strings(keys)

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_active_connections_by_module Current active rsync proxy connections by module and upstream.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_active_connections_by_module gauge")
for _, key := range keys {
parts := strings.SplitN(key, "\xff", 2)
module := prometheusEscapeLabelValue(parts[0])
upstream := prometheusEscapeLabelValue(parts[1])
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e617a49. The aggregation key is now a structured {module, upstream} value instead of a delimiter-joined string, so client-controlled module names cannot collide with upstream labels.

_, _ = fmt.Fprintf(w, "rsync_proxy_active_connections_by_module{module=\"%s\",upstream=\"%s\"} %d\n", module, upstream, connectionCounts[key])
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_sent_bytes Bytes sent to clients for active connections.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_sent_bytes gauge")
for _, conn := range connections {
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_sent_bytes{%s} %d\n", prometheusLabels(conn.Index, conn.Module, conn.UpstreamAddr), conn.SentBytes.Load())
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_received_bytes Bytes received from clients for active connections.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_received_bytes gauge")
for _, conn := range connections {
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_received_bytes{%s} %d\n", prometheusLabels(conn.Index, conn.Module, conn.UpstreamAddr), conn.ReceivedBytes.Load())
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_connected_timestamp_seconds Unix timestamp when active connections were established.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_connected_timestamp_seconds gauge")
for _, conn := range connections {
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_connected_timestamp_seconds{%s} %d\n", prometheusLabels(conn.Index, conn.Module, conn.UpstreamAddr), conn.ConnectedAt.Unix())
}

_, _ = fmt.Fprintln(w, "# HELP rsync_proxy_connection_duration_seconds Current duration of active connections.")
_, _ = fmt.Fprintln(w, "# TYPE rsync_proxy_connection_duration_seconds gauge")
for _, conn := range connections {
duration := now.Sub(conn.ConnectedAt).Seconds()
if duration < 0 {
duration = 0
}
_, _ = fmt.Fprintf(w, "rsync_proxy_connection_duration_seconds{%s} %.0f\n", prometheusLabels(conn.Index, conn.Module, conn.UpstreamAddr), duration)
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e617a49. Connection duration metrics now use millisecond precision (%.3f) instead of whole seconds.

}
}

func (s *Server) runHTTPServer() error {
hostname, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -804,6 +886,16 @@ func (s *Server) runHTTPServer() error {
_, _ = fmt.Fprintf(w, "rsync-proxy,host=%s count=%d %d\n", hostname, count, timestamp)
})

mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
s.writePrometheusMetrics(w, time.Now())
})

return http.Serve(s.HTTPListener, &mux)
}

Expand Down
95 changes: 95 additions & 0 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -109,7 +110,7 @@
defer fakeRsync.Close()

srv.modules = map[string][]Target{
"fake": {{Upstream: "u1", Addr: fakeRsync.Listener.Addr().String()}},

Check failure on line 113 in pkg/server/server_test.go

View workflow job for this annotation

GitHub Actions / Build

string `fake` has 6 occurrences, make it a constant (goconst)
}
srv.upstreamQueues = map[string]*queue.Queue{"u1": queue.New(0, 0)}

Expand Down Expand Up @@ -220,7 +221,7 @@

rawConn, err := tls.Dial("tcp", srv.TLSListenAddr, &tls.Config{
RootCAs: pool,
ServerName: "localhost",

Check failure on line 224 in pkg/server/server_test.go

View workflow job for this annotation

GitHub Actions / Build

string `localhost` has 3 occurrences, make it a constant (goconst)
})
r.NoError(err)
conn := rsync.NewConn(rawConn)
Expand Down Expand Up @@ -340,6 +341,100 @@
wg.Done()
}

// TestMetricsEndpointNoConnections checks that GET /metrics on a server with
// no proxied connections returns 200, the Prometheus text content type, and
// the active-connections gauge with a value of 0.
func TestMetricsEndpointNoConnections(t *testing.T) {
	srv := startServer(t)
	defer srv.Close()

	// The package-level http.Get uses a client without a timeout; a bounded
	// client keeps an unresponsive server from hanging the test run.
	client := &http.Client{Timeout: 5 * time.Second}

	resp, err := client.Get("http://" + srv.HTTPListener.Addr().String() + "/metrics")
	require.NoError(t, err)
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	require.NoError(t, err)
	text := string(body)

	assert.Equal(t, http.StatusOK, resp.StatusCode)
	assert.Equal(t, "text/plain; version=0.0.4; charset=utf-8", resp.Header.Get("Content-Type"))
	assert.Contains(t, text, "# HELP rsync_proxy_active_connections Current active rsync proxy connections.")
	assert.Contains(t, text, "# TYPE rsync_proxy_active_connections gauge")
	assert.Contains(t, text, "rsync_proxy_active_connections 0\n")
}

func TestMetricsEndpointRejectsNonGET(t *testing.T) {
srv := startServer(t)
defer srv.Close()

resp, err := http.Post("http://"+srv.HTTPListener.Addr().String()+"/metrics", "text/plain", nil)
require.NoError(t, err)
defer resp.Body.Close()

Comment on lines +352 to +378
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e617a49. The HTTP tests now use a test client with a timeout instead of package-level http.Get/http.Post.

assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
}

// TestMetricsIncludesActiveConnections opens one proxied connection to a fake
// upstream that stays parked after the handshake, then checks that /metrics
// reports that connection in the total, in the per-module aggregate, and in
// each per-connection family, without exposing the client's address.
func TestMetricsIncludesActiveConnections(t *testing.T) {
	srv := startServer(t)
	defer srv.Close()

	var wg sync.WaitGroup
	wg.Add(1)
	fakeRsync := rsync.NewServer(func(conn *rsync.Conn) {
		defer conn.Close()
		_, _, err := doServerHandshake(conn, RsyncdServerVersion)
		// This handler runs on its own goroutine, where require (FailNow) must
		// not be called; record the failure with assert and bail out instead.
		if !assert.NoError(t, err) {
			return
		}
		// Hold the upstream side open until the test is finished.
		wg.Wait()
	})
	fakeRsync.Start()
	defer fakeRsync.Close()
	// Deferred after fakeRsync.Close so it runs before it: the parked handler
	// is released even when a require below aborts the test early.
	defer wg.Done()

	upstreamAddr := fakeRsync.Listener.Addr().String()
	srv.modules = map[string][]Target{
		"fake": {{Upstream: "u1", Addr: upstreamAddr}},
	}
	srv.upstreamQueues = map[string]*queue.Queue{"u1": queue.New(0, 0)}

	rawConn, err := net.Dial("tcp", srv.TCPListener.Addr().String())
	require.NoError(t, err)
	conn := rsync.NewConn(rawConn)
	defer conn.Close()

	_, err = doClientHandshake(conn, RsyncdServerVersion, "fake")
	require.NoError(t, err)

	// Wait until the server has recorded the upstream address for the new
	// connection, so the labels asserted below are already populated.
	require.Eventually(t, func() bool {
		infos := srv.ListConnectionInfo()
		return len(infos) == 1 && infos[0].UpstreamAddr == upstreamAddr
	}, time.Second, 10*time.Millisecond)

	// The package-level http.Get uses a client without a timeout; a bounded
	// client keeps an unresponsive server from hanging the test run.
	client := &http.Client{Timeout: 5 * time.Second}

	resp, err := client.Get("http://" + srv.HTTPListener.Addr().String() + "/metrics")
	require.NoError(t, err)
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	require.NoError(t, err)
	text := string(body)

	assert.Equal(t, http.StatusOK, resp.StatusCode)
	assert.Contains(t, text, "rsync_proxy_active_connections 1\n")
	assert.Contains(t, text, fmt.Sprintf("rsync_proxy_active_connections_by_module{module=\"fake\",upstream=%q} 1\n", upstreamAddr))
	assert.Contains(t, text, "rsync_proxy_connection_sent_bytes{index=\"")
	assert.Contains(t, text, "module=\"fake\"")
	assert.Contains(t, text, fmt.Sprintf("upstream=%q", upstreamAddr))
	assert.Contains(t, text, "rsync_proxy_connection_received_bytes{index=\"")
	assert.Contains(t, text, "rsync_proxy_connection_connected_timestamp_seconds{index=\"")
	assert.Contains(t, text, "rsync_proxy_connection_duration_seconds{index=\"")
	// The client's own address must not appear anywhere in the output.
	assert.NotContains(t, text, rawConn.LocalAddr().String())
}

// TestPrometheusLabelValueEscaping pins the escaping of quotes, backslashes
// and line feeds in label values, and the "unknown" placeholder used for an
// empty value.
func TestPrometheusLabelValueEscaping(t *testing.T) {
	escapeCases := []struct {
		input string
		want  string
	}{
		{input: "plain", want: `plain`},
		{input: `quote"value`, want: `quote\"value`},
		{input: `slash\value`, want: `slash\\value`},
		{input: "line\nbreak", want: `line\nbreak`},
	}
	for _, tc := range escapeCases {
		assert.Equal(t, tc.want, prometheusEscapeLabelValue(tc.input))
	}

	assert.Equal(t, `unknown`, prometheusLabelValueOrUnknown(""))
}

func TestPerUpstreamQueueIsolation(t *testing.T) {
srv := startServer(t)
defer srv.Close()
Expand Down Expand Up @@ -517,7 +612,7 @@
dir := t.TempDir()
configPath := filepath.Join(dir, "config.toml")

firstUpstream := rsync.NewModuleListServer([]string{"foo"})

Check failure on line 615 in pkg/server/server_test.go

View workflow job for this annotation

GitHub Actions / Build

string `foo` has 17 occurrences, make it a constant (goconst)
firstUpstream.Start()
defer firstUpstream.Close()

Expand Down Expand Up @@ -564,7 +659,7 @@
srv := New()
srv.reloadLock.Lock()
srv.upstreams = []upstreamConfig{
{Name: "u1", Modules: []string{"foo", "bar"}},

Check failure on line 662 in pkg/server/server_test.go

View workflow job for this annotation

GitHub Actions / Build

string `bar` has 16 occurrences, make it a constant (goconst)
{Name: "u2", Modules: []string{"baz"}},
}
srv.reloadLock.Unlock()
Expand Down
Loading