Skip to content

Commit

Permalink
Minor typo fixes and modifications
Browse files Browse the repository at this point in the history
Signed-off-by: Chinmay <[email protected]>
  • Loading branch information
ChinmayaSharma-hue committed Nov 29, 2023
1 parent 0bd1375 commit bf35978
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 73 deletions.
124 changes: 54 additions & 70 deletions src/stirling/source_connectors/socket_tracer/protocols/mqtt/parse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
PX_ASSIGN_OR(expr, val_or, return ParseState::kNeedsMoreData)
#define PX_ASSIGN_OR_RETURN_INVALID(expr, val_or) \
PX_ASSIGN_OR(expr, val_or, return ParseState::kInvalid)

namespace px {
namespace stirling {
namespace protocols {
Expand Down Expand Up @@ -84,16 +85,21 @@ enum class PropertyCode : uint8_t {
SharedSubscriptionAvailable = 0x2A
};

constexpr int kMaxVarInt8 = 128;
constexpr int kMaxVarInt16 = 16384;
constexpr int kMaxVarInt24 = 2097152;
constexpr int kMaxVarInt32 = 268435456;

static inline StatusOr<size_t> VariableEncodingNumBytes(unsigned long integer) {
if (integer >= 268435456) {
if (integer >= kMaxVarInt32) {
return error::ResourceUnavailable("Maximum number of bytes exceeded for variable encoding.");
}

if (integer < 128) {
if (integer < kMaxVarInt8) {
return 1;
} else if (integer < 16384) {
} else if (integer < kMaxVarInt16) {
return 2;
} else if (integer < 2097152) {
} else if (integer < kMaxVarInt24) {
return 3;
}
return 4;
Expand Down Expand Up @@ -155,20 +161,19 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop
properties_length -= 2;
PX_ASSIGN_OR_RETURN_INVALID(std::string_view correlation_data,
decoder->ExtractString(property_length));
result->properties["correlation_data"] = std::string(correlation_data);
result->properties["correlation_data"] =
BytesToString<bytes_format::HexAsciiMix>(correlation_data);
;
properties_length -= property_length;
break;
}
case PropertyCode::SubscriptionIdentifier: {
unsigned long subscription_id;
size_t num_bytes;

PX_ASSIGN_OR_RETURN_INVALID(subscription_id, decoder->ExtractUVarInt());
PX_ASSIGN_OR_RETURN_INVALID(unsigned long subscription_id, decoder->ExtractUVarInt());
StatusOr<size_t> num_bytes_status = VariableEncodingNumBytes(subscription_id);
if (!num_bytes_status.ok()) {
return ParseState::kInvalid;
}
num_bytes = num_bytes_status.ValueOrDie();
size_t num_bytes = num_bytes_status.ValueOrDie();

result->properties["subscription_id"] = std::to_string(subscription_id);
properties_length -= num_bytes;
Expand Down Expand Up @@ -210,7 +215,7 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop
properties_length -= 2;
PX_ASSIGN_OR_RETURN_INVALID(std::string_view auth_data,
decoder->ExtractString(property_length));
result->properties["auth_data"] = std::string(auth_data);
result->properties["auth_data"] = BytesToString<bytes_format::HexAsciiMix>(auth_data);
properties_length -= property_length;
break;
}
Expand Down Expand Up @@ -284,14 +289,14 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop
break;
}
case PropertyCode::MaximumQos: {
PX_ASSIGN_OR_RETURN_INVALID(uint16_t topic_alias, decoder->ExtractBEInt<uint16_t>());
result->properties["topic_alias"] = std::to_string(topic_alias);
properties_length -= 2;
PX_ASSIGN_OR_RETURN_INVALID(uint16_t topic_alias, decoder->ExtractBEInt<uint8_t>());
result->properties["maximum_qos"] = std::to_string(topic_alias);
properties_length -= 1;
break;
}
case PropertyCode::RetainAvailable: {
PX_ASSIGN_OR_RETURN_INVALID(uint8_t retain_available, decoder->ExtractBEInt<uint8_t>());
result->properties["retain_available"] = std::to_string(retain_available);
result->properties["retain_available"] = (retain_available == 1) ? "true" : "false";
properties_length -= 1;
break;
}
Expand Down Expand Up @@ -324,7 +329,7 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop
case PropertyCode::WildcardSubscriptionAvailable: {
PX_ASSIGN_OR_RETURN_INVALID(uint8_t wildcard_subscription_available,
decoder->ExtractBEInt<uint8_t>());
result->properties["retain_available"] =
result->properties["wildcard_subscription_available"] =
(wildcard_subscription_available == 1) ? "true" : "false";
properties_length -= 1;
break;
Expand All @@ -340,7 +345,7 @@ ParseState ParseProperties(Message* result, BinaryDecoder* decoder, size_t& prop
case PropertyCode::SharedSubscriptionAvailable: {
PX_ASSIGN_OR_RETURN_INVALID(uint8_t shared_subscription_available,
decoder->ExtractBEInt<uint8_t>());
result->properties["subscription_id_available"] =
result->properties["shared_subscription_available"] =
(shared_subscription_available == 1) ? "true" : "false";
properties_length -= 1;
break;
Expand Down Expand Up @@ -374,9 +379,7 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder,
PX_ASSIGN_OR_RETURN_INVALID(result->header_fields["keep_alive"],
decoder->ExtractBEInt<uint16_t>());

size_t properties_length;

PX_ASSIGN_OR_RETURN_INVALID(properties_length, decoder->ExtractUVarInt());
PX_ASSIGN_OR_RETURN_INVALID(size_t properties_length, decoder->ExtractUVarInt());
if (!VariableEncodingNumBytes(properties_length).ok()) {
return ParseState::kInvalid;
}
Expand All @@ -390,9 +393,7 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder,

result->header_fields["session_present"] = connack_flags;

size_t properties_length;

PX_ASSIGN_OR_RETURN_INVALID(properties_length, decoder->ExtractUVarInt());
PX_ASSIGN_OR_RETURN_INVALID(size_t properties_length, decoder->ExtractUVarInt());
if (!VariableEncodingNumBytes(properties_length).ok()) {
return ParseState::kInvalid;
}
Expand All @@ -417,14 +418,13 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder,
decoder->ExtractBEInt<uint16_t>());
result->header_fields["variable_header_length"] += 2;
}
size_t properties_length, num_bytes;

PX_ASSIGN_OR_RETURN_INVALID(properties_length, decoder->ExtractUVarInt());
PX_ASSIGN_OR_RETURN_INVALID(size_t properties_length, decoder->ExtractUVarInt());
StatusOr<size_t> num_bytes_status = VariableEncodingNumBytes(properties_length);
if (!num_bytes_status.ok()) {
return ParseState::kInvalid;
}
num_bytes = num_bytes_status.ValueOrDie();
size_t num_bytes = num_bytes_status.ValueOrDie();

result->header_fields["variable_header_length"] += (uint32_t)(num_bytes + properties_length);

Expand All @@ -445,8 +445,7 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder,
}

if (result->header_fields["remaining_length"] >= 4) {
size_t properties_length;
PX_ASSIGN_OR_RETURN_INVALID(properties_length, decoder->ExtractUVarInt());
PX_ASSIGN_OR_RETURN_INVALID(size_t properties_length, decoder->ExtractUVarInt());
if (!VariableEncodingNumBytes(properties_length).ok()) {
return ParseState::kInvalid;
}
Expand All @@ -463,14 +462,12 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder,
decoder->ExtractBEInt<uint16_t>());
// Storing variable header length for use in payload length calculation
result->header_fields["variable_header_length"] = 2;
size_t properties_length, num_bytes;

PX_ASSIGN_OR_RETURN_INVALID(properties_length, decoder->ExtractUVarInt());
PX_ASSIGN_OR_RETURN_INVALID(size_t properties_length, decoder->ExtractUVarInt());
StatusOr<size_t> num_bytes_status = VariableEncodingNumBytes(properties_length);
if (!num_bytes_status.ok()) {
return ParseState::kInvalid;
}
num_bytes = num_bytes_status.ValueOrDie();
size_t num_bytes = num_bytes_status.ValueOrDie();

result->header_fields["variable_header_length"] += num_bytes + properties_length;
return ParseProperties(result, decoder, properties_length);
Expand All @@ -480,9 +477,7 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder,
decoder->ExtractBEInt<uint8_t>());

if (result->header_fields["remaining_length"] > 1) {
size_t properties_length;

PX_ASSIGN_OR_RETURN_INVALID(properties_length, decoder->ExtractUVarInt());
PX_ASSIGN_OR_RETURN_INVALID(size_t properties_length, decoder->ExtractUVarInt());
if (!VariableEncodingNumBytes(properties_length).ok()) {
return ParseState::kInvalid;
}
Expand All @@ -492,16 +487,14 @@ ParseState ParseVariableHeader(Message* result, BinaryDecoder* decoder,
return ParseState::kSuccess;
}
case MqttControlPacketType::AUTH: {
size_t properties_length;

if (result->header_fields["remaining_length"] == 0) {
result->header_fields["reason_code"] = 0x00;
return ParseState::kSuccess;
}
PX_ASSIGN_OR_RETURN_INVALID(result->header_fields["reason_code"],
decoder->ExtractBEInt<uint8_t>());

PX_ASSIGN_OR_RETURN_INVALID(properties_length, decoder->ExtractUVarInt());
PX_ASSIGN_OR_RETURN_INVALID(size_t properties_length, decoder->ExtractUVarInt());
if (!VariableEncodingNumBytes(properties_length).ok()) {
return ParseState::kInvalid;
}
Expand All @@ -523,9 +516,7 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder,
result->payload["client_id"] = std::string(client_id);

if (result->header_fields["will_flag"]) {
size_t will_properties_length, will_topic_length, will_payload_length;

PX_ASSIGN_OR_RETURN_INVALID(will_properties_length, decoder->ExtractUVarInt());
PX_ASSIGN_OR_RETURN_INVALID(size_t will_properties_length, decoder->ExtractUVarInt());
if (!VariableEncodingNumBytes(will_properties_length).ok()) {
return ParseState::kInvalid;
}
Expand All @@ -534,12 +525,12 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder,
return ParseState::kInvalid;
}

PX_ASSIGN_OR_RETURN_INVALID(will_topic_length, decoder->ExtractBEInt<uint16_t>());
PX_ASSIGN_OR_RETURN_INVALID(size_t will_topic_length, decoder->ExtractBEInt<uint16_t>());
PX_ASSIGN_OR_RETURN_INVALID(std::string_view will_topic,
decoder->ExtractString(will_topic_length));
result->payload["will_topic"] = std::string(will_topic);

PX_ASSIGN_OR_RETURN_INVALID(will_payload_length, decoder->ExtractBEInt<uint16_t>());
PX_ASSIGN_OR_RETURN_INVALID(size_t will_payload_length, decoder->ExtractBEInt<uint16_t>());
PX_ASSIGN_OR_RETURN_INVALID(std::string_view will_payload,
decoder->ExtractString(will_payload_length));
result->payload["will_payload"] = std::string(will_payload);
Expand Down Expand Up @@ -578,52 +569,48 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder,
case MqttControlPacketType::PUBCOMP:
return ParseState::kSuccess;
case MqttControlPacketType::SUBSCRIBE: {
size_t payload_length;
uint16_t topic_filter_length;
uint8_t subscription_options;

if ((result->header_fields.find("remaining_length") == result->header_fields.end()) ||
(result->header_fields.find("variable_header_length") == result->header_fields.end())) {
return ParseState::kInvalid;
}

result->payload["topic_filter"] = "";
result->payload["subscription_options"] = "";
payload_length = result->header_fields["remaining_length"] -
result->header_fields["variable_header_length"];
size_t payload_length = result->header_fields["remaining_length"] -
result->header_fields["variable_header_length"];
while (payload_length > 0) {
PX_ASSIGN_OR_RETURN_INVALID(topic_filter_length, decoder->ExtractBEInt<uint16_t>());
PX_ASSIGN_OR_RETURN_INVALID(uint16_t topic_filter_length,
decoder->ExtractBEInt<uint16_t>());
PX_ASSIGN_OR_RETURN_INVALID(std::string_view topic_filter,
decoder->ExtractString(topic_filter_length));
if (result->payload["topic_filter"].empty()) {
result->payload["topic_filter"] += std::string(topic_filter);
} else {
result->payload["topic_filter"] += ", " + std::string(topic_filter);
}
PX_ASSIGN_OR_RETURN_INVALID(subscription_options, decoder->ExtractBEInt<uint8_t>());
result->payload["subscription_options"] +=
"{maximum_qos : " + std::to_string(subscription_options & 0x3) +
", no_local : " + std::to_string((subscription_options >> 2) & 0x1) +
", retain_as_published : " + std::to_string((subscription_options >> 3) & 0x1) +
", retain_handling : " + std::to_string((subscription_options >> 4) & 0x3) + "}";
PX_ASSIGN_OR_RETURN_INVALID(uint8_t subscription_options, decoder->ExtractBEInt<uint8_t>());
std::map<std::string, uint8_t> subscription_opts(
{{"maximum_qos", subscription_options & 0x3},
{"no_local", (subscription_options >> 2) & 0x1},
{"retain_as_published", (subscription_options >> 3) & 0x1},
{"retain_handling", (subscription_options >> 4) & 0x3}});
result->payload["subscription_options"] += ToJSONString(subscription_opts);
payload_length -= (3 + topic_filter_length);
}
return ParseState::kSuccess;
}
case MqttControlPacketType::UNSUBSCRIBE: {
size_t payload_length;
uint16_t topic_filter_length;

if ((result->header_fields.find("remaining_length") == result->header_fields.end()) ||
(result->header_fields.find("variable_header_length") == result->header_fields.end())) {
return ParseState::kInvalid;
}

result->payload["topic_filter"] = "";
payload_length = result->header_fields["remaining_length"] -
result->header_fields["variable_header_length"];
size_t payload_length = result->header_fields["remaining_length"] -
result->header_fields["variable_header_length"];
while (payload_length > 0) {
PX_ASSIGN_OR_RETURN_INVALID(topic_filter_length, decoder->ExtractBEInt<uint16_t>());
PX_ASSIGN_OR_RETURN_INVALID(uint16_t topic_filter_length,
decoder->ExtractBEInt<uint16_t>());
PX_ASSIGN_OR_RETURN_INVALID(std::string_view topic_filter,
decoder->ExtractString(topic_filter_length));
if (result->payload["topic_filter"].empty()) {
Expand All @@ -637,19 +624,16 @@ ParseState ParsePayload(Message* result, BinaryDecoder* decoder,
}
case MqttControlPacketType::SUBACK:
case MqttControlPacketType::UNSUBACK: {
size_t payload_length;
uint8_t reason_code;

if ((result->header_fields.find("remaining_length") == result->header_fields.end()) ||
(result->header_fields.find("variable_header_length") == result->header_fields.end())) {
return ParseState::kInvalid;
}

result->payload["reason_code"] = "";
payload_length = result->header_fields["remaining_length"] -
result->header_fields["variable_header_length"];
size_t payload_length = result->header_fields["remaining_length"] -
result->header_fields["variable_header_length"];
while (payload_length > 0) {
PX_ASSIGN_OR_RETURN_INVALID(reason_code, decoder->ExtractBEInt<uint8_t>());
PX_ASSIGN_OR_RETURN_INVALID(uint8_t reason_code, decoder->ExtractBEInt<uint8_t>());
if (result->payload["reason_code"].empty()) {
result->payload["reason_code"] += std::to_string(reason_code);
} else {
Expand Down Expand Up @@ -711,8 +695,8 @@ ParseState ParseFrame(message_type_t type, std::string_view* buf, Message* resul
}

// Eliminating cases where kNeedsMoreData needs to be returned
// If buffer size is less tan 4, there are chances that the remaining length is not present in its
// entirety
// If buffer size is less than 4, there are chances that the remaining length is not present in
// its entirety
if (decoder.BufSize() < 4) {
// Checking if buffer is complete
PX_ASSIGN_OR_RETURN_NEEDS_MORE_DATA(remaining_length, decoder.ExtractUVarInt());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,9 @@ TEST_F(MQTTParserTest, Payload) {
result_state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
ASSERT_EQ(result_state, ParseState::kSuccess);
EXPECT_EQ(frame.payload["topic_filter"], "test/topic");
EXPECT_EQ(frame.payload["subscription_options"],
"{maximum_qos : 0, no_local : 0, retain_as_published : 0, retain_handling : 0}");
std::map<std::string, uint8_t> subscription_opts(
{{"maximum_qos", 0}, {"no_local", 0}, {"retain_as_published", 0}, {"retain_handling", 0}});
EXPECT_EQ(frame.payload["subscription_options"], ToJSONString(subscription_opts));
frame = Message();

frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(kSubackFrame));
Expand Down Expand Up @@ -863,4 +864,3 @@ TEST_F(MQTTParserTest, Headers) {
} // namespace protocols
} // namespace stirling
} // namespace px

0 comments on commit bf35978

Please sign in to comment.