Skip to content

Commit

Permalink
Move timestamp monotonicity check to DataStreamBuffer::Head() to ac…
Browse files Browse the repository at this point in the history
…commodate #1732 (#1761)

Summary: Preemptively adapts the timestamp monotonicity change
introduced in #1733 to the last stitcher api PR #1732, which modifies
`ParseResult.frame_positions` to be an unordered `flat_hash_map`. This
changes the order in which `GetTimestamp` is called because we are now
iterating over an unordered map of streamIDs to positions when matching
timestamps with the parsed frames in the
[event_parser](https://github.com/pixie-io/pixie/blob/e6bfab707f1f4871f4b7b8ed53321ec9e7b5807d/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h#L138C29-L138C36).

Previously, we were always iterating over the `frame_position` with the
oldest timestamp first, meaning that `prev_timestamp_` in the datastream
buffer was set correctly. With `frame_positions` being an unordered map,
we no longer have this guarantee.

To address this, we move the monotonicity check to the `Head()`
implementation of the datastream buffer and enforce increasing
timestamps for the contiguous chunk returned by `Head()` only.

Type of change: /kind bug

Test Plan: Extended the data stream buffer test + existing targets.

Signed-off-by: Benjamin Kilimnik <[email protected]>
  • Loading branch information
benkilimnik authored Nov 8, 2023
1 parent 62b8080 commit 4dce436
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,42 @@ std::map<size_t, size_t>::const_iterator AlwaysContiguousDataStreamBufferImpl::G
return iter;
}

std::string_view AlwaysContiguousDataStreamBufferImpl::Get(size_t pos) const {
void AlwaysContiguousDataStreamBufferImpl::EnforceTimestampMonotonicity(size_t pos,
size_t chunk_end) {
// Get timestamp for chunk which is <= pos.
auto it = timestamps_.upper_bound(pos);
if (it == timestamps_.begin()) {
return;
}
--it;

// Loop from chunk_start up to but not including chunk_end.
// The next element after chunk_end should not be part of the contiguous block.
prev_timestamp_ = 0;
for (; it != timestamps_.end() && it->first < chunk_end; ++it) {
if (prev_timestamp_ > 0 && it->second < prev_timestamp_) {
LOG(WARNING) << absl::Substitute(
"For chunk pos $0, detected non-monotonically increasing timestamp $1. Adjusting to "
"previous timestamp + 1: $2",
it->first, it->second, prev_timestamp_ + 1);
it->second = prev_timestamp_ + 1;
}
prev_timestamp_ = it->second;
}
}

std::string_view AlwaysContiguousDataStreamBufferImpl::Get(size_t pos) {
auto iter = GetChunkForPos(pos);
if (iter == chunks_.cend()) {
return {};
}

size_t chunk_pos = iter->first;
size_t chunk_size = iter->second;
size_t chunk_pos = iter->first; // start of contiguous head
size_t chunk_size = iter->second; // size of contiguous (already merged in Add)

// since we only call Get() in Head() and the event parser fully processes a contiguous head,
// we need only enforce timestamp monotonicity once per head.
EnforceTimestampMonotonicity(chunk_pos, chunk_pos + chunk_size);

ssize_t bytes_available = chunk_size - (pos - chunk_pos);
DCHECK_GT(bytes_available, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ class AlwaysContiguousDataStreamBufferImpl : public DataStreamBufferImpl {
size_t EndPosition();

// Get a string_view for the chunk at pos.
std::string_view Get(size_t pos) const;
std::string_view Get(size_t pos);

// Ensure that timestamps are monotonically increasing for a given chunk.
void EnforceTimestampMonotonicity(size_t chunk_start, size_t chunk_end);

const size_t capacity_;
const size_t max_gap_size_;
Expand All @@ -101,6 +104,7 @@ class AlwaysContiguousDataStreamBufferImpl : public DataStreamBufferImpl {
// Unlike chunks_, which will fuse when adjacent, timestamps never fuse.
// Also, we don't track gaps in the buffer with timestamps; must use chunks_ for that.
std::map<size_t, uint64_t> timestamps_;
size_t prev_timestamp_ = 0;
};

} // namespace protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,7 @@ class DataStreamBuffer {
* @param pos The logical position of the data.
* @return The timestamp or error if the position does not contain valid data.
*/
StatusOr<uint64_t> GetTimestamp(size_t pos) {
StatusOr<uint64_t> timestamp_ns_status = impl_->GetTimestamp(pos);
if (!timestamp_ns_status.ok()) {
return timestamp_ns_status;
}
uint64_t current_timestamp_ns = timestamp_ns_status.ConsumeValueOrDie();
if (current_timestamp_ns < prev_timestamp_ns_) {
LOG(WARNING) << "Detected non-monotonically increasing timestamp " << current_timestamp_ns
<< ". Adjusting to previous timestamp + 1: " << prev_timestamp_ns_ + 1;
current_timestamp_ns = prev_timestamp_ns_ + 1;
}
prev_timestamp_ns_ = current_timestamp_ns;
return current_timestamp_ns;
}
StatusOr<uint64_t> GetTimestamp(size_t pos) { return impl_->GetTimestamp(pos); }

/**
* Remove n bytes from the head of the buffer.
Expand Down Expand Up @@ -162,7 +149,6 @@ class DataStreamBuffer {

private:
std::unique_ptr<DataStreamBufferImpl> impl_;
uint64_t prev_timestamp_ns_ = 0;
};

} // namespace protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,15 @@ TEST_P(DataStreamBufferTest, Timestamp) {
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(7), 4);
EXPECT_NOT_OK(stream_buffer.GetTimestamp(8));

// Test automatic adjustment of non-monotonic timestamp
stream_buffer.Add(8, "89", 3); // timestamp is 3, which is less than previous timestamp 4
// Same timestamp as previous timestamp
stream_buffer.Add(8, "89", 4);
EXPECT_EQ(stream_buffer.Head(), "123456789");
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(8),
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(8), 4);

// Test automatic adjustment of non-monotonic timestamp
stream_buffer.Add(10, "ab", 3); // timestamp is 3, which is less than previous timestamp 4
EXPECT_EQ(stream_buffer.Head(), "123456789ab");
EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(10),
5); // timestamp is adjusted to previous timestamp + 1
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,19 @@ void LazyContiguousDataStreamBufferImpl::MergeContiguousEventsIntoHead() {
}

it = events_.begin();
// end_it stopped at the first non-contiguous event (at the end of current head)
while (it != end_it) {
size_t event_size = it->second.data.size();
memcpy(new_buffer->Data() + offset, it->second.data.data(), event_size);
// Ensure that the event timestamps are monotonically increasing for a given contiguous head
if (prev_timestamp_ > 0 && it->second.timestamp < prev_timestamp_) {
LOG(WARNING) << absl::Substitute(
"Detected non-monotonically increasing timestamp $0. Adjusting to previous timestamp + "
"1: $1",
it->second.timestamp, prev_timestamp_ + 1);
it->second.timestamp = prev_timestamp_ + 1;
}
prev_timestamp_ = it->second.timestamp;
head_pos_to_ts_.emplace(it->first, it->second.timestamp);
offset += event_size;
events_size_ -= event_size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class LazyContiguousDataStreamBufferImpl : public DataStreamBufferImpl {
size_t head_position_ = 0;
std::unique_ptr<FixedSizeContiguousBuffer> head_;
std::map<size_t, uint64_t> head_pos_to_ts_;
uint64_t prev_timestamp_ = 0;

std::map<size_t, Event> events_;
size_t events_size_ = 0;
Expand Down

0 comments on commit 4dce436

Please sign in to comment.