# Preemptively close chunks before big messages to isolate them (#1291)
### Changelog
Close chunks when big messages are written to isolate them, leading to
much better read performance when going over small-message channels
since those isolated chunks can be skipped over entirely.

### Docs

None

### Description

A very common situation is to have a few topics with very large messages
(images, point clouds, etc.) and many topics with very small (and
frequent) messages (state, transforms, statuses, etc.). When writing all
those into a single ("merged") mcap file, it naturally leads to chunks
typically containing a series of small messages and ending with one big
message which fills the rest of the chunk and "closes" it. The result of
this is that nearly all chunks are "mixed" (i.e., messages from all
channels appear in each chunk). During playback / reading, some of the
client readers (e.g., the C++ implementation) have logic to skip over
chunks that contain no messages of interest (i.e., none of the topics
being read). That logic is completely thwarted when most chunks are
mixed in this pathological pattern.

So, this PR adds a very simple change. When an incoming message is very
large (larger than the chunk size), instead of writing that message to
the current chunk (mixing with prior smaller messages), it closes the
current chunk and writes the big message to a fresh new chunk (which
will be closed immediately after). The end result is that big-message
channels get isolated into their own series of chunks, enabling
playback readers to skip over them without loading those chunks into
RAM or decompressing them.
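
The decision can be modeled with a couple of standalone helpers (illustrative names, not the actual writer API; the constants mirror the MCAP Message record layout: a 1-byte opcode and an 8-byte length prefix, then 2 + 4 + 8 + 8 bytes of fixed fields plus the payload):

```cpp
#include <cassert>
#include <cstdint>

// Size of a Message record's payload: channel id (2) + sequence (4) +
// log time (8) + publish time (8) + message data.
uint64_t messageRecordSize(uint64_t dataSize) {
  return 2 + 4 + 8 + 8 + dataSize;
}

// Close the current chunk before writing this message if (a) there is a
// non-empty chunk in progress and (b) appending the full record (9 bytes
// of opcode + length prefix, then the payload) would reach the target
// chunk size. A message larger than the chunk size thus always lands in
// a fresh chunk of its own.
bool shouldCloseCurrentChunk(uint64_t currentChunkSize, uint64_t dataSize,
                             uint64_t targetChunkSize) {
  return currentChunkSize != 0 &&
         9 + messageRecordSize(dataSize) + currentChunkSize >= targetChunkSize;
}
```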

In theory (though I have not observed this), compression could also
perform better on the isolated channels, where it matters most. All I
have observed (see below) is that incompressible data (such as protobuf
wire format) no longer gets uselessly compressed along with the larger
"bytes" data (images, point clouds, etc.). This does not affect the
resulting file size but speeds up reading. (BTW, it would be nice to be
able to configure the minimum compression size, which is currently
hardcoded to 1 KB in the writer.)

The only slight drawback that I can see is getting some smaller /
partial chunks. There might be some rather unrealistic pathological
cases (such as strictly interleaving big and small messages) where this
could lead to a noticeable difference in file size (a bigger chunk
index). If you want a writer option to disable this behavior, that's
fine, but I believe it should be the default.

I ran some tests on simple mcap files with mixed channels; the results
are shown below.

<table><tr><th>Before</th><th>After</th></tr><tr><td>

Running `mcap-cli info` on the recorded mcap file (2.7 GB):

```
library:   libmcap 1.4.0                                               
profile:                                                               
messages:  31870                                                       
duration:  3m50.316691925s                                             
start:     2024-08-08T13:03:44.098069201-04:00 (1723136624.098069201)  
end:       2024-08-08T13:07:34.414761126-04:00 (1723136854.414761126)  
compression:
    zstd: [6724/6725 chunks] [3.30 GiB/2.52 GiB (23.58%)] [11.21 MiB/sec] 
    : [1/6725 chunks] [516.00 B/516.00 B (0.00%)] [2.00 B/sec] 
channels:
    (1) [redacted_topic_0]           2121 msgs (9.21 Hz)    : foxglove.Grid [protobuf]             
    (2) [redacted_topic_1]           2121 msgs (9.21 Hz)    : [redacted] [protobuf]     
    (3) [redacted_topic_2]           2297 msgs (9.97 Hz)    : foxglove.PointCloud [protobuf]       
    (4) [redacted_topic_3]           2297 msgs (9.97 Hz)    : [redacted] [protobuf]     
    (5) [redacted_topic_4]          11517 msgs (50.01 Hz)   : foxglove.FrameTransforms [protobuf]  
    (6) [redacted_topic_5]          11517 msgs (50.01 Hz)   : [redacted] [protobuf]     
attachments: 0
metadata: 1
```

Running `time mcap-cli cat --topic [redacted_topic_4]` on the recorded
mcap file (2.7 GB):

```
real  0m3.063s
user  0m4.179s
sys   0m0.662s
```

</td><td>

Running `mcap-cli info` on the same recorded mcap file (2.7 GB):

```
library:   libmcap 1.4.0                                               
profile:                                                               
messages:  31870                                                       
duration:  3m50.316691925s                                             
start:     2024-08-08T13:03:44.098069201-04:00 (1723136624.098069201)  
end:       2024-08-08T13:07:34.414761126-04:00 (1723136854.414761126)  
compression:
    : [2121/8839 chunks] [276.02 KiB/276.02 KiB (0.00%)] [1.20 KiB/sec] 
    zstd: [6718/8839 chunks] [3.30 GiB/2.52 GiB (23.58%)] [11.21 MiB/sec] 
channels:
    (1) [redacted_topic_2]           2297 msgs (9.97 Hz)    : foxglove.PointCloud [protobuf]       
    (2) [redacted_topic_3]           2297 msgs (9.97 Hz)    : [redacted] [protobuf]     
    (3) [redacted_topic_4]          11517 msgs (50.01 Hz)   : foxglove.FrameTransforms [protobuf]  
    (4) [redacted_topic_5]          11517 msgs (50.01 Hz)   : [redacted] [protobuf]     
    (5) [redacted_topic_0]           2121 msgs (9.21 Hz)    : foxglove.Grid [protobuf]             
    (6) [redacted_topic_1]           2121 msgs (9.21 Hz)    : [redacted] [protobuf]     
attachments: 0
metadata: 1
```

Running `time mcap-cli cat --topic [redacted_topic_4]` on the recorded
mcap file (2.7 GB):

```
real  0m0.070s
user  0m0.059s
sys   0m0.024s
```

</td></tr></table>
mikael-s-persson authored Jan 8, 2025
1 parent 9266303 commit b38d963
Showing 3 changed files with 30 additions and 12 deletions.
11 changes: 7 additions & 4 deletions cpp/mcap/include/mcap/writer.hpp
@@ -56,10 +56,12 @@ struct MCAP_PUBLIC McapWriterOptions {
bool noSummary = false;
/**
* @brief Target uncompressed Chunk payload size in bytes. Once a Chunk's
* uncompressed data meets or exceeds this size, the Chunk will be compressed
* (if compression is enabled) and written to disk. Note that smaller Chunks
* may be written, such as the last Chunk in the Data section. This option is
* ignored if `noChunking=true`.
* uncompressed data is about to exceed this size, the Chunk will be
* compressed (if enabled) and written to disk. Note that this is a 'soft'
* ceiling as some Chunks could exceed this size due to either indexing
* data or when a single message is larger than `chunkSize`, in which case,
* the Chunk will contain only this one large message.
* This option is ignored if `noChunking=true`.
*/
uint64_t chunkSize = DefaultChunkSize;
/**
@@ -432,6 +434,7 @@ class MCAP_PUBLIC McapWriter final {
static uint64_t write(IWritable& output, const Footer& footer, bool crcEnabled);
static uint64_t write(IWritable& output, const Schema& schema);
static uint64_t write(IWritable& output, const Channel& channel);
static uint64_t getRecordSize(const Message& message);
static uint64_t write(IWritable& output, const Message& message);
static uint64_t write(IWritable& output, const Attachment& attachment);
static uint64_t write(IWritable& output, const Metadata& metadata);
19 changes: 16 additions & 3 deletions cpp/mcap/include/mcap/writer.inl
@@ -547,6 +547,16 @@ Status McapWriter::write(const Message& message) {
++statistics_.channelCount;
}

// Before writing a message that would overflow the current chunk, close it.
auto* chunkWriter = getChunkWriter();
if (chunkWriter != nullptr && /* Chunked? */
uncompressedSize_ != 0 && /* Current chunk is not empty/new? */
9 + getRecordSize(message) + uncompressedSize_ >= chunkSize_ /* Overflowing? */) {
auto& fileOutput = *output_;
writeChunk(fileOutput, *chunkWriter);
}

// For the chunk-local message index.
const uint64_t messageOffset = uncompressedSize_;

// Write the message
@@ -565,8 +575,7 @@ Status McapWriter::write(const Message& message) {
channelMessageCounts[message.channelId] += 1;
}

auto* chunkWriter = getChunkWriter();
if (chunkWriter) {
if (chunkWriter != nullptr) {
if (!options_.noMessageIndex) {
// Update the message index
auto& messageIndex = currentMessageIndex_[message.channelId];
@@ -875,8 +884,12 @@ uint64_t McapWriter::write(IWritable& output, const Channel& channel) {
return 9 + recordSize;
}

uint64_t McapWriter::getRecordSize(const Message& message) {
return 2 + 4 + 8 + 8 + message.dataSize;
}

uint64_t McapWriter::write(IWritable& output, const Message& message) {
const uint64_t recordSize = 2 + 4 + 8 + 8 + message.dataSize;
const uint64_t recordSize = getRecordSize(message);

write(output, OpCode::Message);
write(output, recordSize);
12 changes: 7 additions & 5 deletions cpp/test/unit_tests.cpp
@@ -530,7 +530,8 @@ TEST_CASE("McapReader::readMessages()", "[reader]") {

/**
* @brief ensures that message index records are only written for the channels present in the
* previous chunk. This test writes two chunks with one message each in separate channels.
* previous chunk. This test writes two chunks with one message each in separate channels, with
* the second message being large enough to guarantee the current chunk will be written out.
* If the writer is working correctly, there will be one message index record after each chunk,
* one for each message.
*/
@@ -539,7 +540,7 @@ TEST_CASE("Message index records", "[writer]") {

mcap::McapWriter writer;
mcap::McapWriterOptions opts("test");
opts.chunkSize = 100;
opts.chunkSize = 200;
opts.compression = mcap::Compression::None;

writer.open(buffer, opts);
@@ -552,9 +553,10 @@
writer.addChannel(channel2);

mcap::Message msg;
std::vector<std::byte> data(150);
WriteMsg(writer, channel1.id, 0, 100, 100, data);
WriteMsg(writer, channel2.id, 0, 200, 200, data);
// First message should not fill first chunk.
WriteMsg(writer, channel1.id, 0, 100, 100, std::vector<std::byte>{20});
// Second message fills current chunk and triggers a new one.
WriteMsg(writer, channel2.id, 0, 200, 200, std::vector<std::byte>{400});

writer.close();

