Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/code.cloudfoundry.org/gorouter/handlers/access_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ func (a *accessLog) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http
requestBodyCounter := &countingReadCloser{delegate: r.Body}
r.Body = requestBodyCounter

next(rw, r)
var panicVal any
func() {
defer func() { panicVal = recover() }()
next(rw, r)
}()

reqInfo, err := ContextRequestInfo(r)
if err != nil {
Expand All @@ -65,6 +69,11 @@ func (a *accessLog) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http
alr.BodyBytesSent = proxyWriter.Size()
alr.StatusCode = proxyWriter.Status()
alr.RouterError = proxyWriter.Header().Get(router_http.CfRouterError)

if alr.RouterError == "" && proxyWriter.WriteError() != nil {
alr.RouterError = utils.ConnectionCloseDuringStreamingErrMsg
}

alr.FailedAttempts = reqInfo.FailedAttempts
alr.RoundTripSuccessful = reqInfo.RoundTripSuccessful

Expand All @@ -84,6 +93,10 @@ func (a *accessLog) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http
alr.LocalAddress = reqInfo.LocalAddress

a.accessLogger.Log(*alr)

if panicVal != nil {
panic(panicVal)
}
}

type countingReadCloser struct {
Expand Down
35 changes: 34 additions & 1 deletion src/code.cloudfoundry.org/gorouter/handlers/access_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers_test

import (
"bytes"
"errors"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -70,7 +71,7 @@ var _ = Describe("AccessLog", func() {
})

testProxyWriterHandler := func(rw http.ResponseWriter, req *http.Request, next http.HandlerFunc) {
proxyWriter := utils.NewProxyResponseWriter(rw)
proxyWriter := utils.NewProxyResponseWriter(rw, logger.Logger)
next(proxyWriter, req)
}

Expand Down Expand Up @@ -196,4 +197,36 @@ var _ = Describe("AccessLog", func() {
})
})

Context("when the client disconnects during response streaming", func() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new test only covers one branch of this condition:

if alr.RouterError == "" && proxyWriter.WriteError() != nil {
    alr.RouterError = utils.ConnectionCloseDuringStreamingErrMsg
}

Could you also add a test for the case where both an existing RouterError is set on the response and a write error occurred?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented a new test when RouterError exists on the response header, and we do not want to overwrite it.

BeforeEach(func() {
resp = &failingResponseWriter{ResponseRecorder: httptest.NewRecorder()}
handler = negroni.New()
handler.Use(handlers.NewRequestInfo())
handler.Use(handlers.NewProxyWriter(logger.Logger))
handler.Use(handlers.NewAccessLog(accessLogger, extraHeadersToLog, nil, logger.Logger))
handler.UseFunc(func(rw http.ResponseWriter, req *http.Request, next http.HandlerFunc) {
rw.WriteHeader(http.StatusOK)
rw.Write([]byte("partial streaming data"))
nextCalled = true
reqChan <- req
panic(http.ErrAbortHandler)
})
})

It("writes the access log and sets RouterError to ConnectionCloseDuringStreamingErrMsg", func() {
Expect(func() { handler.ServeHTTP(resp, req) }).To(Panic())
Expect(accessLogger.LogCallCount()).To(Equal(1))
alr := accessLogger.LogArgsForCall(0)
Expect(alr.RouterError).To(Equal(utils.ConnectionCloseDuringStreamingErrMsg))
})
})

})

type failingResponseWriter struct {
*httptest.ResponseRecorder
}

