From ce851d019bb34155e0526671f9bb7b80c366a15f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Fri, 17 May 2024 10:27:21 +0200 Subject: [PATCH] Update prometheus/promhttp/http.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel RĂ¼ger --- prometheus/promhttp/http.go | 124 ++++++++++++++++++++----------- prometheus/promhttp/http_test.go | 61 ++++++++++++--- 2 files changed, 132 insertions(+), 53 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 7517caa48..8aa403fb7 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -55,7 +55,25 @@ const ( processStartTimeHeader = "Process-Start-Time-Unix" ) -var defaultEncodingOffers = []string{"gzip", "zstd"} +type Compression int + +const ( + Identity Compression = iota + Gzip + Zstd +) + +var compressions = [...]string{ + "identity", + "gzip", + "zstd", +} + +func (c Compression) String() string { + return compressions[c] +} + +var defaultCompressionFormats = []Compression{Identity, Gzip, Zstd} var gzipPool = sync.Pool{ New: func() interface{} { @@ -168,46 +186,14 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } else { contentType = expfmt.Negotiate(req.Header) } - header := rsp.Header() - header.Set(contentTypeHeader, string(contentType)) - - w := io.Writer(rsp) - if !opts.DisableCompression { - offers := defaultEncodingOffers - if len(opts.EncodingOffers) > 0 { - offers = opts.EncodingOffers - } - // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. - compression := httputil.NegotiateContentEncoding(req, offers) - switch compression { - case "zstd": - header.Set(contentEncodingHeader, "zstd") - // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. - z, err := zstd.NewWriter(rsp, zstd.WithEncoderLevel(zstd.SpeedFastest)) - if err != nil { - return - } - - z.Reset(w) - defer z.Close() + rsp.Header().Set(contentTypeHeader, string(contentType)) - w = z - case "gzip": - header.Set(contentEncodingHeader, "gzip") - gz := gzipPool.Get().(*gzip.Writer) - defer gzipPool.Put(gz) + w, err := GetWriter(req, rsp, opts.DisableCompression, opts.OfferedCompressions) - gz.Reset(w) - defer gz.Close() - - w = gz - case "identity": - // This means the content is not encoded. - default: - // The content encoding was not implemented yet. - return + if err != nil { + if opts.ErrorLog != nil { + opts.ErrorLog.Println("error setting getting writer", err) } - } enc := expfmt.NewEncoder(w, contentType) @@ -373,12 +359,19 @@ type HandlerOpts struct { // no effect on the HTTP status code because ErrorHandling is set to // ContinueOnError. Registry prometheus.Registerer - // If DisableCompression is true, the handler will never compress the - // response, even if requested by the client. + // DisableCompression disables the response encoding (compression) and + // encoding negotiation. If true, the handler will + // never compress the response, even if requested + // by the client and the OfferedCompressions field is ignored. DisableCompression bool - // If DisableCompression is false, this option will allow to define the - // set of offered encoding algorithms. - EncodingOffers []string + // OfferedCompressions is a set of encodings (compressions) handler will + // try to offer when negotiating with the client. This defaults to zstd, + // gzip and identity. + // NOTE: If handler can't agree on the encodings with the client or + // caller using unsupported or empty encodings in OfferedCompressions, + // handler always fallbacks to no compression (identity), for + // compatibility reasons. In such cases ErrorLog will be used if set. + OfferedCompressions []Compression // The number of concurrent HTTP requests is limited to // MaxRequestsInFlight. Additional requests are responded to with 503 // Service Unavailable and a suitable message in the body. If @@ -426,3 +419,48 @@ func httpError(rsp http.ResponseWriter, err error) { http.StatusInternalServerError, ) } + +func GetWriter(r *http.Request, rsp http.ResponseWriter, disableCompression bool, offeredCompressions []Compression) (io.Writer, error) { + w := io.Writer(rsp) + if !disableCompression { + offers := defaultCompressionFormats + if len(offeredCompressions) > 0 { + offers = offeredCompressions + } + var compressions []string + for _, comp := range offers { + compressions = append(compressions, comp.String()) + } + // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. + compression := httputil.NegotiateContentEncoding(r, compressions) + switch compression { + case "zstd": + rsp.Header().Set(contentEncodingHeader, "zstd") + // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. + z, err := zstd.NewWriter(rsp, zstd.WithEncoderLevel(zstd.SpeedFastest)) + if err != nil { + return nil, err + } + + z.Reset(w) + defer z.Close() + + w = z + case "gzip": + rsp.Header().Set(contentEncodingHeader, "gzip") + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) + + gz.Reset(w) + defer gz.Close() + + w = gz + case "identity": + // This means the content is not compressed. + default: + // The content encoding was not implemented yet. + return w, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", compression, defaultCompressionFormats) + } + } + return w, nil +} diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 3a65e61c5..8a180ff0d 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -17,6 +17,7 @@ import ( "bytes" "errors" "fmt" + "io" "log" "net/http" "net/http/httptest" @@ -332,22 +333,62 @@ func TestHandlerTimeout(t *testing.T) { close(c.Block) // To not leak a goroutine. } -func BenchmarkEncoding(b *testing.B) { +func TestGetWriter(t *testing.T) { + testCases := []struct { + name string + disableCompression bool + offeredCompressions []Compression + req http.Request + rsp http.ResponseWriter + w io.Writer + err error + }{ + { + name: "test without compression", + disableCompression: true, + offeredCompressions: defaultCompressionFormats, + req: http.Request{}, + rsp: http.ResponseWriter{}, + w: io.Writer{}, + err: nil, + }, + { + name: "test with gzip compression requested", + disableCompression: false, + offeredCompressions: defaultCompressionFormats, + req: http.Request{}, + rsp: http.ResponseWriter{}, + w: io.Writer{}, + err: nil, + }, + } + + for _, test := range testCases { + w, err := GetWriter(&test.req, test.rsp, test.disableCompression, test.offeredCompressions) + + if err != test.err { + t.Errorf("got error %v, expected %v", err, test.err) + } + } + +} + +func BenchmarkCompression(b *testing.B) { benchmarks := []struct { - name string - encodingType string + name string + compressionType string }{ { - name: "test with gzip encoding", - encodingType: "gzip", + name: "test with gzip compression", + compressionType: "gzip", }, { - name: "test with zstd encoding", - encodingType: "zstd", + name: "test with zstd compression", + compressionType: "zstd", }, { - name: "test with no encoding", - encodingType: "identity", + name: "test with no compression", + compressionType: "identity", }, } sizes := []struct { @@ -416,7 +457,7 @@ func BenchmarkEncoding(b *testing.B) { for i := 0; i < b.N; i++ { writer := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) - request.Header.Add("Accept-Encoding", benchmark.encodingType) + request.Header.Add("Accept-Encoding", benchmark.compressionType) handler.ServeHTTP(writer, request) } })