diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/always_contiguous_data_stream_buffer_impl.cc b/src/stirling/source_connectors/socket_tracer/protocols/common/always_contiguous_data_stream_buffer_impl.cc index 6ed2801dedf..afabb75cbe4 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/always_contiguous_data_stream_buffer_impl.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/always_contiguous_data_stream_buffer_impl.cc @@ -263,14 +263,42 @@ std::map::const_iterator AlwaysContiguousDataStreamBufferImpl::G return iter; } -std::string_view AlwaysContiguousDataStreamBufferImpl::Get(size_t pos) const { +void AlwaysContiguousDataStreamBufferImpl::EnforceTimestampMonotonicity(size_t pos, + size_t chunk_end) { + // Get timestamp for chunk which is <= pos. + auto it = timestamps_.upper_bound(pos); + if (it == timestamps_.begin()) { + return; + } + --it; + + // Loop from chunk_start up to but not including chunk_end. + // The next element after chunk_end should not be part of the contiguous block. + prev_timestamp_ = 0; + for (; it != timestamps_.end() && it->first < chunk_end; ++it) { + if (prev_timestamp_ > 0 && it->second < prev_timestamp_) { + LOG(WARNING) << absl::Substitute( + "For chunk pos $0, detected non-monotonically increasing timestamp $1. Adjusting to " + "previous timestamp + 1: $2", + it->first, it->second, prev_timestamp_ + 1); + it->second = prev_timestamp_ + 1; + } + prev_timestamp_ = it->second; + } +} + +std::string_view AlwaysContiguousDataStreamBufferImpl::Get(size_t pos) { auto iter = GetChunkForPos(pos); if (iter == chunks_.cend()) { return {}; } - size_t chunk_pos = iter->first; - size_t chunk_size = iter->second; + size_t chunk_pos = iter->first; // start of contiguous head + size_t chunk_size = iter->second; // size of contiguous (already merged in Add) + + // since we only call Get() in Head() and the event parser fully processes a contiguous head, + // we need only enforce timestamp monotonicity once per head. + EnforceTimestampMonotonicity(chunk_pos, chunk_pos + chunk_size); ssize_t bytes_available = chunk_size - (pos - chunk_pos); DCHECK_GT(bytes_available, 0); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/always_contiguous_data_stream_buffer_impl.h b/src/stirling/source_connectors/socket_tracer/protocols/common/always_contiguous_data_stream_buffer_impl.h index 8cc48b64728..a74022a4893 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/always_contiguous_data_stream_buffer_impl.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/always_contiguous_data_stream_buffer_impl.h @@ -78,7 +78,10 @@ class AlwaysContiguousDataStreamBufferImpl : public DataStreamBufferImpl { size_t EndPosition(); // Get a string_view for the chunk at pos. - std::string_view Get(size_t pos) const; + std::string_view Get(size_t pos); + + // Ensure that timestamps are monotonically increasing for a given chunk. + void EnforceTimestampMonotonicity(size_t chunk_start, size_t chunk_end); const size_t capacity_; const size_t max_gap_size_; @@ -101,6 +104,7 @@ class AlwaysContiguousDataStreamBufferImpl : public DataStreamBufferImpl { // Unlike chunks_, which will fuse when adjacent, timestamps never fuse. // Also, we don't track gaps in the buffer with timestamps; must use chunks_ for that. std::map timestamps_; + size_t prev_timestamp_ = 0; }; } // namespace protocols diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/data_stream_buffer.h b/src/stirling/source_connectors/socket_tracer/protocols/common/data_stream_buffer.h index 70d288d5954..17033e7d375 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/data_stream_buffer.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/data_stream_buffer.h @@ -96,20 +96,7 @@ class DataStreamBuffer { * @param pos The logical position of the data. * @return The timestamp or error if the position does not contain valid data. */ - StatusOr GetTimestamp(size_t pos) { - StatusOr timestamp_ns_status = impl_->GetTimestamp(pos); - if (!timestamp_ns_status.ok()) { - return timestamp_ns_status; - } - uint64_t current_timestamp_ns = timestamp_ns_status.ConsumeValueOrDie(); - if (current_timestamp_ns < prev_timestamp_ns_) { - LOG(WARNING) << "Detected non-monotonically increasing timestamp " << current_timestamp_ns - << ". Adjusting to previous timestamp + 1: " << prev_timestamp_ns_ + 1; - current_timestamp_ns = prev_timestamp_ns_ + 1; - } - prev_timestamp_ns_ = current_timestamp_ns; - return current_timestamp_ns; - } + StatusOr GetTimestamp(size_t pos) { return impl_->GetTimestamp(pos); } /** * Remove n bytes from the head of the buffer. @@ -162,7 +149,6 @@ class DataStreamBuffer { private: std::unique_ptr impl_; - uint64_t prev_timestamp_ns_ = 0; }; } // namespace protocols diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/data_stream_buffer_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/common/data_stream_buffer_test.cc index 3cfbc0b9aa3..84cc2f34d32 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/data_stream_buffer_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/data_stream_buffer_test.cc @@ -147,10 +147,15 @@ TEST_P(DataStreamBufferTest, Timestamp) { EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(7), 4); EXPECT_NOT_OK(stream_buffer.GetTimestamp(8)); - // Test automatic adjustment of non-monotonic timestamp - stream_buffer.Add(8, "89", 3); // timestamp is 3, which is less than previous timestamp 4 + // Same timestamp as previous timestamp + stream_buffer.Add(8, "89", 4); EXPECT_EQ(stream_buffer.Head(), "123456789"); - EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(8), + EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(8), 4); + + // Test automatic adjustment of non-monotonic timestamp + stream_buffer.Add(10, "ab", 3); // timestamp is 3, which is less than previous timestamp 4 + EXPECT_EQ(stream_buffer.Head(), "123456789ab"); + EXPECT_OK_AND_EQ(stream_buffer.GetTimestamp(10), 5); // timestamp is adjusted to previous timestamp + 1 } diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/lazy_contiguous_data_stream_buffer_impl.cc b/src/stirling/source_connectors/socket_tracer/protocols/common/lazy_contiguous_data_stream_buffer_impl.cc index 3d99b1964f3..6f11c48e255 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/lazy_contiguous_data_stream_buffer_impl.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/lazy_contiguous_data_stream_buffer_impl.cc @@ -151,9 +151,19 @@ void LazyContiguousDataStreamBufferImpl::MergeContiguousEventsIntoHead() { } it = events_.begin(); + // end_it stopped at the first non-contiguous event (at the end of current head) while (it != end_it) { size_t event_size = it->second.data.size(); memcpy(new_buffer->Data() + offset, it->second.data.data(), event_size); + // Ensure that the event timestamps are monotonically increasing for a given contiguous head + if (prev_timestamp_ > 0 && it->second.timestamp < prev_timestamp_) { + LOG(WARNING) << absl::Substitute( + "Detected non-monotonically increasing timestamp $0. Adjusting to previous timestamp + " + "1: $1", + it->second.timestamp, prev_timestamp_ + 1); + it->second.timestamp = prev_timestamp_ + 1; + } + prev_timestamp_ = it->second.timestamp; head_pos_to_ts_.emplace(it->first, it->second.timestamp); offset += event_size; events_size_ -= event_size; diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/lazy_contiguous_data_stream_buffer_impl.h b/src/stirling/source_connectors/socket_tracer/protocols/common/lazy_contiguous_data_stream_buffer_impl.h index 219eba9d858..e1d830468fb 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/lazy_contiguous_data_stream_buffer_impl.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/lazy_contiguous_data_stream_buffer_impl.h @@ -128,6 +128,7 @@ class LazyContiguousDataStreamBufferImpl : public DataStreamBufferImpl { size_t head_position_ = 0; std::unique_ptr head_; std::map head_pos_to_ts_; + uint64_t prev_timestamp_ = 0; std::map events_; size_t events_size_ = 0;