Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Parse incomplete chunks 3/9] Add ChunkInfo metadata to DS buffer to record (un)filled gap locations #1788

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ namespace px {
namespace stirling {

void DataStream::AddData(std::unique_ptr<SocketDataEvent> event) {
// Note that msg.size() includes filler \0 bytes under certain circumstances. See socket_trace.hpp
// for details.
LOG_IF(WARNING, event->attr.msg_size > event->msg.size() && !event->msg.empty())
<< absl::Substitute("Message truncated, original size: $0, transferred size: $1",
event->attr.msg_size, event->msg.size());

data_buffer_.Add(event->attr.pos, event->msg, event->attr.timestamp_ns);
data_buffer_.Add(event->attr.pos, event->msg, event->attr.timestamp_ns,
event->attr.incomplete_chunk, event->attr.bytes_missed);

has_new_events_ = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ typename TMapType::const_iterator MapLE(const TMapType& map, size_t key) {
} // namespace

void AlwaysContiguousDataStreamBufferImpl::Reset() {
VLOG(1) << "Resetting AlwaysContiguousDataStreamBufferImpl.";
buffer_.clear();
chunks_.clear();
timestamps_.clear();
Expand All @@ -57,7 +58,7 @@ bool AlwaysContiguousDataStreamBufferImpl::CheckOverlap(size_t pos, size_t size)
if (r_iter != chunks_.end()) {
// There is a chunk to the right. Check for overlap.
size_t r_pos = r_iter->first;
size_t r_size = r_iter->second;
size_t r_size = r_iter->second.size;

right_overlap = (pos + size > r_pos);
ECHECK(!right_overlap) << absl::Substitute(
Expand All @@ -73,7 +74,7 @@ bool AlwaysContiguousDataStreamBufferImpl::CheckOverlap(size_t pos, size_t size)
--l_iter;
// There is a chunk to the left. Check for overlap.
size_t l_pos = l_iter->first;
size_t l_size = l_iter->second;
size_t l_size = l_iter->second.size;

left_overlap = (pos < l_pos + l_size);
ECHECK(!left_overlap) << absl::Substitute(
Expand All @@ -85,49 +86,169 @@ bool AlwaysContiguousDataStreamBufferImpl::CheckOverlap(size_t pos, size_t size)
return left_overlap || right_overlap;
}

void AlwaysContiguousDataStreamBufferImpl::AddNewChunk(size_t pos, size_t size) {
void AlwaysContiguousDataStreamBufferImpl::AddNewChunk(size_t pos, size_t size,
chunk_t incomplete_chunk, size_t gap_size) {
// Look for the chunks to the left and right of this new chunk.
auto r_iter = chunks_.lower_bound(pos);
auto l_iter = r_iter;
if (l_iter != chunks_.begin()) {
--l_iter;
}

// Does this chunk fuse with the chunk on the left of it?
bool left_fuse = false;
if (l_iter != chunks_.end()) {
size_t l_pos = l_iter->first;
size_t l_size = l_iter->second;

left_fuse = (l_pos + l_size == pos);
if (incomplete_chunk == chunk_t::kSendFile) {
// SendFile is a special case where the data is not actually in the buffer.
// We just need to update the chunk_info metadata for closest chunk so that the filler
// event is processed properly (if lazy parsing is off).
// If pos is at the end of l_iter, then we need to update r_iter's chunk_info.
DCHECK_EQ(size, 0); // SendFile events should have size 0 because we don't track them in bpf
DCHECK_GE(gap_size, 0); // SendFile events should have a non-zero gap size
DCHECK_EQ(l_iter->first + l_iter->second.size, pos);
VLOG(1) << absl::Substitute("l_iter is [$0, $1]", l_iter->first,
l_iter->first + l_iter->second.size);
VLOG(1) << absl::Substitute("pos is at the end of l_iter, so updating l_iter's chunk_info");
// pos is at the end of l_iter, so update l_iter's chunk_info.
l_iter->second.AddIncompleteChunkInfo(
IncompleteChunkInfo(incomplete_chunk, pos, gap_size, pos));
VLOG(1) << absl::Substitute("l_iter now has incomplete_chunk: $0, at [$1, $2]",
l_iter->second.MostRecentIncompleteChunkInfo().incomplete_chunk,
l_iter->second.MostRecentIncompleteChunkInfo().gap_start,
l_iter->second.MostRecentIncompleteChunkInfo().gap_start +
l_iter->second.MostRecentIncompleteChunkInfo().gap_size);
return;
}

// Does this chunk fuse with the chunk on the left of it?
bool left_fuse = (l_iter != chunks_.end() && l_iter->first + l_iter->second.size == pos);
// Does this chunk fuse with the chunk on the right of it?
bool right_fuse = false;
if (r_iter != chunks_.end()) {
size_t r_pos = r_iter->first;

right_fuse = (pos + size == r_pos);
}
bool right_fuse = (r_iter != chunks_.end() && pos + size == r_iter->first);

if (left_fuse && right_fuse) {
// The new chunk bridges two previously separate chunks together.
// Keep the left one and increase its size to cover all three chunks.
l_iter->second += (size + r_iter->second);
chunks_.erase(r_iter);
l_iter->second.size += (size + r_iter->second.size);

// Left and right chunks must have been added prior to the middle chunk, so either
// 1. left chunk has no incomplete chunks
// 2. the most recent incomplete chunk has a filled gap,
// 3. or the new chunk is a filler event that completely plugs the gap of the left chunk.
DCHECK(!l_iter->second.HasIncompleteChunks() ||
l_iter->second.MostRecentIncompleteChunkInfo().gap_filled ||
incomplete_chunk == chunk_t::kFiller);

// Middle chunk cannot be incomplete, because it would have been added before the right chunk
// (which must be filler if the middle chunk is incomplete)
DCHECK(incomplete_chunk == chunk_t::kFullyFormed || incomplete_chunk == chunk_t::kFiller ||
incomplete_chunk == chunk_t::kHeaderEvent);

// New chunk is a filler event that plugs the gap of the left incomplete chunk.
if (incomplete_chunk == chunk_t::kFiller) {
DCHECK(l_iter->second.HasIncompleteChunks());
DCHECK_EQ(l_iter->second.MostRecentIncompleteChunkInfo().gap_start, pos);
DCHECK_EQ(l_iter->second.MostRecentIncompleteChunkInfo().gap_size, size);
DCHECK(!l_iter->second.MostRecentIncompleteChunkInfo().gap_filled);
// Mark the gap as filled: gap_start is pos, while gap_end should be pos + gap_size, or the
// size of this filler.
l_iter->second.MostRecentIncompleteChunkInfo().MarkGapAsFilled(pos, pos + size);
DCHECK(l_iter->second.MostRecentIncompleteChunkInfo().gap_filled);
}

// If right chunk is incomplete, the merged chunk (new left) should inherit its
// incompleteness.
if (r_iter->second.HasIncompleteChunks()) {
for (const auto& incomplete_chunk_info : r_iter->second.incomplete_chunks) {
l_iter->second.AddIncompleteChunkInfo(incomplete_chunk_info);
}
}
// If all chunks complete, then the merged chunk is complete
chunks_.erase(r_iter); // Remove the right chunk since it will be merged into left.
} else if (left_fuse) {
// Merge new chunk directly to the one on its left.
l_iter->second += size;
VLOG(1) << absl::Substitute("Merging new chunk [$0, $1] into left chunk [$2, $3]", pos,
pos + size, l_iter->first, l_iter->first + l_iter->second.size);
l_iter->second.size += size;
// 1. If the left chunk is incomplete with an unfilled gap starting at pos, this new chunk must
// be a filler event that partially or fully plugs the gap.
if (l_iter->second.HasIncompleteChunks() &&
l_iter->second.MostRecentIncompleteChunkInfo().gap_start == pos) {
VLOG(1) << absl::Substitute("Left chunk has incomplete_chunk: $0, gap_size: $1",
l_iter->second.MostRecentIncompleteChunkInfo().incomplete_chunk,
l_iter->second.MostRecentIncompleteChunkInfo().gap_size);
DCHECK(incomplete_chunk == chunk_t::kFiller ||
incomplete_chunk == chunk_t::kIncompleteFiller);
DCHECK_NE(l_iter->second.MostRecentIncompleteChunkInfo().gap_size, 0);
if (size == l_iter->second.MostRecentIncompleteChunkInfo().gap_size) {
// This filler event completely fills the gap of the incomplete chunk.
DCHECK_EQ(size, l_iter->second.MostRecentIncompleteChunkInfo().gap_size);
VLOG(1) << absl::Substitute(
"New chunk is a filler event that completely fills the left chunk's gap.");
// Mark the gap as filled: gap_start is pos, while gap_end should be pos + gap_size, or the
// size of this filler.
l_iter->second.MostRecentIncompleteChunkInfo().MarkGapAsFilled(pos, pos + size);
DCHECK(l_iter->second.MostRecentIncompleteChunkInfo().gap_filled);
} else {
// This filler event only partially fills the gap of the incomplete chunk.
// Add a new incomplete chunk info
l_iter->second.AddIncompleteChunkInfo(IncompleteChunkInfo(
chunk_t::kIncompleteFiller, pos,
l_iter->second.MostRecentIncompleteChunkInfo().gap_size - size, pos + size));
VLOG(1) << absl::Substitute(
"New chunk is an incomplete filler event that only partially fills the left chunk's "
"gap.");
}
}

// 2. If the new chunk is incomplete (and not filler), the left chunk must be complete or an
// incomplete chunk with a filled gap.
if (incomplete_chunk != chunk_t::kFullyFormed && incomplete_chunk != chunk_t::kFiller &&
incomplete_chunk != chunk_t::kIncompleteFiller &&
incomplete_chunk != chunk_t::kHeaderEvent) {
DCHECK_GE(gap_size, 0); // new incomplete chunk must have a gap
DCHECK(!l_iter->second.HasIncompleteChunks() ||
l_iter->second.MostRecentIncompleteChunkInfo().gap_filled);
// The merged chunk (new left) should inherit the new chunk's incompleteness.
l_iter->second.AddIncompleteChunkInfo(
IncompleteChunkInfo(incomplete_chunk, pos, gap_size, pos + size));
VLOG(1) << absl::Substitute(
"New chunk has size: $0, gap_start: $1, incomplete_chunk: $2, gap_size: $3", size,
l_iter->second.MostRecentIncompleteChunkInfo().gap_start, incomplete_chunk, gap_size);
}
} else if (right_fuse) {
// Merge new chunk into the one on its right.
VLOG(1) << absl::Substitute("Merging new chunk [$0, $1] into right chunk [$2, $3]", pos,
pos + size, r_iter->first, r_iter->first + r_iter->second.size);
// 1. New chunk cannot be incomplete, because the right would have to be a filler event added
// after it nor can it be filler, because then it would merge into an incomplete chunk on its
// left
DCHECK(incomplete_chunk == chunk_t::kFullyFormed || incomplete_chunk == chunk_t::kFiller);
// 2. If right chunk is incomplete, the merged chunk should inherit its incompleteness.
if (r_iter->second.HasIncompleteChunks()) {
VLOG(1) << absl::Substitute("Right chunk has incomplete_chunk: $0, gap_size: $1",
r_iter->second.MostRecentIncompleteChunkInfo().incomplete_chunk,
r_iter->second.MostRecentIncompleteChunkInfo().gap_size);
}
// Since its key changes, this requires removing and re-inserting the node.
auto node = chunks_.extract(r_iter);
auto node = chunks_.extract(r_iter); // transfers ChunkInfo from right chunk to merged chunk
node.key() = pos;
node.mapped() += size;
node.mapped().size += size;
chunks_.insert(std::move(node));
} else {
// No fusing, so just add the new chunk.
chunks_[pos] = size;
VLOG(1) << absl::Substitute("Adding new chunk at pos: [$0, $1]", pos, pos + size);
ChunkInfo new_chunk_info(size);

// If the new chunk is incomplete (i.e. has a gap) we take note of the gap size and the start of
// the incomplete event.
if (incomplete_chunk != kFullyFormed && incomplete_chunk != kHeaderEvent) {
DCHECK_GE(gap_size, 0);
size_t incomplete_event_start = pos; // start of the incomplete event (not the gap or filler)
size_t gap_start = pos + size; // start of the gap at the end of this chunk
IncompleteChunkInfo new_incomplete_chunk_info(incomplete_chunk, incomplete_event_start,
gap_size, gap_start);
new_chunk_info.AddIncompleteChunkInfo(new_incomplete_chunk_info);
VLOG(1) << absl::Substitute("New chunk has incomplete_chunk: $0, gap_size: $1, size: $2",
incomplete_chunk, gap_size, size);
}
chunks_.emplace(pos, new_chunk_info);
}
}

Expand All @@ -136,7 +257,8 @@ void AlwaysContiguousDataStreamBufferImpl::AddNewTimestamp(size_t pos, uint64_t
}

void AlwaysContiguousDataStreamBufferImpl::Add(size_t pos, std::string_view data,
uint64_t timestamp) {
uint64_t timestamp, chunk_t incomplete_chunk,
size_t gap_size) {
if (data.size() > capacity_) {
size_t oversize_amount = data.size() - capacity_;
data.remove_prefix(oversize_amount);
Expand Down Expand Up @@ -229,22 +351,25 @@ void AlwaysContiguousDataStreamBufferImpl::Add(size_t pos, std::string_view data

if (CheckOverlap(pos, data.size())) {
// This chunk overlaps with an existing chunk. Don't add the new chunk.
LOG(WARNING) << absl::Substitute(
"Not adding chunk at pos $0, size $1, because it overlaps with an existing chunk.", pos,
data.size());
return;
}

// Now copy the data into the buffer.
memcpy(buffer_.data() + ppos_front, data.data(), data.size());

// Update the metadata.
AddNewChunk(pos, data.size());
AddNewChunk(pos, data.size(), incomplete_chunk, gap_size);
AddNewTimestamp(pos, timestamp);

if (run_metadata_cleanup) {
CleanupMetadata();
}
}

std::map<size_t, size_t>::const_iterator AlwaysContiguousDataStreamBufferImpl::GetChunkForPos(
std::map<size_t, ChunkInfo>::const_iterator AlwaysContiguousDataStreamBufferImpl::GetChunkForPos(
size_t pos) const {
// Get chunk which is <= pos.
auto iter = MapLE(chunks_, pos);
Expand All @@ -255,14 +380,23 @@ std::map<size_t, size_t>::const_iterator AlwaysContiguousDataStreamBufferImpl::G
DCHECK_GE(pos, iter->first);

// Does the chunk include pos? If not, return {}.
ssize_t available = iter->second - (pos - iter->first);
ssize_t available = iter->second.size - (pos - iter->first);
if (available <= 0) {
return chunks_.cend();
}

return iter;
}

ChunkInfo AlwaysContiguousDataStreamBufferImpl::GetChunkInfoForHead() {
auto iter = GetChunkForPos(position_);

if (iter == chunks_.cend()) {
return ChunkInfo(0);
}
return iter->second;
}

void AlwaysContiguousDataStreamBufferImpl::EnforceTimestampMonotonicity(size_t pos,
size_t chunk_end) {
// Get timestamp for chunk which is <= pos.
Expand Down Expand Up @@ -293,8 +427,9 @@ std::string_view AlwaysContiguousDataStreamBufferImpl::Get(size_t pos) {
return {};
}

size_t chunk_pos = iter->first; // start of contiguous head
size_t chunk_size = iter->second; // size of contiguous (already merged in Add)
ChunkInfo chunk_info = iter->second;
size_t chunk_pos = iter->first;
size_t chunk_size = chunk_info.size;

// since we only call Get() in Head() and the event parser fully processes a contiguous head,
// we need only enforce timestamp monotonicity once per head.
Expand Down Expand Up @@ -345,7 +480,7 @@ void AlwaysContiguousDataStreamBufferImpl::CleanupChunks() {
}

size_t chunk_pos = iter->first;
size_t chunk_size = iter->second;
size_t chunk_size = iter->second.size;

DCHECK_GE(position_, chunk_pos);
ssize_t available = chunk_size - (position_ - chunk_pos);
Expand All @@ -362,7 +497,7 @@ void AlwaysContiguousDataStreamBufferImpl::CleanupChunks() {
DCHECK(!chunks_.empty());
auto node = chunks_.extract(chunks_.begin());
node.key() = position_;
node.mapped() = available;
node.mapped().size = available;
chunks_.insert(std::move(node));
}

Expand Down Expand Up @@ -420,7 +555,7 @@ size_t AlwaysContiguousDataStreamBufferImpl::EndPosition() {
size_t end_position = position_;
if (!chunks_.empty()) {
auto last_chunk = std::prev(chunks_.end());
end_position = last_chunk->first + last_chunk->second;
end_position = last_chunk->first + last_chunk->second.size;
}
return end_position;
}
Expand All @@ -431,8 +566,9 @@ std::string AlwaysContiguousDataStreamBufferImpl::DebugInfo() const {
absl::StrAppend(&s, absl::Substitute("Position: $0\n", position_));
absl::StrAppend(&s, absl::Substitute("BufferSize: $0/$1\n", buffer_.size(), capacity_));
absl::StrAppend(&s, "Chunks:\n");
for (const auto& [pos, size] : chunks_) {
absl::StrAppend(&s, absl::Substitute(" position:$0 size:$1\n", pos, size));
for (const auto& [pos, chunk_info] : chunks_) {
absl::StrAppend(&s, absl::Substitute(" position:$0 size:$1 incomplete_chunks:$2", pos,
chunk_info.size, chunk_info.incomplete_chunks.size()));
}
absl::StrAppend(&s, "Timestamps:\n");
for (const auto& [pos, timestamp] : timestamps_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class AlwaysContiguousDataStreamBufferImpl : public DataStreamBufferImpl {
max_gap_size_(max_gap_size),
allow_before_gap_size_(allow_before_gap_size) {}

void Add(size_t pos, std::string_view data, uint64_t timestamp) override;
void Add(size_t pos, std::string_view data, uint64_t timestamp,
chunk_t incomplete_chunk = chunk_t::kFullyFormed, size_t gap_size = 0) override;

std::string_view Head() override { return Get(position_); }

Expand All @@ -59,9 +60,16 @@ class AlwaysContiguousDataStreamBufferImpl : public DataStreamBufferImpl {

void ShrinkToFit() override { buffer_.shrink_to_fit(); }

// For a contiguous section pointed to by position_ (the head), return
// ChunkInfo, which contains the size of the chunk and whether it is incomplete.
// If incomplete, ChunkInfo also tells us where the incomplete event starts
// Note that with filler events, a single contiguous head may contain multiple
// incomplete chunks joined together with filler bytes.
ChunkInfo GetChunkInfoForHead() override;

private:
std::map<size_t, size_t>::const_iterator GetChunkForPos(size_t pos) const;
void AddNewChunk(size_t pos, size_t size);
std::map<size_t, ChunkInfo>::const_iterator GetChunkForPos(size_t pos) const;
void AddNewChunk(size_t pos, size_t size, chunk_t incomplete_chunk, size_t gap_size);
void AddNewTimestamp(size_t pos, uint64_t timestamp);

// CheckOverlap checks if the chunk to be added as indicated by pos and size
Expand Down Expand Up @@ -98,7 +106,7 @@ class AlwaysContiguousDataStreamBufferImpl : public DataStreamBufferImpl {
// Map of chunk start positions to chunk sizes.
// A chunk is a contiguous sequence of bytes.
// Adjacent chunks are always fused, so a chunk either ends at a gap or the end of the buffer.
std::map<size_t, size_t> chunks_;
std::map<size_t, ChunkInfo> chunks_;

// Map of positions to timestamps.
// Unlike chunks_, which will fuse when adjacent, timestamps never fuse.
Expand Down
Loading
Loading