Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions pkg/server/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package server

import (
"fmt"
"io"
"sort"
"strings"
"time"
)

// prometheusEscapeLabelValue escapes s so it can be placed between the double
// quotes of a label value in the Prometheus text exposition format.
//
// A backslash becomes `\\`, a line feed becomes `\n` and a double quote
// becomes `\"`; every other byte is copied through unchanged.
func prometheusEscapeLabelValue(s string) string {
	var escaped strings.Builder
	escaped.Grow(len(s))
	// Byte-wise iteration is safe: the three special characters are ASCII and
	// never occur inside a multi-byte UTF-8 sequence.
	for i := 0; i < len(s); i++ {
		switch c := s[i]; c {
		case '\\':
			escaped.WriteString(`\\`)
		case '\n':
			escaped.WriteString(`\n`)
		case '"':
			escaped.WriteString(`\"`)
		default:
			escaped.WriteByte(c)
		}
	}
	return escaped.String()
}

// prometheusLabelValueOrUnknown returns s unchanged when it is non-empty and
// the placeholder "unknown" otherwise, so label values are never blank.
func prometheusLabelValueOrUnknown(s string) string {
	if s != "" {
		return s
	}
	return "unknown"
}

// prometheusLabels builds the comma-separated label list (without the
// surrounding braces) used by the per-connection series: index, module and
// upstream. Empty module/upstream values are reported as "unknown", and both
// are escaped for the text exposition format.
func prometheusLabels(index uint32, module, upstream string) string {
	moduleValue := prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(module))
	upstreamValue := prometheusEscapeLabelValue(prometheusLabelValueOrUnknown(upstream))
	return fmt.Sprintf(`index="%d",module="%s",upstream="%s"`, index, moduleValue, upstreamValue)
}

// prometheusConnectionGroup is the (module, upstream) pair used as a map key
// when counting active connections per group. Both fields are plain strings,
// so the struct is comparable.
type prometheusConnectionGroup struct {
	module, upstream string
}

