Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to ignore MongoDB handshaking frames #1776

Merged
merged 3 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,16 @@ ParseState ProcessOpMsg(BinaryDecoder* decoder, Frame* frame) {
// The type of all request commands and the response to all find command requests
// will always be the first key.
auto op_msg_type = doc.MemberBegin()->name.GetString();
if ((op_msg_type == insert || op_msg_type == delete_ || op_msg_type == update ||
op_msg_type == find || op_msg_type == cursor)) {
if ((op_msg_type == kInsert || op_msg_type == kDelete || op_msg_type == kUpdate ||
op_msg_type == kFind || op_msg_type == kCursor)) {
frame->op_msg_type = op_msg_type;

} else if (op_msg_type == kHello || op_msg_type == kIsMaster ||
op_msg_type == kIsMasterAlternate) {
// The frame is a handshaking message.
frame->op_msg_type = op_msg_type;
frame->is_handshake = true;

} else {
// The frame is a response message, find the "ok" key and its value.
auto itr = doc.FindMember("ok");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ RecordsWithErrorCount<mongodb::Record> StitchFrames(
resp_frame.consumed = true;
FlattenSections(&req_frame);
FlattenSections(&resp_frame);

// Ignore stitching the request/response if either one is a handshaking frame.
if (req_frame.is_handshake || resp_frame.is_handshake) {
req_frame.consumed = true;
resp_frame.consumed = true;
break;
}

records.push_back({std::move(req_frame), std::move(resp_frame)});
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ using ::testing::SizeIs;
class MongoDBStitchFramesTest : public ::testing::Test {};

Frame CreateMongoDBFrame(uint64_t ts_ns, int32_t request_id, int32_t response_to, bool more_to_come,
std::string doc = "") {
std::string doc = "", bool is_handshake = false) {
mongodb::Frame frame;
frame.timestamp_ns = ts_ns;
frame.request_id = request_id;
frame.response_to = response_to;
frame.more_to_come = more_to_come;
frame.is_handshake = is_handshake;

mongodb::Section section;
section.documents.push_back(doc);
Expand Down Expand Up @@ -343,6 +344,39 @@ TEST_F(MongoDBStitchFramesTest, MissingTailFrameInNResponses) {
EXPECT_THAT(state.stream_order, SizeIs(0));
}

TEST_F(MongoDBStitchFramesTest, VerifyHandshakingMessages) {
absl::flat_hash_map<mongodb::stream_id_t, std::deque<mongodb::Frame>> reqs;
absl::flat_hash_map<mongodb::stream_id_t, std::deque<mongodb::Frame>> resps;

// Add requests to map.
reqs[1].push_back(CreateMongoDBFrame(0, 1, 0, false));
reqs[3].push_back(CreateMongoDBFrame(2, 3, 0, false));
reqs[5].push_back(CreateMongoDBFrame(4, 5, 0, false, "", true)); // Request handshake frame.
reqs[7].push_back(CreateMongoDBFrame(6, 7, 0, false));

// Add responses to map.
resps[1].push_back(CreateMongoDBFrame(1, 2, 1, false));
resps[3].push_back(CreateMongoDBFrame(3, 4, 3, false));
resps[5].push_back(CreateMongoDBFrame(5, 6, 5, false, "", true)); // Response handshake frame.
resps[7].push_back(CreateMongoDBFrame(7, 8, 7, false));

// Add the order in which the transactions's streamID's were found.
State state = {};
state.stream_order.push_back({1, false});
state.stream_order.push_back({3, false});
state.stream_order.push_back({5, false});
state.stream_order.push_back({7, false});

RecordsWithErrorCount<mongodb::Record> result = mongodb::StitchFrames(&reqs, &resps, &state);
EXPECT_EQ(result.error_count, 0);
// There should be 3 records in vector since the stitcher ignores handshaking frames but will
// still consume them successfully.
EXPECT_THAT(result.records, SizeIs(3));
EXPECT_TRUE(AreAllDequesEmpty(reqs));
EXPECT_TRUE(AreAllDequesEmpty(resps));
EXPECT_THAT(state.stream_order, SizeIs(0));
}

} // namespace mongodb
} // namespace protocols
} // namespace stirling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,17 @@ enum class SectionKind : uint8_t {
};

// Types of OP_MSG requests/responses
constexpr std::string_view insert = "insert";
constexpr std::string_view delete_ = "delete";
constexpr std::string_view update = "update";
constexpr std::string_view find = "find";
constexpr std::string_view cursor = "cursor";
constexpr std::string_view ok = "ok";
constexpr std::string_view kInsert = "insert";
constexpr std::string_view kDelete = "delete";
constexpr std::string_view kUpdate = "update";
constexpr std::string_view kFind = "find";
constexpr std::string_view kCursor = "cursor";
constexpr std::string_view kOk = "ok";

// Types of top level keys for handshaking messages
constexpr std::string_view kHello = "hello";
constexpr std::string_view kIsMaster = "isMaster";
constexpr std::string_view kIsMasterAlternate = "ismaster";

constexpr int32_t kMaxBSONObjSize = 16000000;

Expand Down Expand Up @@ -116,6 +121,9 @@ constexpr int32_t kMaxBSONObjSize = 16000000;
* https://github.com/mongodb/specifications/blob/e09b41df206f9efaa36ba4c332c47d04ddb7d6d1/source/message/OP_MSG.rst#command-arguments-as-payload
*
* There can be 0 or more documents in a section of kind 1 without a separator between them.
*
* Information about MongoDB handshaking messages can be found here:
* https://github.com/mongodb/specifications/blob/022fbf64fb36c80b9295ba93acec150c94362767/source/mongodb-handshake/handshake.rst
*/

struct Frame : public FrameBase {
Expand All @@ -135,6 +143,7 @@ struct Frame : public FrameBase {
std::string op_msg_type;
std::string frame_body;
uint32_t checksum = 0;
bool is_handshake = false;

bool consumed = false;
size_t ByteSize() const override { return sizeof(Frame); }
Expand Down
Loading