From cb4853f9d644c7db67539a36c05e47c1db58da82 Mon Sep 17 00:00:00 2001 From: i338709 Date: Mon, 4 May 2026 10:08:22 +0300 Subject: [PATCH 1/2] Fix missing Gorouter access logs when a client disconnects mid-stream Co-authored-by: Tamara Boehm --- .../gorouter/handlers/access_log.go | 15 +++++- .../gorouter/handlers/access_log_test.go | 35 +++++++++++++- .../gorouter/handlers/proxywriter.go | 3 +- .../gorouter/handlers/proxywriter_test.go | 2 +- .../gorouter/proxy/proxy.go | 2 +- .../gorouter/proxy/proxy_unit_test.go | 2 +- .../proxy/round_tripper/error_handler_test.go | 5 +- .../round_tripper/proxy_round_tripper_test.go | 2 +- .../gorouter/proxy/utils/responsewriter.go | 22 ++++++++- .../proxy/utils/responsewriter_test.go | 47 +++++++++++++++++-- 10 files changed, 121 insertions(+), 14 deletions(-) diff --git a/src/code.cloudfoundry.org/gorouter/handlers/access_log.go b/src/code.cloudfoundry.org/gorouter/handlers/access_log.go index 6d21296a4..d88b06f47 100644 --- a/src/code.cloudfoundry.org/gorouter/handlers/access_log.go +++ b/src/code.cloudfoundry.org/gorouter/handlers/access_log.go @@ -51,7 +51,11 @@ func (a *accessLog) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http requestBodyCounter := &countingReadCloser{delegate: r.Body} r.Body = requestBodyCounter - next(rw, r) + var panicVal any + func() { + defer func() { panicVal = recover() }() + next(rw, r) + }() reqInfo, err := ContextRequestInfo(r) if err != nil { @@ -65,6 +69,11 @@ func (a *accessLog) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http alr.BodyBytesSent = proxyWriter.Size() alr.StatusCode = proxyWriter.Status() alr.RouterError = proxyWriter.Header().Get(router_http.CfRouterError) + + if alr.RouterError == "" && proxyWriter.WriteError() != nil { + alr.RouterError = utils.ConnectionCloseDuringStreamingErrMsg + } + alr.FailedAttempts = reqInfo.FailedAttempts alr.RoundTripSuccessful = reqInfo.RoundTripSuccessful @@ -84,6 +93,10 @@ func (a *accessLog) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http alr.LocalAddress = reqInfo.LocalAddress a.accessLogger.Log(*alr) + + if panicVal != nil { + panic(panicVal) + } } type countingReadCloser struct { diff --git a/src/code.cloudfoundry.org/gorouter/handlers/access_log_test.go b/src/code.cloudfoundry.org/gorouter/handlers/access_log_test.go index 5b07fdc7f..7b2fdff01 100644 --- a/src/code.cloudfoundry.org/gorouter/handlers/access_log_test.go +++ b/src/code.cloudfoundry.org/gorouter/handlers/access_log_test.go @@ -2,6 +2,7 @@ package handlers_test import ( "bytes" + "errors" "io" "net/http" "net/http/httptest" @@ -70,7 +71,7 @@ var _ = Describe("AccessLog", func() { }) testProxyWriterHandler := func(rw http.ResponseWriter, req *http.Request, next http.HandlerFunc) { - proxyWriter := utils.NewProxyResponseWriter(rw) + proxyWriter := utils.NewProxyResponseWriter(rw, logger.Logger) next(proxyWriter, req) } @@ -196,4 +197,36 @@ var _ = Describe("AccessLog", func() { }) }) + Context("when the client disconnects during response streaming", func() { + BeforeEach(func() { + resp = &failingResponseWriter{ResponseRecorder: httptest.NewRecorder()} + handler = negroni.New() + handler.Use(handlers.NewRequestInfo()) + handler.Use(handlers.NewProxyWriter(logger.Logger)) + handler.Use(handlers.NewAccessLog(accessLogger, extraHeadersToLog, nil, logger.Logger)) + handler.UseFunc(func(rw http.ResponseWriter, req *http.Request, next http.HandlerFunc) { + rw.WriteHeader(http.StatusOK) + rw.Write([]byte("partial streaming data")) + nextCalled = true + reqChan <- req + panic(http.ErrAbortHandler) + }) + }) + + It("writes the access log and sets RouterError to ConnectionCloseDuringStreamingErrMsg", func() { + Expect(func() { handler.ServeHTTP(resp, req) }).To(Panic()) + Expect(accessLogger.LogCallCount()).To(Equal(1)) + alr := accessLogger.LogArgsForCall(0) + Expect(alr.RouterError).To(Equal(utils.ConnectionCloseDuringStreamingErrMsg)) + }) + }) + }) + +type failingResponseWriter struct { + *httptest.ResponseRecorder +} + +func (f *failingResponseWriter) Write(_ []byte) (int, error) { + return 0, errors.New("connection reset by peer") +} diff --git a/src/code.cloudfoundry.org/gorouter/handlers/proxywriter.go b/src/code.cloudfoundry.org/gorouter/handlers/proxywriter.go index 0baf2dea1..94504d23f 100644 --- a/src/code.cloudfoundry.org/gorouter/handlers/proxywriter.go +++ b/src/code.cloudfoundry.org/gorouter/handlers/proxywriter.go @@ -29,7 +29,8 @@ func (p *proxyWriterHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request, log.Panic(p.logger, "request-info-err", log.ErrAttr(err)) return } - proxyWriter := utils.NewProxyResponseWriter(rw) + logger := p.logger.With("vcap_request_id", r.Header.Get(VcapRequestIdHeader)) + proxyWriter := utils.NewProxyResponseWriter(rw, logger) reqInfo.ProxyResponseWriter = proxyWriter next(proxyWriter, r) } diff --git a/src/code.cloudfoundry.org/gorouter/handlers/proxywriter_test.go b/src/code.cloudfoundry.org/gorouter/handlers/proxywriter_test.go index 2e320c1f4..bcda0fce6 100644 --- a/src/code.cloudfoundry.org/gorouter/handlers/proxywriter_test.go +++ b/src/code.cloudfoundry.org/gorouter/handlers/proxywriter_test.go @@ -78,7 +78,7 @@ var _ = Describe("ProxyWriter", func() { var rw http.ResponseWriter Eventually(respChan).Should(Receive(&rw)) Expect(rw).ToNot(BeNil()) - Expect(rw).To(BeAssignableToTypeOf(utils.NewProxyResponseWriter(resp))) + Expect(rw).To(BeAssignableToTypeOf(utils.NewProxyResponseWriter(resp, logger.Logger))) Expect(nextCalled).To(BeTrue(), "Expected the next handler to be called.") }) diff --git a/src/code.cloudfoundry.org/gorouter/proxy/proxy.go b/src/code.cloudfoundry.org/gorouter/proxy/proxy.go index 790eca7b7..47072f90a 100644 --- a/src/code.cloudfoundry.org/gorouter/proxy/proxy.go +++ b/src/code.cloudfoundry.org/gorouter/proxy/proxy.go @@ -150,10 +150,10 @@ func NewProxy( n := negroni.New() n.Use(handlers.NewPanicCheck(p.health, logger)) n.Use(handlers.NewRequestInfo()) - n.Use(handlers.NewProxyWriter(logger)) n.Use(zipkinHandler) n.Use(w3cHandler) n.Use(handlers.NewVcapRequestIdHeader(logger)) + n.Use(handlers.NewProxyWriter(logger)) if cfg.SendHttpStartStopServerEvent { n.Use(handlers.NewHTTPStartStop(dropsonde.DefaultEmitter, logger)) } diff --git a/src/code.cloudfoundry.org/gorouter/proxy/proxy_unit_test.go b/src/code.cloudfoundry.org/gorouter/proxy/proxy_unit_test.go index 23f3a9343..1fe169b37 100644 --- a/src/code.cloudfoundry.org/gorouter/proxy/proxy_unit_test.go +++ b/src/code.cloudfoundry.org/gorouter/proxy/proxy_unit_test.go @@ -84,7 +84,7 @@ var _ = Describe("Proxy Unit tests", func() { r.Register(route.Uri("some-app"), &route.Endpoint{Stats: route.NewStats()}) responseRecorder = &ResponseRecorderWithFullDuplex{httptest.NewRecorder(), nil, 0} - resp = utils.NewProxyResponseWriter(responseRecorder) + resp = utils.NewProxyResponseWriter(responseRecorder, logger.Logger) }) Context("when backend fails to respond", func() { diff --git a/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/error_handler_test.go b/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/error_handler_test.go index 017689f7a..bd2896d62 100644 --- a/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/error_handler_test.go +++ b/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/error_handler_test.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "errors" "fmt" + "log/slog" "net" "net/http/httptest" @@ -55,7 +56,7 @@ var _ = Describe("HandleError", func() { }, } responseRecorder = httptest.NewRecorder() - responseWriter = utils.NewProxyResponseWriter(responseRecorder) + responseWriter = utils.NewProxyResponseWriter(responseRecorder, slog.Default()) }) It("sets a header to describe the endpoint_failure", func() { @@ -88,7 +89,7 @@ var _ = Describe("HandleError", func() { }) It("calls the handleError callback if it exists", func() { - firstResponseWriter := utils.NewProxyResponseWriter(httptest.NewRecorder()) + firstResponseWriter := utils.NewProxyResponseWriter(httptest.NewRecorder(), slog.Default()) errorHandler.HandleError(firstResponseWriter, errors.New("i'm a teapot")) Expect(errorHandled).To(BeFalse()) diff --git a/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper_test.go b/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper_test.go index fa59b63b3..6d135e1f3 100644 --- a/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper_test.go +++ b/src/code.cloudfoundry.org/gorouter/proxy/round_tripper/proxy_round_tripper_test.go @@ -117,7 +117,7 @@ var _ = Describe("ProxyRoundTripper", func() { }) numEndpoints = 1 resp = httptest.NewRecorder() - proxyWriter := utils.NewProxyResponseWriter(resp) + proxyWriter := utils.NewProxyResponseWriter(resp, logger.Logger) reqBody = new(testBody) req = test_util.NewRequest("GET", "myapp.com", "/", reqBody) req.URL.Scheme = "http" diff --git a/src/code.cloudfoundry.org/gorouter/proxy/utils/responsewriter.go b/src/code.cloudfoundry.org/gorouter/proxy/utils/responsewriter.go index 616fcaec8..5880119bd 100644 --- a/src/code.cloudfoundry.org/gorouter/proxy/utils/responsewriter.go +++ b/src/code.cloudfoundry.org/gorouter/proxy/utils/responsewriter.go @@ -3,6 +3,7 @@ package utils import ( "bufio" "errors" + "log/slog" "net" "net/http" ) @@ -18,23 +19,30 @@ type ProxyResponseWriter interface { SetStatus(status int) Size() int AddHeaderRewriter(HeaderRewriter) + WriteError() error } +const ConnectionCloseDuringStreamingErrMsg = "client-conn-closed-during-response-streaming" + type proxyResponseWriter struct { w http.ResponseWriter status int size int + logger *slog.Logger flusher http.Flusher done bool + writeErr error + headerRewriters []HeaderRewriter } -func NewProxyResponseWriter(w http.ResponseWriter) *proxyResponseWriter { +func NewProxyResponseWriter(w http.ResponseWriter, logger *slog.Logger) *proxyResponseWriter { proxyWriter := &proxyResponseWriter{ w: w, flusher: w.(http.Flusher), + logger: logger, } return proxyWriter @@ -61,6 +69,14 @@ func (p *proxyResponseWriter) Write(b []byte) (int, error) { p.WriteHeader(http.StatusOK) } size, err := p.w.Write(b) + if err != nil && p.writeErr == nil { + p.writeErr = err + p.logger.Error(ConnectionCloseDuringStreamingErrMsg, + slog.String("error", err.Error()), + slog.Int("bytes_written", size), + slog.Int("total_size", p.size), + slog.Int("status", p.status)) + } p.size += size return size, err } @@ -118,3 +134,7 @@ func (p *proxyResponseWriter) Unwrap() http.ResponseWriter { func (p *proxyResponseWriter) AddHeaderRewriter(r HeaderRewriter) { p.headerRewriters = append(p.headerRewriters, r) } + +func (p *proxyResponseWriter) WriteError() error { + return p.writeErr +} diff --git a/src/code.cloudfoundry.org/gorouter/proxy/utils/responsewriter_test.go b/src/code.cloudfoundry.org/gorouter/proxy/utils/responsewriter_test.go index 03a3fb721..fa9e9cf0f 100644 --- a/src/code.cloudfoundry.org/gorouter/proxy/utils/responsewriter_test.go +++ b/src/code.cloudfoundry.org/gorouter/proxy/utils/responsewriter_test.go @@ -6,6 +6,7 @@ import ( "net" "net/http" + "code.cloudfoundry.org/gorouter/test_util" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -66,13 +67,15 @@ func (f *fakeHeaderRewriter) RewriteHeader(h http.Header) { var _ = Describe("ProxyWriter", func() { var ( - fake *fakeResponseWriter - proxy *proxyResponseWriter + fake *fakeResponseWriter + proxy *proxyResponseWriter + logger *test_util.TestLogger ) BeforeEach(func() { + logger = test_util.NewTestLogger("test") fake = newFakeResponseWriter() - proxy = NewProxyResponseWriter(fake) + proxy = NewProxyResponseWriter(fake, logger.Logger) }) It("delegates the call to Header", func() { @@ -89,7 +92,7 @@ var _ = Describe("ProxyWriter", func() { fake := &fakeHijackerResponseWriter{ fakeResponseWriter: *newFakeResponseWriter(), } - proxy = NewProxyResponseWriter(fake) + proxy = NewProxyResponseWriter(fake, logger.Logger) proxy.Hijack() Expect(fake.hijackCalled).To(BeTrue()) }) @@ -199,4 +202,40 @@ var _ = Describe("ProxyWriter", func() { Expect(responseWriter).To(Equal(fake)) }) }) + + Describe("WriteError", func() { + It("returns nil when no write error has occurred", func() { + proxy.Write([]byte("foo")) + Expect(proxy.WriteError()).To(BeNil()) + }) + + It("returns the first write error that occurred", func() { + fakeWithError := &fakeResponseWriterWithError{ + fakeResponseWriter: *newFakeResponseWriter(), + writeError: errors.New("connection reset by peer"), + } + proxyWithError := NewProxyResponseWriter(fakeWithError, logger.Logger) + + proxyWithError.Write([]byte("data1")) + Expect(proxyWithError.WriteError()).To(MatchError("connection reset by peer")) + + // Subsequent writes should preserve the first error + fakeWithError.writeError = errors.New("second error") + proxyWithError.Write([]byte("data2")) + Expect(proxyWithError.WriteError()).To(MatchError("connection reset by peer")) + }) + }) }) + +type fakeResponseWriterWithError struct { + fakeResponseWriter + writeError error +} + +func (f *fakeResponseWriterWithError) Write(b []byte) (int, error) { + f.writeCalled = true + if f.writeError != nil { + return len(b) / 2, f.writeError + } + return len(b), nil +} From 7df0b32200921a0a23a44a2b30d4eea5bb99e4ad Mon Sep 17 00:00:00 2001 From: Tamara Boehm Date: Wed, 27 May 2026 10:11:57 +0200 Subject: [PATCH 2/2] Implement additional tests --- .../gorouter/handlers/access_log_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/code.cloudfoundry.org/gorouter/handlers/access_log_test.go b/src/code.cloudfoundry.org/gorouter/handlers/access_log_test.go index 7b2fdff01..57fe62332 100644 --- a/src/code.cloudfoundry.org/gorouter/handlers/access_log_test.go +++ b/src/code.cloudfoundry.org/gorouter/handlers/access_log_test.go @@ -219,6 +219,20 @@ var _ = Describe("AccessLog", func() { alr := accessLogger.LogArgsForCall(0) Expect(alr.RouterError).To(Equal(utils.ConnectionCloseDuringStreamingErrMsg)) }) + + Context("when RouterError is already set on the response header", func() { + BeforeEach(func() { + resp.Header().Add("X-Cf-RouterError", "endpoint-failed") + }) + + It("writes the access log and keeps RouterError from response header", func() { + Expect(func() { handler.ServeHTTP(resp, req) }).To(Panic()) + Expect(accessLogger.LogCallCount()).To(Equal(1)) + alr := accessLogger.LogArgsForCall(0) + Expect(alr.RouterError).NotTo(Equal(utils.ConnectionCloseDuringStreamingErrMsg)) + Expect(alr.RouterError).To(Equal("endpoint-failed")) + }) + }) }) })