// writePrometheusMetrics renders the server's metrics to w in the Prometheus
// text exposition format.
//
// It emits, in order: the server-wide gauges and counters, the number of
// active connections grouped by (module, upstream), and four per-connection
// series labelled with the connection index, module and upstream. now is the
// reference instant for rsync_proxy_connection_duration_seconds; a negative
// duration (connectedAt after now) is reported as 0.
//
// Every connection is snapshotted exactly once per call, so all series for a
// connection share the same label set and the same instant even while the
// connection's module/upstream are being updated concurrently.
//
// Write errors are intentionally ignored: the output is best-effort and there
// is nothing useful to do when the scraper has gone away.
func (s *Server) writePrometheusMetrics(w io.Writer, now time.Time) {
	// connSample is the state of one connection captured by a single
	// snapshot() call, together with its pre-rendered label list.
	type connSample struct {
		labels      string
		connectedAt time.Time
		sentBytes   int64
		recvBytes   int64
	}

	// writeHeader emits the "# HELP" and "# TYPE" lines for one metric family.
	writeHeader := func(name, metricType, help string) {
		_, _ = fmt.Fprintf(w, "# HELP %s %s\n", name, help)
		_, _ = fmt.Fprintf(w, "# TYPE %s %s\n", name, metricType)
	}

	connections := s.ListConnectionInfo()
	acceptedConnections := s.acceptedConnTotal.Load()

	// Take one snapshot per connection and derive everything from it.
	samples := make([]connSample, 0, len(connections))
	connectionCounts := make(map[prometheusConnectionGroup]int)
	for _, conn := range connections {
		index, module, upstream, connectedAt, sentBytes, receivedBytes := conn.snapshot()
		group := prometheusConnectionGroup{
			module:   prometheusLabelValueOrUnknown(module),
			upstream: prometheusLabelValueOrUnknown(upstream),
		}
		connectionCounts[group]++
		samples = append(samples, connSample{
			labels:      prometheusLabels(index, module, upstream),
			connectedAt: connectedAt,
			sentBytes:   sentBytes,
			recvBytes:   receivedBytes,
		})
	}

	writeHeader("rsync_proxy_active_connections", "gauge", "Current active rsync proxy connections.")
	_, _ = fmt.Fprintf(w, "rsync_proxy_active_connections %d\n", s.GetActiveConnectionCount())

	writeHeader("rsync_proxy_accepted_connections_total", "counter", "Total accepted rsync proxy connections.")
	_, _ = fmt.Fprintf(w, "rsync_proxy_accepted_connections_total %d\n", acceptedConnections)

	writeHeader("rsync_proxy_completed_connections_total", "counter", "Total completed rsync proxy connections that reached upstream relay.")
	_, _ = fmt.Fprintf(w, "rsync_proxy_completed_connections_total %d\n", s.completedConns.Load())

	writeHeader("rsync_proxy_sent_bytes_total", "counter", "Total bytes sent to clients for completed connections.")
	_, _ = fmt.Fprintf(w, "rsync_proxy_sent_bytes_total %d\n", s.sentBytesTotal.Load())

	writeHeader("rsync_proxy_received_bytes_total", "counter", "Total bytes received from clients for completed connections.")
	_, _ = fmt.Fprintf(w, "rsync_proxy_received_bytes_total %d\n", s.recvBytesTotal.Load())

	// Map iteration order is random; sort the groups by module, then upstream,
	// so the output is stable between scrapes.
	keys := make([]prometheusConnectionGroup, 0, len(connectionCounts))
	for key := range connectionCounts {
		keys = append(keys, key)
	}
	sort.Slice(keys, func(i, j int) bool {
		if keys[i].module != keys[j].module {
			return keys[i].module < keys[j].module
		}
		return keys[i].upstream < keys[j].upstream
	})

	writeHeader("rsync_proxy_active_connections_by_module", "gauge", "Current active rsync proxy connections by module and upstream.")
	for _, key := range keys {
		module := prometheusEscapeLabelValue(key.module)
		upstream := prometheusEscapeLabelValue(key.upstream)
		_, _ = fmt.Fprintf(w, "rsync_proxy_active_connections_by_module{module=\"%s\",upstream=\"%s\"} %d\n", module, upstream, connectionCounts[key])
	}

	writeHeader("rsync_proxy_connection_sent_bytes", "gauge", "Bytes sent to clients for active connections.")
	for _, sample := range samples {
		_, _ = fmt.Fprintf(w, "rsync_proxy_connection_sent_bytes{%s} %d\n", sample.labels, sample.sentBytes)
	}

	writeHeader("rsync_proxy_connection_received_bytes", "gauge", "Bytes received from clients for active connections.")
	for _, sample := range samples {
		_, _ = fmt.Fprintf(w, "rsync_proxy_connection_received_bytes{%s} %d\n", sample.labels, sample.recvBytes)
	}

	writeHeader("rsync_proxy_connection_connected_timestamp_seconds", "gauge", "Unix timestamp when active connections were established.")
	for _, sample := range samples {
		_, _ = fmt.Fprintf(w, "rsync_proxy_connection_connected_timestamp_seconds{%s} %d\n", sample.labels, sample.connectedAt.Unix())
	}

	writeHeader("rsync_proxy_connection_duration_seconds", "gauge", "Current duration of active connections.")
	for _, sample := range samples {
		duration := now.Sub(sample.connectedAt).Seconds()
		if duration < 0 {
			// Guard against clock skew or a caller-supplied now in the past.
			duration = 0
		}
		_, _ = fmt.Fprintf(w, "rsync_proxy_connection_duration_seconds{%s} %.3f\n", sample.labels, duration)
	}
}
66 changes: 53 additions & 13 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var (
const lineFeed = '\n'

type ConnInfo struct {
mu sync.RWMutex
Index uint32
LocalAddr string
RemoteAddr string
Expand All @@ -60,7 +61,26 @@ type ConnInfo struct {
ReceivedBytes atomic.Int64
}

// SetModule records module as this connection's module name. The write is
// made while holding c.mu exclusively.
func (c *ConnInfo) SetModule(module string) {
	c.mu.Lock()
	c.Module = module
	c.mu.Unlock()
}

// SetUpstreamAddr records upstreamAddr as this connection's upstream address.
// The write is made while holding c.mu exclusively.
func (c *ConnInfo) SetUpstreamAddr(upstreamAddr string) {
	c.mu.Lock()
	c.UpstreamAddr = upstreamAddr
	c.mu.Unlock()
}

// snapshot returns a copy of the connection's index, module, upstream
// address, connect time and byte counters. All values are read while holding
// c.mu for reading; the byte counters are read via their Load methods.
func (c *ConnInfo) snapshot() (index uint32, module, upstreamAddr string, connectedAt time.Time, sentBytes, receivedBytes int64) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	index, module, upstreamAddr, connectedAt = c.Index, c.Module, c.UpstreamAddr, c.ConnectedAt
	sentBytes, receivedBytes = c.SentBytes.Load(), c.ReceivedBytes.Load()
	return index, module, upstreamAddr, connectedAt, sentBytes, receivedBytes
}

func (c *ConnInfo) MarshalJSON() ([]byte, error) {
index, module, upstreamAddr, connectedAt, sentBytes, receivedBytes := c.snapshot()
// Handle atomic value (cannot marshal directly)
return json.Marshal(struct {
Index uint32 `json:"index"`
Expand All @@ -72,14 +92,14 @@ func (c *ConnInfo) MarshalJSON() ([]byte, error) {
SentBytes int64 `json:"sentBytes"`
ReceivedBytes int64 `json:"receivedBytes"`
}{
Index: c.Index,
Index: index,
LocalAddr: c.LocalAddr,
RemoteAddr: c.RemoteAddr,
ConnectedAt: c.ConnectedAt,
Module: c.Module,
UpstreamAddr: c.UpstreamAddr,
SentBytes: c.SentBytes.Load(),
ReceivedBytes: c.ReceivedBytes.Load(),
ConnectedAt: connectedAt,
Module: module,
UpstreamAddr: upstreamAddr,
SentBytes: sentBytes,
ReceivedBytes: receivedBytes,
})
}

Expand Down Expand Up @@ -123,9 +143,13 @@ type Server struct {

upstreamQueues map[string]*queue.Queue

activeConnCount atomic.Int64
connIndex atomic.Uint32
connInfo sync.Map
activeConnCount atomic.Int64
connIndex atomic.Uint32
acceptedConnTotal atomic.Uint64
connInfo sync.Map
completedConns atomic.Int64
sentBytesTotal atomic.Int64
recvBytesTotal atomic.Int64

TCPListener net.Listener
TLSListener net.Listener
Expand Down Expand Up @@ -537,8 +561,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err
}

moduleName := string(buf[:n-1]) // trim trailing \n
info.Module = moduleName
s.connInfo.Store(index, &info)
info.SetModule(moduleName)

targets, ok := s.getTargetsForModule(moduleName)
if !ok {
Expand All @@ -551,8 +574,7 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err
target := targets[chooseTargetByClientIP(net.ParseIP(ip), len(targets))]
upstreamAddr := target.Addr
useProxyProtocol := target.UseProxyProtocol
info.UpstreamAddr = upstreamAddr
s.connInfo.Store(index, &info)
info.SetUpstreamAddr(upstreamAddr)

upstreamQueue, ok := s.getQueueForUpstream(target.Upstream)
if !ok {
Expand Down Expand Up @@ -676,9 +698,16 @@ func (s *Server) relay(ctx context.Context, index uint32, downConn net.Conn) err
s.errorLog.F("close downstream read: %v", err)
}
}
_ = upConn.Close()
_ = downConn.Close()
<-sentClosed
<-receivedClosed

sentBytes := info.SentBytes.Load()
receivedBytes := info.ReceivedBytes.Load()
s.completedConns.Add(1)
s.sentBytesTotal.Add(sentBytes)
s.recvBytesTotal.Add(receivedBytes)
Comment on lines 706 to +710
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 82cb742. The lifetime counters now wait for both copy goroutines to finish before sampling byte totals, so the counters and access log use the completed transfer state instead of one half of the relay.


duration := time.Since(info.ConnectedAt)
s.accessLog.F("client %s finishes module %s (sent: %d, received: %d, duration: %s)", ip, moduleName, sentBytes, receivedBytes, duration)
Expand Down Expand Up @@ -804,6 +833,16 @@ func (s *Server) runHTTPServer() error {
_, _ = fmt.Fprintf(w, "rsync-proxy,host=%s count=%d %d\n", hostname, count, timestamp)
})

mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
s.writePrometheusMetrics(w, time.Now())
})

return http.Serve(s.HTTPListener, &mux)
}

Expand Down Expand Up @@ -858,6 +897,7 @@ func (s *Server) Close() {

func (s *Server) handleConn(ctx context.Context, conn net.Conn) {
s.activeConnCount.Add(1)
s.acceptedConnTotal.Add(1)
defer s.activeConnCount.Add(-1)
connIndex := s.connIndex.Add(1)

Expand Down
Loading