diff --git a/src/stirling/binaries/stirling_wrapper.cc b/src/stirling/binaries/stirling_wrapper.cc index 8bb8ac8bb63..485b460e150 100644 --- a/src/stirling/binaries/stirling_wrapper.cc +++ b/src/stirling/binaries/stirling_wrapper.cc @@ -62,7 +62,7 @@ DEFINE_string(trace, "", "Dynamic trace to deploy. Either (1) the path to a file containing PxL or IR trace " "spec, or (2) : for full-function tracing."); DEFINE_string(print_record_batches, - "http_events,mysql_events,pgsql_events,redis_events,cql_events,dns_events", + "http_events,mysql_events,pgsql_events,redis_events,cql_events,dns_events,mqtt_events", "Comma-separated list of tables to print."); DEFINE_bool(init_only, false, "If true, only runs the init phase and exits. For testing."); DEFINE_int32(timeout_secs, -1, diff --git a/src/stirling/source_connectors/socket_tracer/bcc_bpf/BUILD.bazel b/src/stirling/source_connectors/socket_tracer/bcc_bpf/BUILD.bazel index 3111f24a27a..5d13f478652 100644 --- a/src/stirling/source_connectors/socket_tracer/bcc_bpf/BUILD.bazel +++ b/src/stirling/source_connectors/socket_tracer/bcc_bpf/BUILD.bazel @@ -82,7 +82,6 @@ pl_cc_test( "ENABLE_NATS_TRACING=true", "ENABLE_MONGO_TRACING=true", "ENABLE_AMQP_TRACING=true", - "ENABLE_MQTT_TRACING=true", ], deps = [ "//src/stirling/bpf_tools/bcc_bpf:headers", diff --git a/src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/common.h b/src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/common.h index 4eb0d859c7b..18018ea0a85 100644 --- a/src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/common.h +++ b/src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/common.h @@ -51,7 +51,6 @@ enum traffic_protocol_t { kProtocolKafka = 10, kProtocolMux = 11, kProtocolAMQP = 12, - kProtocolMQTT = 13, // We use magic enum to iterate through protocols in C++ land, // and don't want the C-enum-size trick to show up there. #ifndef __cplusplus diff --git a/src/stirling/source_connectors/socket_tracer/conn_tracker.cc b/src/stirling/source_connectors/socket_tracer/conn_tracker.cc index dc47efbe975..1898f74a562 100644 --- a/src/stirling/source_connectors/socket_tracer/conn_tracker.cc +++ b/src/stirling/source_connectors/socket_tracer/conn_tracker.cc @@ -661,7 +661,6 @@ auto CreateTraceRoles() { res.Set(kProtocolKafka, {kRoleServer}); res.Set(kProtocolMux, {kRoleServer}); res.Set(kProtocolAMQP, {kRoleServer}); - res.Set(kProtocolMQTT, {kRoleClient, kRoleServer}); DCHECK(res.AreAllKeysSet()); return res; diff --git a/src/stirling/source_connectors/socket_tracer/data_stream.cc b/src/stirling/source_connectors/socket_tracer/data_stream.cc index 1007bd00c8f..6383535b911 100644 --- a/src/stirling/source_connectors/socket_tracer/data_stream.cc +++ b/src/stirling/source_connectors/socket_tracer/data_stream.cc @@ -204,8 +204,6 @@ template void DataStream::ProcessBytesToFrames( message_type_t type, protocols::NoState* state); -template void DataStream::ProcessBytesToFrames( - message_type_t type, protocols::NoState* state); void DataStream::Reset() { data_buffer_.Reset(); has_new_events_ = false; diff --git a/src/stirling/source_connectors/socket_tracer/mqtt_table.h b/src/stirling/source_connectors/socket_tracer/mqtt_table.h deleted file mode 100644 index 946ade5cc77..00000000000 --- a/src/stirling/source_connectors/socket_tracer/mqtt_table.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2018- The Pixie Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -#include - -#include "src/stirling/core/output.h" -#include "src/stirling/core/types.h" -#include "src/stirling/source_connectors/socket_tracer/canonical_types.h" - -namespace px { -namespace stirling { - -// clang-format off -constexpr DataElement kMQTTElements[] = { - canonical_data_elements::kTime, - canonical_data_elements::kUPID, - canonical_data_elements::kRemoteAddr, - canonical_data_elements::kRemotePort, - canonical_data_elements::kTraceRole, - {"req_control_packet_type", "Type Of the request MQTT Control Packet", - types::DataType::STRING, - types::SemanticType::ST_NONE, - types::PatternType::GENERAL}, - {"req_header_fields", "MQTT Header Fields in request packet", - types::DataType::STRING, - types::SemanticType::ST_NONE, - types::PatternType::GENERAL}, - {"req_properties", "MQTT properties in request packet", - types::DataType::STRING, - types::SemanticType::ST_NONE, - types::PatternType::GENERAL}, - {"req_payload", "Payload of request packet", - types::DataType::STRING, - types::SemanticType::ST_NONE, - types::PatternType::GENERAL}, - {"resp_control_packet_type", "Type Of the response MQTT Control Packet", - types::DataType::STRING, - types::SemanticType::ST_NONE, - types::PatternType::GENERAL}, - {"resp_header_fields", "MQTT Header Fields in response packet", - types::DataType::STRING, - types::SemanticType::ST_NONE, - types::PatternType::GENERAL}, - {"resp_properties", "MQTT properties in response packet", - types::DataType::STRING, - types::SemanticType::ST_NONE, - types::PatternType::GENERAL}, - {"resp_payload", "Payload of response packet", - types::DataType::STRING, - types::SemanticType::ST_NONE, - types::PatternType::GENERAL}, - canonical_data_elements::kLatencyNS, -#ifndef NDEBUG - canonical_data_elements::kPXInfo, -#endif -}; -// clang-format on - -constexpr auto kMQTTTable = - DataTableSchema("mqtt_events", "MQTT request-response pair events", kMQTTElements); -DEFINE_PRINT_TABLE(MQTT) - -constexpr int kMQTTTimeIdx = kMQTTTable.ColIndex("time_"); -constexpr int kMQTTUPIDIdx = kMQTTTable.ColIndex("upid"); -constexpr int kMQTTRemoteAddrIdx = kMQTTTable.ColIndex("remote_addr"); -constexpr int kMQTTRemotePortIdx = kMQTTTable.ColIndex("remote_port"); -constexpr int kMQTTTraceRoleIdx = kMQTTTable.ColIndex("trace_role"); -constexpr int kMQTTReqControlTypeIdx = kMQTTTable.ColIndex("req_control_packet_type"); -constexpr int kMQTTReqHeaderFieldsIdx = kMQTTTable.ColIndex("req_header_fields"); -constexpr int kMQTTReqPropertiesIdx = kMQTTTable.ColIndex("req_properties"); -constexpr int kMQTTReqPayloadIdx = kMQTTTable.ColIndex("req_payload"); -constexpr int kMQTTRespControlTypeIdx = kMQTTTable.ColIndex("resp_control_packet_type"); -constexpr int kMQTTRespHeaderFieldsIdx = kMQTTTable.ColIndex("resp_header_fields"); -constexpr int kMQTTRespPropertiesIdx = kMQTTTable.ColIndex("resp_properties"); -constexpr int kMQTTRespPayloadIdx = kMQTTTable.ColIndex("resp_payload"); - -} // namespace stirling -} // namespace px diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/BUILD.bazel b/src/stirling/source_connectors/socket_tracer/protocols/mqtt/BUILD.bazel index e2e35fc19c5..366a2b8e035 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/BUILD.bazel +++ b/src/stirling/source_connectors/socket_tracer/protocols/mqtt/BUILD.bazel @@ -35,8 +35,6 @@ pl_cc_library( ], ), deps = [ - "//src/common/json:cc_library", - "//src/common/zlib:cc_library", "//src/stirling/source_connectors/socket_tracer/protocols/common:cc_library", "//src/stirling/utils:cc_library", ], diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse.cc b/src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse.cc index 864a2adb9da..25a55829255 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse.cc @@ -138,24 +138,19 @@ static inline MqttControlPacketType GetControlPacketType(uint8_t control_packet_ } } -static inline std::tuple VariableLengthEncodingDecoder(BinaryDecoder* decoder) { - unsigned long multiplier = 1; - unsigned long decoded_value = 0; - size_t num_bytes = 0; - uint8_t encoded_byte; - do { - // encoded byte cannot be 0 as there is minimum variable header size greater than 0 - encoded_byte = decoder->ExtractBEInt().ValueOrDie(); - decoded_value += (encoded_byte & 127) * multiplier; - // size of the remaining length cannot be above 4 bytes - if (multiplier > (size_t)128 * 128 * 128) { - return std::make_tuple(0, num_bytes); - } - multiplier *= 128; - num_bytes += 1; - } while ((encoded_byte & 128) != 0); +static inline StatusOr VariableEncodingNumBytes(unsigned long integer) { + if (integer >= 268435456) { + return error::ResourceUnavailable("Maximum number of bytes exceeded for variable encoding."); + } - return std::make_tuple(decoded_value, num_bytes); + if (integer < 128) { + return 1; + } else if (integer < 16384) { + return 2; + } else if (integer < 2097152) { + return 3; + } + return 4; } ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& properties_length) { @@ -203,7 +198,7 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop case static_cast(PropertyCode::CorrelationData): { PX_ASSIGN_OR_RETURN_ERROR(uint16_t property_length, decoder->ExtractBEInt()); properties_length -= 2; - PX_ASSIGN_OR_RETURN_ERROR(std::string_view correlation_data, decoder->ExtractString((size_t)property_length)); + PX_ASSIGN_OR_RETURN_ERROR(std::string_view correlation_data, decoder->ExtractString(property_length)); result->properties["correlation_data"] = std::string(correlation_data); properties_length -= property_length; break; @@ -211,7 +206,14 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop case static_cast(PropertyCode::SubscriptionIdentifier): { unsigned long subscription_id; size_t num_bytes; - std::tie(subscription_id, num_bytes) = VariableLengthEncodingDecoder(decoder); + + PX_ASSIGN_OR_RETURN_ERROR(subscription_id, decoder->ExtractUVarInt()); + StatusOr num_bytes_status = VariableEncodingNumBytes(subscription_id); + if (!num_bytes_status.ok()) { + return ParseState::kInvalid; + } + num_bytes = num_bytes_status.ValueOrDie(); + result->properties["subscription_id"] = std::to_string(subscription_id); properties_length -= num_bytes; break; @@ -247,7 +249,7 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop case static_cast(PropertyCode::AuthenticationData): { PX_ASSIGN_OR_RETURN_ERROR(uint16_t property_length, decoder->ExtractBEInt()); properties_length -= 2; - PX_ASSIGN_OR_RETURN_ERROR(std::string_view auth_data, decoder->ExtractString((size_t)property_length)); + PX_ASSIGN_OR_RETURN_ERROR(std::string_view auth_data, decoder->ExtractString(property_length)); result->properties["auth_data"] = std::string(auth_data); properties_length -= property_length; break; @@ -327,11 +329,11 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop case static_cast(PropertyCode::UserProperty): { PX_ASSIGN_OR_RETURN_ERROR(uint16_t key_length, decoder->ExtractBEInt()); properties_length -= 2; - PX_ASSIGN_OR_RETURN_ERROR(std::string_view key, decoder->ExtractString((size_t)key_length)); + PX_ASSIGN_OR_RETURN_ERROR(std::string_view key, decoder->ExtractString(key_length)); properties_length -= key_length; PX_ASSIGN_OR_RETURN_ERROR(uint16_t value_length, decoder->ExtractBEInt()); properties_length -= 2; - PX_ASSIGN_OR_RETURN_ERROR(std::string_view value, decoder->ExtractString((size_t)value_length)); + PX_ASSIGN_OR_RETURN_ERROR(std::string_view value, decoder->ExtractString(value_length)); properties_length -= value_length; // For multiple user properties present, append to string if user property already present if (result->properties.find("user-properties") == result->properties.end()) { @@ -375,8 +377,8 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder, MqttControlPacketType& control_packet_type) { switch (control_packet_type) { case MqttControlPacketType::CONNECT: { - PX_ASSIGN_OR_RETURN_ERROR(uint8_t protocol_name_length, decoder->ExtractBEInt()); - PX_ASSIGN_OR_RETURN_ERROR(std::string_view protocol_name,decoder->ExtractString((size_t)protocol_name_length)); + PX_ASSIGN_OR_RETURN_ERROR(uint16_t protocol_name_length, decoder->ExtractBEInt()); + PX_ASSIGN_OR_RETURN_ERROR(std::string_view protocol_name, decoder->ExtractString(protocol_name_length)); CTX_DCHECK(protocol_name == "MQTT"); PX_ASSIGN_OR_RETURN_ERROR(uint8_t protocol_version, decoder->ExtractBEInt()); CTX_DCHECK(protocol_version == 5); @@ -392,7 +394,12 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder, MqttCont PX_ASSIGN_OR_RETURN_ERROR(result->header_fields["keep_alive"], decoder->ExtractBEInt()); size_t properties_length; - std::tie(properties_length, std::ignore) = VariableLengthEncodingDecoder(decoder); + + PX_ASSIGN_OR_RETURN_ERROR(properties_length, decoder->ExtractUVarInt()); + if (!VariableEncodingNumBytes(properties_length).ok()) { + return ParseState::kInvalid; + } + return ParseProperties(result, decoder, properties_length); } case MqttControlPacketType::CONNACK: { @@ -402,7 +409,12 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder, MqttCont result->header_fields["session_present"] = connack_flags; size_t properties_length; - std::tie(properties_length, std::ignore) = VariableLengthEncodingDecoder(decoder); + + PX_ASSIGN_OR_RETURN_ERROR(properties_length, decoder->ExtractUVarInt()); + if (!VariableEncodingNumBytes(properties_length).ok()) { + return ParseState::kInvalid; + } + return ParseProperties(result, decoder, properties_length); } case MqttControlPacketType::PUBLISH: { @@ -422,7 +434,14 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder, MqttCont result->header_fields["variable_header_length"] += 2; } size_t properties_length, num_bytes; - std::tie(properties_length, num_bytes) = VariableLengthEncodingDecoder(decoder); + + PX_ASSIGN_OR_RETURN_ERROR(properties_length, decoder->ExtractUVarInt()); + StatusOr num_bytes_status = VariableEncodingNumBytes(properties_length); + if (!num_bytes_status.ok()) { + return ParseState::kInvalid; + } + num_bytes = num_bytes_status.ValueOrDie(); + result->header_fields["variable_header_length"] += (uint32_t)(num_bytes + properties_length); return ParseProperties(result, decoder, properties_length); @@ -441,7 +460,10 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder, MqttCont if (result->header_fields["remaining_length"] >= 4) { size_t properties_length; - std::tie(properties_length, std::ignore) = VariableLengthEncodingDecoder(decoder); + PX_ASSIGN_OR_RETURN_ERROR(properties_length, decoder->ExtractUVarInt()); + if (!VariableEncodingNumBytes(properties_length).ok()) { + return ParseState::kInvalid; + } return ParseProperties(result, decoder, properties_length); } @@ -455,7 +477,14 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder, MqttCont // Storing variable header length for use in payload length calculation result->header_fields["variable_header_length"] = 2; size_t properties_length, num_bytes; - std::tie(properties_length, num_bytes) = VariableLengthEncodingDecoder(decoder); + + PX_ASSIGN_OR_RETURN_ERROR(properties_length, decoder->ExtractUVarInt()); + StatusOr num_bytes_status = VariableEncodingNumBytes(properties_length); + if (!num_bytes_status.ok()) { + return ParseState::kInvalid; + } + num_bytes = num_bytes_status.ValueOrDie(); + result->header_fields["variable_header_length"] += num_bytes + properties_length; return ParseProperties(result, decoder, properties_length); } @@ -464,7 +493,12 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder, MqttCont if (result->header_fields["remaining_length"] > 1) { size_t properties_length; - std::tie(properties_length, std::ignore) = VariableLengthEncodingDecoder(decoder); + + PX_ASSIGN_OR_RETURN_ERROR(properties_length, decoder->ExtractUVarInt()); + if (!VariableEncodingNumBytes(properties_length).ok()) { + return ParseState::kInvalid; + } + return ParseProperties(result, decoder, properties_length); } return ParseState::kSuccess; @@ -484,7 +518,11 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder, MqttControlPack if (result->header_fields["will_flag"]) { size_t will_properties_length, will_topic_length, will_payload_length; - std::tie(will_properties_length, std::ignore) = VariableLengthEncodingDecoder(decoder); + PX_ASSIGN_OR_RETURN_ERROR(will_properties_length, decoder->ExtractUVarInt()); + if (!VariableEncodingNumBytes(will_properties_length).ok()) { + return ParseState::kInvalid; + } + if (ParseProperties(result, decoder, will_properties_length) == ParseState::kInvalid) { return ParseState::kInvalid; } @@ -506,8 +544,7 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder, MqttControlPack if (result->header_fields["password_flag"]) { PX_ASSIGN_OR_RETURN_ERROR(size_t password_length, decoder->ExtractBEInt()); - PX_ASSIGN_OR_RETURN_ERROR(std::string_view password, decoder->ExtractString(password_length)); - result->payload["password"] = std::string(password); + PX_ASSIGN_OR_RETURN_ERROR(std::ignore, decoder->ExtractString(password_length)); } return ParseState::kSuccess; @@ -519,7 +556,7 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder, MqttControlPack (result->header_fields.find("variable_header_length") == result->header_fields.end())) { return ParseState::kInvalid; } - size_t payload_length = (size_t)(result->header_fields["remaining_length"] - result->header_fields["variable_header_length"]); + size_t payload_length = result->header_fields["remaining_length"] - result->header_fields["variable_header_length"]; PX_ASSIGN_OR_RETURN_ERROR(std::string_view payload, decoder->ExtractString(payload_length)); result->payload["publish_message"] = std::string(payload); return ParseState::kSuccess; @@ -541,7 +578,7 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder, MqttControlPack result->payload["topic_filter"] = ""; result->payload["subscription_options"] = ""; - payload_length = (size_t)(result->header_fields["remaining_length"] - result->header_fields["variable_header_length"]); + payload_length = result->header_fields["remaining_length"] - result->header_fields["variable_header_length"]; while (payload_length > 0) { PX_ASSIGN_OR_RETURN_ERROR(topic_filter_length, decoder->ExtractBEInt()); PX_ASSIGN_OR_RETURN_ERROR(std::string_view topic_filter, decoder->ExtractString(topic_filter_length)); @@ -569,7 +606,7 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder, MqttControlPack } result->payload["topic_filter"] = ""; - payload_length = (size_t)(result->header_fields["remaining_length"] - result->header_fields["variable_header_length"]); + payload_length = result->header_fields["remaining_length"] - result->header_fields["variable_header_length"]; while (payload_length > 0) { PX_ASSIGN_OR_RETURN_ERROR(topic_filter_length, decoder->ExtractBEInt()); PX_ASSIGN_OR_RETURN_ERROR(std::string_view topic_filter, decoder->ExtractString(topic_filter_length)); @@ -593,7 +630,7 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder, MqttControlPack } result->payload["reason_code"] = ""; - payload_length = (size_t)(result->header_fields["remaining_length"] - result->header_fields["variable_header_length"]); + payload_length = result->header_fields["remaining_length"] - result->header_fields["variable_header_length"]; while (payload_length > 0) { PX_ASSIGN_OR_RETURN_ERROR(reason_code, decoder->ExtractBEInt()); if (result->payload["reason_code"].empty()) { @@ -610,7 +647,7 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder, MqttControlPack case MqttControlPacketType::DISCONNECT: return ParseState::kSuccess; default: - return ParseState::kInvalid;; + return ParseState::kInvalid; } } @@ -634,25 +671,35 @@ ParseState ParseFrame(message_type_t type, std::string_view* buf, // Saving the flags if control packet type is PUBLISH if (control_packet_type == MqttControlPacketType::PUBLISH) { - result->header_fields["dup"] = (control_packet_flags >> 3) != 0; - result->header_fields["retain"] = (control_packet_flags & 0x1) != 0; + result->dup = (control_packet_flags >> 3) != 0; + result->retain = (control_packet_flags & 0x1) != 0; result->header_fields["qos"] = (control_packet_flags >> 1) & 0x3; } // Decoding the variable encoding of remaining length field - size_t remaining_length; - std::tie(remaining_length, std::ignore) = VariableLengthEncodingDecoder(&decoder); + PX_ASSIGN_OR_RETURN_ERROR(size_t remaining_length, decoder.ExtractUVarInt()); + if (!VariableEncodingNumBytes(remaining_length).ok()) { + return ParseState::kInvalid; + } + if (decoder.BufSize() < remaining_length) { + return ParseState::kNeedsMoreData; + } + + if (remaining_length < 0) { return ParseState::kInvalid; } result->header_fields["remaining_length"] = remaining_length; - if (ParseVariableHeader(result, &decoder, control_packet_type) == ParseState::kInvalid) { - return ParseState::kInvalid; + ParseState parse_variable_header_state = ParseVariableHeader(result, &decoder, control_packet_type); + ParseState parse_payload_state = ParsePayload(result, &decoder, control_packet_type); + + if ((parse_variable_header_state == ParseState::kInvalid) || (parse_variable_header_state == ParseState::kNeedsMoreData)) { + return parse_variable_header_state; } - if (ParsePayload(result, &decoder, control_packet_type) == ParseState::kInvalid) { - return ParseState::kInvalid; + if ((parse_payload_state == ParseState::kInvalid) || (parse_payload_state == ParseState::kNeedsMoreData)) { + return parse_payload_state; } *buf = decoder.Buf(); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse_test.cc index 92682e95c3b..158256f54e4 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse_test.cc @@ -592,7 +592,6 @@ TEST_F(MQTTParserTest, Payload) { EXPECT_EQ(frame.payload["will_topic"], "will-topic"); EXPECT_EQ(frame.payload["will_payload"], "goodbye"); EXPECT_EQ(frame.payload["username"], "dummyuser"); - EXPECT_EQ(frame.payload["password"], "dummypass"); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kPublishFrame)); @@ -791,13 +790,13 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "CONNECT"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 16); - EXPECT_EQ(frame.header_fields["username_flag"], (unsigned long) 0); - EXPECT_EQ(frame.header_fields["password_flag"], (unsigned long) 0); - EXPECT_EQ(frame.header_fields["will_retain"], (unsigned long) 0); - EXPECT_EQ(frame.header_fields["will_qos"], (unsigned long) 0); - EXPECT_EQ(frame.header_fields["will_flag"], (unsigned long) 0); - EXPECT_EQ(frame.header_fields["clean_start"], (unsigned long) 1); - EXPECT_EQ(frame.header_fields["keep_alive"], (unsigned long) 60); + EXPECT_EQ(frame.header_fields["username_flag"], 0); + EXPECT_EQ(frame.header_fields["password_flag"], 0); + EXPECT_EQ(frame.header_fields["will_retain"], 0); + EXPECT_EQ(frame.header_fields["will_qos"], 0); + EXPECT_EQ(frame.header_fields["will_flag"], 0); + EXPECT_EQ(frame.header_fields["clean_start"], 1); + EXPECT_EQ(frame.header_fields["keep_alive"], 60); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kConnackFrame)); @@ -805,8 +804,8 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "CONNACK"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 53); - EXPECT_EQ(frame.header_fields["session_present"], (unsigned long) 0); - EXPECT_EQ(frame.header_fields["reason_code"], (unsigned long) 0); + EXPECT_EQ(frame.header_fields["session_present"], 0); + EXPECT_EQ(frame.header_fields["reason_code"], 0); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kPublishFrame)); @@ -814,9 +813,9 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "PUBLISH"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 26); - EXPECT_EQ(frame.header_fields["dup"], (unsigned long) 0); - EXPECT_EQ(frame.header_fields["retain"], (unsigned long) 0); - EXPECT_EQ(frame.header_fields["qos"], (unsigned long) 1); + EXPECT_EQ(frame.dup, false); + EXPECT_EQ(frame.retain, false); + EXPECT_EQ(frame.header_fields["qos"], 1); EXPECT_EQ(frame.payload["topic_name"], "test/topic"); frame = Message(); @@ -825,7 +824,7 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "PUBACK"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 3); - EXPECT_EQ(frame.header_fields["packet_identifier"], (unsigned long) 1); + EXPECT_EQ(frame.header_fields["packet_identifier"], 1); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kPubrecFrame)); @@ -833,7 +832,7 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "PUBREC"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 2); - EXPECT_EQ(frame.header_fields["packet_identifier"], (unsigned long) 1); + EXPECT_EQ(frame.header_fields["packet_identifier"], 1); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kPubrelFrame)); @@ -841,7 +840,7 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "PUBREL"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 2); - EXPECT_EQ(frame.header_fields["packet_identifier"], (unsigned long) 1); + EXPECT_EQ(frame.header_fields["packet_identifier"], 1); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kPubcompFrame)); @@ -849,7 +848,7 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "PUBCOMP"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 2); - EXPECT_EQ(frame.header_fields["packet_identifier"], (unsigned long) 1); + EXPECT_EQ(frame.header_fields["packet_identifier"], 1); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kSubscribeFrame)); @@ -857,7 +856,7 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "SUBSCRIBE"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 16); - EXPECT_EQ(frame.header_fields["packet_identifier"], (unsigned long) 1); + EXPECT_EQ(frame.header_fields["packet_identifier"], 1); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kSubackFrame)); @@ -866,7 +865,7 @@ TEST_F(MQTTParserTest, Headers) { EXPECT_EQ(frame.control_packet_type, "SUBACK"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 4); EXPECT_EQ(frame.payload["reason_code"], "0"); - EXPECT_EQ(frame.header_fields["packet_identifier"], (unsigned long) 1); + EXPECT_EQ(frame.header_fields["packet_identifier"], 1); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kUnsubscribeFrame)); @@ -874,7 +873,7 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "UNSUBSCRIBE"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 15); - EXPECT_EQ(frame.header_fields["packet_identifier"], (unsigned long) 2); + EXPECT_EQ(frame.header_fields["packet_identifier"], 2); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kUnsubackFrame)); @@ -882,7 +881,7 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "UNSUBACK"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 4); - EXPECT_EQ(frame.header_fields["packet_identifier"], (unsigned long) 2); + EXPECT_EQ(frame.header_fields["packet_identifier"], 2); frame = Message(); frame_view = CreateStringView(CharArrayStringView(kPingreqFrame)); @@ -904,7 +903,7 @@ TEST_F(MQTTParserTest, Headers) { ASSERT_EQ(result_state, ParseState::kSuccess); EXPECT_EQ(frame.control_packet_type, "DISCONNECT"); EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 1); - EXPECT_EQ(frame.header_fields["reason_code"], (unsigned long) 4); + EXPECT_EQ(frame.header_fields["reason_code"], 4); frame = Message(); } diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/stitcher.h b/src/stirling/source_connectors/socket_tracer/protocols/mqtt/stitcher.h deleted file mode 100644 index cc886732599..00000000000 --- a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/stitcher.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2018- The Pixie Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -#include -#include -#include -#include - -#include "src/stirling/source_connectors/socket_tracer/protocols/common/interface.h" -#include "src/stirling/source_connectors/socket_tracer/protocols/common/timestamp_stitcher.h" -#include "src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h" - - -namespace px { -namespace stirling { -namespace protocols { - -template <> -inline RecordsWithErrorCount StitchFrames(std::deque* req_messages, - std::deque* resp_messages, - NoState* /*state*/) { - return StitchMessagesWithTimestampOrder(req_messages, resp_messages); -} - -} // namespace protocols -} // namespace stirling -} // namespace px diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h b/src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h index 4a55c90f399..97c12adff4d 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h @@ -26,12 +26,17 @@ namespace stirling { namespace protocols { namespace mqtt { +// The protocol specification : https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.pdf +// This supports MQTTv5 + struct Message: public FrameBase { message_type_t type = message_type_t::kUnknown; std::string control_packet_type = "UNKNOWN"; - //TODO: Find the best way to store the payload + bool dup; + bool retain; + std::map header_fields; std::map properties, payload; @@ -78,9 +83,9 @@ struct Message: public FrameBase { payload_str += "}"; return absl::Substitute( - "Message: {type: $0, control_packet_type: $1, header_fields: $2, " - "payload: $3, properties: $4}", - magic_enum::enum_name(type), control_packet_type, + "Message: {type: $0, control_packet_type: $1, dup: $2, retain: $3, header_fields: $4, " + "payload: $5, properties: $6}", + magic_enum::enum_name(type), control_packet_type, dup, retain, header_fields_str, payload_str, properties_str); } }; @@ -102,16 +107,6 @@ struct Record{ } }; -struct State { - bool conn_closed = false; -}; - -struct StateWrapper { - State global; - std::monostate send; - std::monostate recv; -}; - struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Message; using record_type = Record; diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_tables.h b/src/stirling/source_connectors/socket_tracer/socket_trace_tables.h index 33453efad9a..d0e73317595 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_tables.h +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_tables.h @@ -31,5 +31,3 @@ #include "src/stirling/source_connectors/socket_tracer/nats_table.h" #include "src/stirling/source_connectors/socket_tracer/pgsql_table.h" #include "src/stirling/source_connectors/socket_tracer/redis_table.h" -#include "src/stirling/source_connectors/socket_tracer/mqtt_table.h" -