Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazily allocate context for chunk compression #1249

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go/go.work
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
go 1.22.5
go 1.23

toolchain go1.23.1

use (
./cli/mcap
Expand Down
162 changes: 160 additions & 2 deletions go/go.work.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go/mcap/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
15 changes: 13 additions & 2 deletions go/mcap/go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4 changes: 2 additions & 2 deletions go/mcap/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ func TestReadingMessageOrderWithOverlappingChunks(t *testing.T) {
}
var now uint64 = 100
addMsg(now)
for writer.compressedWriter.Size() != 0 {
for writer.uncompressedSize() != 0 {
now += 10
addMsg(now)
}
Expand All @@ -782,7 +782,7 @@ func TestReadingMessageOrderWithOverlappingChunks(t *testing.T) {
now -= 55

addMsg(now)
for writer.compressedWriter.Size() != 0 {
for writer.uncompressedSize() != 0 {
now += 10
addMsg(now)
}
Expand Down
182 changes: 104 additions & 78 deletions go/mcap/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ type Writer struct {
// MetadataIndexes created over the course of the recording.
MetadataIndexes []*MetadataIndex

channelIDs []uint16
schemaIDs []uint16
channels map[uint16]*Channel
schemas map[uint16]*Schema
messageIndexes map[uint16]*MessageIndex
w *writeSizer
buf []byte
msg []byte
uncompressed *bytes.Buffer
compressed *bytes.Buffer
compressedWriter *countingCRCWriter
channelIDs []uint16
schemaIDs []uint16
channels map[uint16]*Channel
schemas map[uint16]*Schema
messageIndexes map[uint16]*MessageIndex
w *writeSizer
buf []byte
msg []byte
uncompressed bytes.Buffer

currentChunkStartTime uint64
currentChunkEndTime uint64
Expand Down Expand Up @@ -110,7 +108,7 @@ func (w *Writer) WriteSchema(s *Schema) (err error) {
offset += putPrefixedString(w.msg[offset:], s.Encoding)
offset += putPrefixedBytes(w.msg[offset:], s.Data)
if w.opts.Chunked && !w.closed {
_, err = w.writeRecord(w.compressedWriter, OpSchema, w.msg[:offset])
_, err = w.writeRecord(w.uncompressedWriter(), OpSchema, w.msg[:offset])
} else {
_, err = w.writeRecord(w.w, OpSchema, w.msg[:offset])
}
Expand Down Expand Up @@ -149,7 +147,7 @@ func (w *Writer) WriteChannel(c *Channel) error {
offset += copy(w.msg[offset:], userdata)
var err error
if w.opts.Chunked && !w.closed {
_, err = w.writeRecord(w.compressedWriter, OpChannel, w.msg[:offset])
_, err = w.writeRecord(w.uncompressedWriter(), OpChannel, w.msg[:offset])
if err != nil {
return err
}
Expand Down Expand Up @@ -193,8 +191,8 @@ func (w *Writer) WriteMessage(m *Message) error {
}
w.messageIndexes[m.ChannelID] = idx
}
idx.Add(m.LogTime, uint64(w.compressedWriter.Size()))
_, err := w.writeRecord(w.compressedWriter, OpMessage, w.msg[:offset])
idx.Add(m.LogTime, uint64(w.uncompressedSize()))
_, err := w.writeRecord(w.uncompressedWriter(), OpMessage, w.msg[:offset])
if err != nil {
return err
}
Expand All @@ -205,7 +203,7 @@ func (w *Writer) WriteMessage(m *Message) error {
if m.LogTime < w.currentChunkStartTime {
w.currentChunkStartTime = m.LogTime
}
if w.compressedWriter.Size() > w.opts.ChunkSize {
if int64(w.uncompressedSize()) > w.opts.ChunkSize {
err := w.flushActiveChunk()
if err != nil {
return err
Expand Down Expand Up @@ -414,21 +412,34 @@ func (w *Writer) WriteDataEnd(e *DataEnd) error {
return err
}

func (w *Writer) uncompressedSize() int {
return w.uncompressed.Len()
}

func (w *Writer) flushActiveChunk() error {
if w.compressedWriter.Size() == 0 {
if w.uncompressedSize() == 0 {
return nil
}

err := w.compressedWriter.Close()
var buf bytes.Buffer
cw, err := w.newCompressedWriter(&buf)
if err != nil {
return err
return fmt.Errorf("creating compressed writer: %w", err)
}
_, err = cw.Write(w.uncompressed.Bytes())
if err != nil {
return fmt.Errorf("writing uncompressed data to compressor: %w", err)
}
err = cw.Close()
if err != nil {
return fmt.Errorf("finalizing compressor: %w", err)
}
crc := w.compressedWriter.CRC()
compressedlen := w.compressed.Len()
uncompressedlen := w.compressedWriter.Size()

compressedLen := buf.Len()
uncompressedLen := w.uncompressedSize()
// the "top fields" are all fields of the chunk record except for the compressed records.
topFieldsLen := 8 + 8 + 8 + 4 + 4 + len(w.opts.Compression) + 8
msglen := topFieldsLen + compressedlen
msgLen := topFieldsLen + compressedLen
chunkStartOffset := w.w.Size()
var start, end uint64
if w.currentChunkMessageCount != 0 {
Expand All @@ -446,25 +457,22 @@ func (w *Writer) flushActiveChunk() error {
return err
}

offset += putUint64(w.msg[offset:], uint64(msglen))
offset += putUint64(w.msg[offset:], uint64(msgLen))
offset += putUint64(w.msg[offset:], start)
offset += putUint64(w.msg[offset:], end)
offset += putUint64(w.msg[offset:], uint64(uncompressedlen))
offset += putUint32(w.msg[offset:], crc)
offset += putUint64(w.msg[offset:], uint64(uncompressedLen))
offset += putUint32(w.msg[offset:], cw.CRC())
offset += putPrefixedString(w.msg[offset:], string(w.opts.Compression))
offset += putUint64(w.msg[offset:], uint64(w.compressed.Len()))
offset += putUint64(w.msg[offset:], uint64(buf.Len()))
_, err = w.w.Write(w.msg[:offset])
if err != nil {
return err
}
_, err = w.w.Write(w.compressed.Bytes())
_, err = w.w.Write(buf.Bytes())
if err != nil {
return err
}
w.compressed.Reset()
w.compressedWriter.Reset(w.compressed)
w.compressedWriter.ResetSize()
w.compressedWriter.ResetCRC()
w.uncompressed.Reset()
chunkEndOffset := w.w.Size()

// message indexes
Expand Down Expand Up @@ -492,8 +500,8 @@ func (w *Writer) flushActiveChunk() error {
MessageIndexOffsets: messageIndexOffsets,
MessageIndexLength: messageIndexLength,
Compression: w.opts.Compression,
CompressedSize: uint64(compressedlen),
UncompressedSize: uint64(uncompressedlen),
CompressedSize: uint64(compressedLen),
UncompressedSize: uint64(uncompressedLen),
})
for _, idx := range w.messageIndexes {
idx.Reset()
Expand Down Expand Up @@ -744,13 +752,16 @@ const (
)

type CustomCompressor interface {
Compressor() ResettableWriteCloser
NewCompressor() (ResettableWriteCloser, error)
Compression() CompressionFormat
}

// NewCustomCompressor returns a structure that may be supplied to writer
// options as a custom chunk compressor.
func NewCustomCompressor(compression CompressionFormat, compressor ResettableWriteCloser) CustomCompressor {
func NewCustomCompressor(
compression CompressionFormat,
compressor func() (ResettableWriteCloser, error),
) CustomCompressor {
return &customCompressor{
compression: compression,
compressor: compressor,
Expand All @@ -759,11 +770,11 @@ func NewCustomCompressor(compression CompressionFormat, compressor ResettableWri

type customCompressor struct {
compression CompressionFormat
compressor ResettableWriteCloser
compressor func() (ResettableWriteCloser, error)
}

func (c *customCompressor) Compressor() ResettableWriteCloser {
return c.compressor
func (c *customCompressor) NewCompressor() (ResettableWriteCloser, error) {
return c.compressor()
}

func (c *customCompressor) Compression() CompressionFormat {
Expand Down Expand Up @@ -856,57 +867,67 @@ func encoderLevelFromZstd(level CompressionLevel) zstd.EncoderLevel {
}
}

func newCompressor(compressed *bytes.Buffer, opts *WriterOptions) (ResettableWriteCloser, error) {
switch {
case opts.Compressor != nil: // must be top
// override the compression option. We can't check for a mismatch here
// because "none compression" is an empty string.
opts.Compression = opts.Compressor.Compression()
if opts.Compressor.Compression() == "" {
return nil, fmt.Errorf("custom compressor requires compression format")
}
c, err := opts.Compressor.NewCompressor()
if err != nil {
return nil, err
}
c.Reset(compressed)
return c, nil
case opts.Compression == CompressionZSTD:
level := encoderLevelFromZstd(opts.CompressionLevel)
return zstd.NewWriter(compressed, zstd.WithEncoderLevel(level))
case opts.Compression == CompressionLZ4:
level := encoderLevelFromLZ4(opts.CompressionLevel)
lzw := lz4.NewWriter(compressed)
_ = lzw.Apply(lz4.CompressionLevelOption(level))
return lzw, nil
case opts.Compression == CompressionNone:
return newCountingCRCWriter(bufCloser{compressed}, opts.IncludeCRC), nil
default:
return nil, fmt.Errorf("unsupported compression: %v", opts.Compression)
}
}

func (w *Writer) newCompressedWriter(compressed *bytes.Buffer) (*countingCRCWriter, error) {
opts := w.opts
if !opts.Chunked {
panic("this should never get called when chunking isn't enabled")
}
compressor, err := newCompressor(compressed, opts)
if err != nil {
return nil, err
}
return newCountingCRCWriter(compressor, opts.IncludeCRC), nil
}

// NewWriter returns a new MCAP writer.
func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) {
func NewWriter(w io.Writer, opts *WriterOptions) (ret *Writer, err error) {
writer := newWriteSizer(w, opts.IncludeCRC)
if !opts.SkipMagic {
if _, err := writer.Write(Magic); err != nil {
return nil, err
}
}
compressed := bytes.Buffer{}
var compressedWriter *countingCRCWriter
if opts.Chunked {
switch {
case opts.Compressor != nil: // must be top
// override the compression option. We can't check for a mismatch here
// because "none compression" is an empty string.
opts.Compression = opts.Compressor.Compression()
if opts.Compressor.Compression() == "" {
return nil, fmt.Errorf("custom compressor requires compression format")
}
opts.Compressor.Compressor().Reset(&compressed)
compressedWriter = newCountingCRCWriter(opts.Compressor.Compressor(), opts.IncludeCRC)
case opts.Compression == CompressionZSTD:
level := encoderLevelFromZstd(opts.CompressionLevel)
zw, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(level))
if err != nil {
return nil, err
}
compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC)
case opts.Compression == CompressionLZ4:
level := encoderLevelFromLZ4(opts.CompressionLevel)
lzw := lz4.NewWriter(&compressed)
_ = lzw.Apply(lz4.CompressionLevelOption(level))
compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC)
case opts.Compression == CompressionNone:
compressedWriter = newCountingCRCWriter(bufCloser{&compressed}, opts.IncludeCRC)
default:
return nil, fmt.Errorf("unsupported compression")
}
if opts.ChunkSize == 0 {
opts.ChunkSize = 1024 * 1024
}
if opts.ChunkSize == 0 {
opts.ChunkSize = 1024 * 1024
}
return &Writer{
// Should we check here that compression options are valid? For now I haven't added it as it's
// going to come up pretty quickly while writing anyway, and introduces some extra complexity.
ret = &Writer{
w: writer,
buf: make([]byte, 32),
channels: make(map[uint16]*Channel),
schemas: make(map[uint16]*Schema),
messageIndexes: make(map[uint16]*MessageIndex),
uncompressed: &bytes.Buffer{},
compressed: &compressed,
compressedWriter: compressedWriter,
currentChunkStartTime: math.MaxUint64,
currentChunkEndTime: 0,
currentChunkMessageCount: 0,
Expand All @@ -916,5 +937,10 @@ func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) {
MessageEndTime: 0,
},
opts: opts,
}, nil
}
return ret, nil
}

func (w *Writer) uncompressedWriter() io.Writer {
return &w.uncompressed
}
16 changes: 9 additions & 7 deletions go/mcap/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,16 +703,18 @@ func assertReadable(t *testing.T, rs io.ReadSeeker) {
func TestBYOCompressor(t *testing.T) {
buf := &bytes.Buffer{}
// example - custom lz4 settings
lzw := lz4.NewWriter(nil)
blockCount := 0
require.NoError(t, lzw.Apply(lz4.OnBlockDoneOption(func(int) {
blockCount++
})))

writer, err := NewWriter(buf, &WriterOptions{
Chunked: true,
ChunkSize: 1024,
Compressor: NewCustomCompressor("lz4", lzw),
Chunked: true,
ChunkSize: 1024,
Compressor: NewCustomCompressor("lz4", func() (ResettableWriteCloser, error) {
lzw := lz4.NewWriter(nil)
require.NoError(t, lzw.Apply(lz4.OnBlockDoneOption(func(int) {
blockCount++
})))
return lzw, nil
}),
})
require.NoError(t, err)

Expand Down
Loading