func (f *failingResponseWriter) Write(_ []byte) (int, error) {
return 0, errors.New("connection reset by peer")
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func (p *proxyWriterHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request,
log.Panic(p.logger, "request-info-err", log.ErrAttr(err))
return
}
proxyWriter := utils.NewProxyResponseWriter(rw)
logger := p.logger.With("vcap_request_id", r.Header.Get(VcapRequestIdHeader))
proxyWriter := utils.NewProxyResponseWriter(rw, logger)
reqInfo.ProxyResponseWriter = proxyWriter
next(proxyWriter, r)
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var _ = Describe("ProxyWriter", func() {
var rw http.ResponseWriter
Eventually(respChan).Should(Receive(&rw))
Expect(rw).ToNot(BeNil())
Expect(rw).To(BeAssignableToTypeOf(utils.NewProxyResponseWriter(resp)))
Expect(rw).To(BeAssignableToTypeOf(utils.NewProxyResponseWriter(resp, logger.Logger)))
Expect(nextCalled).To(BeTrue(), "Expected the next handler to be called.")
})

Expand Down
2 changes: 1 addition & 1 deletion src/code.cloudfoundry.org/gorouter/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ func NewProxy(
n := negroni.New()
n.Use(handlers.NewPanicCheck(p.health, logger))
n.Use(handlers.NewRequestInfo())
n.Use(handlers.NewProxyWriter(logger))
n.Use(zipkinHandler)
n.Use(w3cHandler)
n.Use(handlers.NewVcapRequestIdHeader(logger))
n.Use(handlers.NewProxyWriter(logger))
if cfg.SendHttpStartStopServerEvent {
n.Use(handlers.NewHTTPStartStop(dropsonde.DefaultEmitter, logger))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var _ = Describe("Proxy Unit tests", func() {
r.Register(route.Uri("some-app"), &route.Endpoint{Stats: route.NewStats()})

responseRecorder = &ResponseRecorderWithFullDuplex{httptest.NewRecorder(), nil, 0}
resp = utils.NewProxyResponseWriter(responseRecorder)
resp = utils.NewProxyResponseWriter(responseRecorder, logger.Logger)
})

Context("when backend fails to respond", func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/x509"
"errors"
"fmt"
"log/slog"
"net"
"net/http/httptest"

Expand Down Expand Up @@ -55,7 +56,7 @@ var _ = Describe("HandleError", func() {
},
}
responseRecorder = httptest.NewRecorder()
responseWriter = utils.NewProxyResponseWriter(responseRecorder)
responseWriter = utils.NewProxyResponseWriter(responseRecorder, slog.Default())
})

It("sets a header to describe the endpoint_failure", func() {
Expand Down Expand Up @@ -88,7 +89,7 @@ var _ = Describe("HandleError", func() {
})

It("calls the handleError callback if it exists", func() {
firstResponseWriter := utils.NewProxyResponseWriter(httptest.NewRecorder())
firstResponseWriter := utils.NewProxyResponseWriter(httptest.NewRecorder(), slog.Default())
errorHandler.HandleError(firstResponseWriter, errors.New("i'm a teapot"))
Expect(errorHandled).To(BeFalse())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ var _ = Describe("ProxyRoundTripper", func() {
})
numEndpoints = 1
resp = httptest.NewRecorder()
proxyWriter := utils.NewProxyResponseWriter(resp)
proxyWriter := utils.NewProxyResponseWriter(resp, logger.Logger)
reqBody = new(testBody)
req = test_util.NewRequest("GET", "myapp.com", "/", reqBody)
req.URL.Scheme = "http"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"bufio"
"errors"
"log/slog"
"net"
"net/http"
)
Expand All @@ -18,23 +19,30 @@ type ProxyResponseWriter interface {
SetStatus(status int)
Size() int
AddHeaderRewriter(HeaderRewriter)
WriteError() error
}

const ConnectionCloseDuringStreamingErrMsg = "client-conn-closed-during-response-streaming"

type proxyResponseWriter struct {
w http.ResponseWriter
status int
size int

logger *slog.Logger
flusher http.Flusher
done bool

writeErr error

headerRewriters []HeaderRewriter
}

func NewProxyResponseWriter(w http.ResponseWriter) *proxyResponseWriter {
func NewProxyResponseWriter(w http.ResponseWriter, logger *slog.Logger) *proxyResponseWriter {
proxyWriter := &proxyResponseWriter{
w: w,
flusher: w.(http.Flusher),
logger: logger,
}

return proxyWriter
Expand All @@ -61,6 +69,14 @@ func (p *proxyResponseWriter) Write(b []byte) (int, error) {
p.WriteHeader(http.StatusOK)
}
size, err := p.w.Write(b)
if err != nil && p.writeErr == nil {
p.writeErr = err
p.logger.Error(ConnectionCloseDuringStreamingErrMsg,
slog.String("error", err.Error()),
slog.Int("bytes_written", size),
slog.Int("total_size", p.size),
slog.Int("status", p.status))
}
p.size += size
return size, err
}
Expand Down Expand Up @@ -118,3 +134,7 @@ func (p *proxyResponseWriter) Unwrap() http.ResponseWriter {
func (p *proxyResponseWriter) AddHeaderRewriter(r HeaderRewriter) {
p.headerRewriters = append(p.headerRewriters, r)
}

func (p *proxyResponseWriter) WriteError() error {
return p.writeErr
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"net/http"

"code.cloudfoundry.org/gorouter/test_util"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
Expand Down Expand Up @@ -66,13 +67,15 @@ func (f *fakeHeaderRewriter) RewriteHeader(h http.Header) {

var _ = Describe("ProxyWriter", func() {
var (
fake *fakeResponseWriter
proxy *proxyResponseWriter
fake *fakeResponseWriter
proxy *proxyResponseWriter
logger *test_util.TestLogger
)

BeforeEach(func() {
logger = test_util.NewTestLogger("test")
fake = newFakeResponseWriter()
proxy = NewProxyResponseWriter(fake)
proxy = NewProxyResponseWriter(fake, logger.Logger)
})

It("delegates the call to Header", func() {
Expand All @@ -89,7 +92,7 @@ var _ = Describe("ProxyWriter", func() {
fake := &fakeHijackerResponseWriter{
fakeResponseWriter: *newFakeResponseWriter(),
}
proxy = NewProxyResponseWriter(fake)
proxy = NewProxyResponseWriter(fake, logger.Logger)
proxy.Hijack()
Expect(fake.hijackCalled).To(BeTrue())
})
Expand Down Expand Up @@ -199,4 +202,40 @@ var _ = Describe("ProxyWriter", func() {
Expect(responseWriter).To(Equal(fake))
})
})

Describe("WriteError", func() {
It("returns nil when no write error has occurred", func() {
proxy.Write([]byte("foo"))
Expect(proxy.WriteError()).To(BeNil())
})

It("returns the first write error that occurred", func() {
fakeWithError := &fakeResponseWriterWithError{
fakeResponseWriter: *newFakeResponseWriter(),
writeError: errors.New("connection reset by peer"),
}
proxyWithError := NewProxyResponseWriter(fakeWithError, logger.Logger)

proxyWithError.Write([]byte("data1"))
Expect(proxyWithError.WriteError()).To(MatchError("connection reset by peer"))

// Subsequent writes should preserve the first error
fakeWithError.writeError = errors.New("second error")
proxyWithError.Write([]byte("data2"))
Expect(proxyWithError.WriteError()).To(MatchError("connection reset by peer"))
})
})
})

type fakeResponseWriterWithError struct {
fakeResponseWriter
writeError error
}

func (f *fakeResponseWriterWithError) Write(b []byte) (int, error) {
f.writeCalled = true
if f.writeError != nil {
return len(b) / 2, f.writeError
}
return len(b), nil
}
Loading