Skip to content

Commit

Permalink
Non Parser related changes removal and minor code fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Chinmay <[email protected]>
  • Loading branch information
ChinmayaSharma-hue committed Nov 9, 2023
1 parent 8b7a8dc commit ed808de
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 256 deletions.
2 changes: 1 addition & 1 deletion src/stirling/binaries/stirling_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ DEFINE_string(trace, "",
"Dynamic trace to deploy. Either (1) the path to a file containing PxL or IR trace "
"spec, or (2) <path to object file>:<symbol_name> for full-function tracing.");
DEFINE_string(print_record_batches,
"http_events,mysql_events,pgsql_events,redis_events,cql_events,dns_events",
"http_events,mysql_events,pgsql_events,redis_events,cql_events,dns_events,mqtt_events",
"Comma-separated list of tables to print.");
DEFINE_bool(init_only, false, "If true, only runs the init phase and exits. For testing.");
DEFINE_int32(timeout_secs, -1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,124 +683,6 @@ static __inline enum message_type_t infer_nats_message(const char* buf, size_t c
return kUnknown;
}

static __inline enum message_type_t infer_mqtt_message(const char* buf, size_t count) {
static const uint8_t kConnect = 0x10;
static const uint8_t kConnack = 0x20;
static const uint8_t kPublish = 0x30;
static const uint8_t kPuback = 0x40;
static const uint8_t kPubrec = 0x50;
static const uint8_t kPubrel = 0x62;
static const uint8_t kPubcomp = 0x70;
static const uint8_t kSubscribe = 0x82;
static const uint8_t kSuback = 0x90;
static const uint8_t kUnsubscribe = 0xa2;
static const uint8_t kUnsuback = 0xb0;
static const uint8_t kPingreq = 0xc0;
static const uint8_t kPingresp = 0xd0;
static const uint8_t kDisconnect = 0xe0;

static const uint8_t kPublishQos1 = 0x32;
static const uint8_t kPublishQos2 =0x34;
static const uint8_t kDupMask = 0x08;
static const uint8_t kRetainMask = 0x01;

static const uint8_t kMinFixedHeaderLength = 2;

const uint8_t* ubuf = (const uint8_t*)buf;

// Minimum Size of Fixed Header in MQTT is 2
if (count < kMinFixedHeaderLength) {
return kUnknown;
}

// Remaining length can be ranging from 2 to 4 bytes, decoding the variable length remaining length field
int byte_counter = 1;
int multiplier = 1;
int decoded_remaining_length = 0;
uint8_t encoded_byte;

do {
// message size cannot be less than what the remaining length needs based on variable encoding
if (count <= (size_t)byte_counter) {
return kUnknown;
}

encoded_byte = ubuf[byte_counter];
decoded_remaining_length += (encoded_byte & 127) * multiplier;
// size of the remaining length cannot be above 4 bytes
if (multiplier > 128*128*128) {
return kUnknown;
}
multiplier *= 128;
byte_counter += 1;
} while ((encoded_byte & 128) != 0);

int fixed_header_length = byte_counter;
size_t actual_remaining_length = count - (size_t)fixed_header_length;

if (actual_remaining_length != decoded_remaining_length) {
return kUnknown;
}

switch (ubuf[0])
{
case kConnect:
return kRequest;
case kConnack:
return kRequest;
case kPublish:
return kRequest;
case kPublishQos1:
return kRequest;
case kPublishQos2:
return kRequest;
case kPublish | kDupMask:
return kRequest;
case kPublishQos1 | kDupMask:
return kRequest;
case kPublishQos2 | kDupMask:
return kRequest;
case kPublish | kRetainMask:
return kRequest;
case kPublishQos1 | kRetainMask:
return kRequest;
case kPublishQos2 | kRetainMask:
return kRequest;
case kPublish | kDupMask | kRetainMask:
return kRequest;
case kPublishQos1 | kDupMask | kRetainMask:
return kRequest;
case kPublishQos2 | kDupMask | kRetainMask:
return kRequest;
case kPuback:
return kRequest;
case kPubrec:
return kRequest;
case kPubrel:
return kRequest;
case kPubcomp:
return kRequest;
case kSubscribe:
return kRequest;
case kSuback:
return kRequest;
case kUnsubscribe:
return kRequest;
case kUnsuback:
return kRequest;
case kPingreq:
return kRequest;
case kPingresp:
return kRequest;
case kDisconnect:
return kRequest;
default:
return kUnknown;
}

return kUnknown;
}

static __inline struct protocol_message_t infer_protocol(const char* buf, size_t count,
struct conn_info_t* conn_info) {
struct protocol_message_t inferred_message;
Expand Down Expand Up @@ -850,12 +732,8 @@ static __inline struct protocol_message_t infer_protocol(const char* buf, size_t
} else if (ENABLE_NATS_TRACING &&
(inferred_message.type = infer_nats_message(buf, count)) != kUnknown) {
inferred_message.protocol = kProtocolNATS;
} else if (ENABLE_MQTT_TRACING &&
(inferred_message.type = infer_mqtt_message(buf, count)) != kUnknown) {
inferred_message.protocol = kProtocolMQTT;
}


conn_info->prev_count = count;
if (count == 4) {
conn_info->prev_buf[0] = buf[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ pl_cc_library(
"//src/stirling/source_connectors/socket_tracer/protocols/nats:cc_library",
"//src/stirling/source_connectors/socket_tracer/protocols/pgsql:cc_library",
"//src/stirling/source_connectors/socket_tracer/protocols/redis:cc_library",
"//src/stirling/source_connectors/socket_tracer/protocols/mqtt:cc_library",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pl_cc_library(
],
),
deps = [
"//src/common/json:cc_library",
"//src/stirling/source_connectors/socket_tracer/protocols/common:cc_library",
"//src/stirling/utils:cc_library",
],
Expand Down
Loading

0 comments on commit ed808de

Please sign in to comment.