Skip to content

Commit

Permalink
add coders pull
Browse files Browse the repository at this point in the history
  • Loading branch information
massimo-ua committed Jan 16, 2025
1 parent 4866761 commit dace026
Show file tree
Hide file tree
Showing 6 changed files with 1,840 additions and 1,579 deletions.
239 changes: 230 additions & 9 deletions pkg/ccl/changefeedccl/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
stdgzip "compress/gzip"
"io"
"strings"
"sync"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
Expand All @@ -26,6 +27,121 @@ var useFastGzip = settings.RegisterBoolSetting(
),
settings.WithPublic)

type poolableEncoder interface {
io.Writer
io.Closer
Reset(w io.Writer)
RefPool(p *codersPool)
}

type poolableDecoder interface {
io.Reader
io.Closer
Reset(r io.Reader)
RefPool(p *codersPool)
}

type gzipEncoder struct {
*stdgzip.Writer
pool *codersPool
}

func (e *gzipEncoder) Close() error {
if err := e.Writer.Close(); err != nil {
return err
}
if e.pool != nil {
e.pool.gzipEncoders.Put(e)
}
return nil
}

func (e *gzipEncoder) RefPool(p *codersPool) {
e.pool = p
}

type zstdEncoder struct {
*zstd.Encoder
pool *codersPool
}

func (e *zstdEncoder) Close() error {
if err := e.Encoder.Close(); err != nil {
return err
}
if e.pool != nil {
e.pool.zstdEncoders.Put(e)
}
return nil
}

func (e *zstdEncoder) RefPool(p *codersPool) {
e.pool = p
}

type pgzipEncoder struct {
*pgzip.Writer
pool *codersPool
}

func (e *pgzipEncoder) Close() error {
if err := e.Writer.Close(); err != nil {
return err
}
if e.pool != nil {
e.pool.gzipEncoders.Put(e)
}
return nil
}

func (e *pgzipEncoder) RefPool(p *codersPool) {
e.pool = p
}

type zstdDecoder struct {
*zstd.Decoder
pool *codersPool
}

func (d *zstdDecoder) Close() error {
d.Decoder.Close()

if d.pool != nil {
d.pool.zstdDecoders.Put(d)
}
return nil
}

func (d *zstdDecoder) RefPool(p *codersPool) {
d.pool = p
}

type pgzipDecoder struct {
*pgzip.Reader
pool *codersPool
}

func (d *pgzipDecoder) Close() error {
if err := d.Reader.Close(); err != nil {
return err
}
if d.pool != nil {
d.pool.gzipDecoders.Put(d)
}
return nil
}

func (d *pgzipDecoder) RefPool(p *codersPool) {
d.pool = p
}

type codersPool struct {
gzipEncoders sync.Pool
zstdEncoders sync.Pool
gzipDecoders sync.Pool
zstdDecoders sync.Pool
}

type compressionAlgo string

const sinkCompressionGzip compressionAlgo = "gzip"
Expand All @@ -45,11 +161,25 @@ func newCompressionCodec(
switch algo {
case sinkCompressionGzip:
if useFastGzip.Get(sv) {
return pgzip.NewWriterLevel(dest, pgzip.DefaultCompression)
return &pgzipEncoder{
pgzip.NewWriter(dest), nil,
}, nil
}

coder, err := stdgzip.NewWriterLevel(dest, stdgzip.DefaultCompression)

if err != nil {
return nil, err
}
return stdgzip.NewWriterLevel(dest, stdgzip.DefaultCompression)

return &gzipEncoder{coder, nil}, nil
case sinkCompressionZstd:
return zstd.NewWriter(dest, zstd.WithEncoderLevel(zstd.SpeedFastest))
coder, err := zstd.NewWriter(dest, zstd.WithEncoderLevel(zstd.SpeedFastest))
if err != nil {
return nil, err
}

return &zstdEncoder{coder, nil}, nil
default:
return nil, errors.AssertionFailedf("unsupported compression algorithm %q", algo)
}
Expand All @@ -60,17 +190,19 @@ func newDecompressionReader(algo compressionAlgo, src io.Reader) (io.ReadCloser,
switch algo {
case sinkCompressionGzip:
// since we are using decompression only for reading error response body, we can use default reader
return pgzip.NewReader(src)
decoder, err := pgzip.NewReader(src)
if err != nil {
return nil, err
}

return &pgzipDecoder{decoder, nil}, nil
case sinkCompressionZstd:
// zstd reader does not implement io.Closer interface, so we need to wrap it
decoder, err := zstd.NewReader(src)
if err != nil {
return nil, err
}
return struct {
io.Reader
io.Closer
}{decoder, io.NopCloser(nil)}, nil

return &zstdDecoder{decoder, nil}, nil
default:
return nil, errors.AssertionFailedf("unsupported compression algorithm %q", algo)
}
Expand All @@ -86,3 +218,92 @@ func compressionFromString(algo string) (_ compressionAlgo, ext string, _ error)
}
return "", "", errors.AssertionFailedf("unsupported compression algorithm %q", algo)
}

func newCodersPool(sv *settings.Values) *codersPool {
return &codersPool{
gzipEncoders: sync.Pool{
New: func() any {
coder, err := newCompressionCodec(sinkCompressionGzip, sv, nil)

if err != nil {
return nil
}

return coder
},
},
zstdEncoders: sync.Pool{
New: func() any {
coder, err := newCompressionCodec(sinkCompressionZstd, sv, nil)

if err != nil {
return nil
}

return coder
},
},
gzipDecoders: sync.Pool{
New: func() any {
decoder, err := newDecompressionReader(sinkCompressionGzip, nil)

if err != nil {
return nil
}

return decoder
},
},
zstdDecoders: sync.Pool{
New: func() any {
decoder, err := newDecompressionReader(sinkCompressionZstd, nil)

if err != nil {
return nil
}

return decoder
},
},
}
}

func (p *codersPool) GetEncoder(algo compressionAlgo, w io.Writer) (io.WriteCloser, error) {
var coder poolableEncoder
switch algo {
case sinkCompressionGzip:
coder = p.gzipEncoders.Get().(poolableEncoder)
case sinkCompressionZstd:
coder = p.zstdEncoders.Get().(poolableEncoder)
default:
return nil, errors.AssertionFailedf("unsupported compression algorithm %q", algo)
}

if coder == nil {
return nil, errors.New("failed to get encoder from the pool")
}

coder.Reset(w)
coder.RefPool(p)
return coder, nil
}

func (p *codersPool) GetDecoder(algo compressionAlgo, r io.Reader) (io.ReadCloser, error) {
var decoder poolableDecoder
switch algo {
case sinkCompressionGzip:
decoder = p.gzipDecoders.Get().(poolableDecoder)
case sinkCompressionZstd:
decoder = p.zstdDecoders.Get().(poolableDecoder)
default:
return nil, errors.AssertionFailedf("unsupported compression algorithm %q", algo)
}

if decoder == nil {
return nil, errors.New("failed to get decoder from the pool")
}

decoder.Reset(r)
decoder.RefPool(p)
return decoder, nil
}
Loading

0 comments on commit dace026

Please sign in to comment.