Skip to content

Commit

Permalink
Reworked RTT calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaderi committed Feb 9, 2025
1 parent cb59d90 commit 35f7594
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 80 deletions.
55 changes: 34 additions & 21 deletions include/Flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ typedef struct {
TCPSeqNum tcp_seq_s2d, tcp_seq_d2s;
u_int16_t cli2srv_window, srv2cli_window;
struct timeval synTime, synAckTime, ackTime; /* network Latency (3-way handshake) */
struct timeval clientRTT3WH, serverRTT3WH; /* Computed at 3WH */
float clientRTT3WH, serverRTT3WH; /* Computed at 3WH (msec) */

struct {
u_int32_t last_cli_ack, last_srv_ack;
Expand All @@ -49,6 +49,16 @@ typedef struct {
} rtt; /* Computed continuously */
} FlowTCP;

typedef struct {
struct timeval first_cli_to_srv, first_srv_to_cli, second_cli_to_srv; /* Time of the first packet in each direction */
float clientRTT3WH, serverRTT3WH; /* Computed at 3WH (msec) */
struct {
bool last_spin_set;
struct timeval last_ts;
struct ndpi_analyze_struct cli_min_rtt /* cli <-> ntopng RTT */, srv_min_rtt /* ntopng <-> dst RTT */;
} rtt;
} FlowUDP;

typedef struct {
u_int32_t prevAdjacentAS, nextAdjacentAS;
u_int32_t vrfId;
Expand Down Expand Up @@ -77,6 +87,7 @@ class Flow : public GenericHashEntry {
Host *cli_host, *srv_host; /* They are ALWAYS NULL on ViewInterfaces. For shared hosts see below viewFlowStats */
IpAddress *cli_ip_addr, *srv_ip_addr;
FlowTCP *tcp;
FlowUDP *udp;
FlowCollectionInfo *collection;

/* Data collected from nProbe */
Expand Down Expand Up @@ -481,22 +492,23 @@ class Flow : public GenericHashEntry {
return (ndpi_is_encrypted_proto(iface->get_ndpi_struct(),
ndpiDetectedProtocol));
}
inline bool isSSH() const { return (isProto(NDPI_PROTOCOL_SSH)); }
inline bool isMining() const { return (isProto(NDPI_PROTOCOL_MINING));}
inline bool isDNS() const { return (isProto(NDPI_PROTOCOL_DNS)); }
inline bool isSTUN() const { return (isProto(NDPI_PROTOCOL_STUN)); }
inline bool isSSH() const { return (isProto(NDPI_PROTOCOL_SSH)); }
inline bool isMining() const { return (isProto(NDPI_PROTOCOL_MINING)); }
inline bool isDNS() const { return (isProto(NDPI_PROTOCOL_DNS)); }
inline bool isSTUN() const { return (isProto(NDPI_PROTOCOL_STUN)); }
inline bool isQUIC() const { return (isProto(NDPI_PROTOCOL_QUIC)); }
inline bool isZoomRTP() const {
return (isProto(NDPI_PROTOCOL_ZOOM) && (isProto(NDPI_PROTOCOL_RTP) || isProto(NDPI_PROTOCOL_SRTP)) );
}
inline bool isIEC60870() const { return (isProto(NDPI_PROTOCOL_IEC60870)); }
inline bool isModbus() const { return (isProto(NDPI_PROTOCOL_MODBUS)); }
inline bool isMDNS() const { return (isProto(NDPI_PROTOCOL_MDNS)); }
inline bool isSSDP() const { return (isProto(NDPI_PROTOCOL_SSDP)); }
inline bool isNetBIOS() const { return (isProto(NDPI_PROTOCOL_NETBIOS)); }
inline bool isSIP() const { return (isProto(NDPI_PROTOCOL_SIP)); }
inline bool isDHCP() const { return (isProto(NDPI_PROTOCOL_DHCP)); }
inline bool isNTP() const { return (isProto(NDPI_PROTOCOL_NTP)); }
inline bool isSMTPorSMTPS() const { return (isSMTP() || isSMTPS()); }
inline bool isIEC60870() const { return (isProto(NDPI_PROTOCOL_IEC60870)); }
inline bool isModbus() const { return (isProto(NDPI_PROTOCOL_MODBUS)); }
inline bool isMDNS() const { return (isProto(NDPI_PROTOCOL_MDNS)); }
inline bool isSSDP() const { return (isProto(NDPI_PROTOCOL_SSDP)); }
inline bool isNetBIOS() const { return (isProto(NDPI_PROTOCOL_NETBIOS)); }
inline bool isSIP() const { return (isProto(NDPI_PROTOCOL_SIP)); }
inline bool isDHCP() const { return (isProto(NDPI_PROTOCOL_DHCP)); }
inline bool isNTP() const { return (isProto(NDPI_PROTOCOL_NTP)); }
inline bool isSMTPorSMTPS() const { return (isSMTP() || isSMTPS()); }
inline bool isSMTP() const { return (isProto(NDPI_PROTOCOL_MAIL_SMTP)); }
inline bool isSMTPS() const { return (isProto(NDPI_PROTOCOL_MAIL_SMTPS)); }
inline bool isHTTP() const { return (isProto(NDPI_PROTOCOL_HTTP)); }
Expand Down Expand Up @@ -634,7 +646,7 @@ class Flow : public GenericHashEntry {

void updateSeqNum(time_t when, u_int32_t sN, u_int32_t aN);
void setDetectedProtocol(ndpi_protocol proto_id);
void processPacket(const struct pcap_pkthdr *h, const u_char *ip_packet,
void processPacket(bool src2dst_direction, const struct pcap_pkthdr *h, const u_char *ip_packet,
u_int16_t ip_len, u_int64_t packet_time, u_int8_t *payload,
u_int16_t payload_len, u_int16_t src_port);
void processDNSPacket(const u_char *ip_packet, u_int16_t ip_len,
Expand All @@ -646,6 +658,9 @@ class Flow : public GenericHashEntry {
void processModbusPacket(bool is_query, const u_char *payload,
u_int16_t payload_len,
const struct pcap_pkthdr *h);
void updateQUICStats(bool src2dst_direction, const struct timeval *tv,
u_int8_t *payload, u_int16_t payload_len);
void updateUDPTimestamp(bool src2dst_direction, const struct timeval *tv);
#endif
void endProtocolDissection();
inline void setCustomApp(custom_app_t ca) {
Expand Down Expand Up @@ -1210,7 +1225,7 @@ inline float get_goodput_bytes_thpt() const { return (goodput_bytes_thpt); };
if(tcp == NULL)
return(0.0);
else
return client ? Utils::timeval2ms(&tcp->clientRTT3WH) : Utils::timeval2ms(&tcp->serverRTT3WH);
return client ? tcp->clientRTT3WH : tcp->serverRTT3WH;
};

inline void setFlowRTT(const struct timeval *const tv, bool client) {
Expand All @@ -1219,12 +1234,12 @@ inline float get_goodput_bytes_thpt() const { return (goodput_bytes_thpt); };
memcpy(&tcp->clientRTT3WH, tv, sizeof(*tv));

if (cli_host)
cli_host->updateNetworkRTT(Utils::timeval2ms(&tcp->clientRTT3WH));
cli_host->updateNetworkRTT(tcp->clientRTT3WH);
} else {
memcpy(&tcp->serverRTT3WH, tv, sizeof(*tv));

if (srv_host)
srv_host->updateNetworkRTT(Utils::timeval2ms(&tcp->serverRTT3WH));
srv_host->updateNetworkRTT(tcp->serverRTT3WH);
}
}
}
Expand All @@ -1238,9 +1253,7 @@ inline float get_goodput_bytes_thpt() const { return (goodput_bytes_thpt); };
}
inline void setRTT() {
if(tcp != NULL)
rttSec = ((float)(tcp->serverRTT3WH.tv_sec + tcp->clientRTT3WH.tv_sec)) +
((float)(tcp->serverRTT3WH.tv_usec + tcp->clientRTT3WH.tv_usec)) /
(float)1000000;
rttSec = (tcp->serverRTT3WH + tcp->clientRTT3WH)/1000.;
}
inline void setFlowApplLatency(float latency_msecs) {
applLatencyMsec = latency_msecs;
Expand Down
79 changes: 51 additions & 28 deletions src/Flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,23 @@ Flow::Flow(NetworkInterface *_iface,
srcAS = dstAS = 0, rttSec = 0;

if(_protocol == IPPROTO_TCP) {
tcp = (FlowTCP*)calloc(1, sizeof(FlowTCP));
tcp = (FlowTCP*)calloc(1, sizeof(FlowTCP)), udp = NULL;

if(tcp != NULL)
ndpi_init_data_analysis(&tcp->rtt.cli_to_srv, 4),
ndpi_init_data_analysis(&tcp->rtt.srv_to_cli, 4);
} else
ndpi_init_data_analysis(&tcp->rtt.srv_to_cli, 4);
} else {
if(protocol == IPPROTO_UDP) {
udp = (FlowUDP*)calloc(1, sizeof(FlowUDP));

if(udp != NULL)
ndpi_init_data_analysis(&udp->rtt.cli_min_rtt, 4),
ndpi_init_data_analysis(&udp->rtt.srv_min_rtt, 4);
} else
udp = NULL;

tcp = NULL;
}

collection = NULL;

Expand Down Expand Up @@ -506,6 +516,12 @@ Flow::~Flow() {
free(tcp);
}

if(udp != NULL) {
ndpi_free_data_analysis(&udp->rtt.cli_min_rtt, 0),
ndpi_free_data_analysis(&udp->rtt.srv_min_rtt, 0);
free(udp);
}

if(riskInfo) free(riskInfo);
if(end_reason) free(end_reason);

Expand Down Expand Up @@ -1039,7 +1055,8 @@ bool Flow::needsExtraDissection() {
/* *************************************** */

/* Process a packet and advance the flow detection state. */
void Flow::processPacket(const struct pcap_pkthdr *h, const u_char *ip_packet,
void Flow::processPacket(bool src2dst_direction,
const struct pcap_pkthdr *h, const u_char *ip_packet,
u_int16_t ip_len, u_int64_t packet_time,
u_int8_t *payload, u_int16_t payload_len,
u_int16_t src_port) {
Expand Down Expand Up @@ -2435,9 +2452,9 @@ void Flow::periodic_stats_update(const struct timeval *tv) {
/*
Do the stats update on the actual peers, i.e.,
peers possibly swapped due to the heuristic
*/
*/
get_actual_peers(&cli_h, &srv_h);

Mac *cli_mac = cli_h ? cli_h->getMac() : NULL;
Mac *srv_mac = srv_h ? srv_h->getMac() : NULL;

Expand All @@ -2446,7 +2463,7 @@ void Flow::periodic_stats_update(const struct timeval *tv) {
if(cli_h && srv_h) {
if(diff_sent_bytes || diff_rcvd_bytes) {
/* Update L2 Device stats */

if(srv_mac) {
//#ifdef HAVE_NEDGE
srv_mac->incSentStats(tv->tv_sec, diff_rcvd_packets, diff_rcvd_bytes);
Expand Down Expand Up @@ -3780,7 +3797,7 @@ void Flow::formatECSHost(json_object *my_object, bool is_client,
json_object_object_add(host_object,
Utils::jsonLabel(is_client ? CLIENT_NW_LATENCY_MS : SERVER_NW_LATENCY_MS,
"latency", jsonbuf, sizeof(jsonbuf)),
json_object_new_double(toMs(&tcp->clientRTT3WH)/2));
json_object_new_double(tcp->clientRTT3WH/2.));

json_object_object_add(my_object, is_client ? "client" : "server", host_object);
}
Expand Down Expand Up @@ -4080,11 +4097,11 @@ void Flow::formatGenericFlow(json_object *my_object) {
json_object_object_add(my_object,
Utils::jsonLabel(CLIENT_NW_LATENCY_MS, "CLIENT_NW_LATENCY_MS", jsonbuf,
sizeof(jsonbuf)),
json_object_new_double(toMs(&tcp->clientRTT3WH)/2));
json_object_new_double(tcp->clientRTT3WH/2.));
json_object_object_add(my_object,
Utils::jsonLabel(SERVER_NW_LATENCY_MS, "SERVER_NW_LATENCY_MS", jsonbuf,
sizeof(jsonbuf)),
json_object_new_double(toMs(&tcp->serverRTT3WH)/2));
json_object_new_double(tcp->serverRTT3WH/2.));
}

c = cli_host ? cli_host->get_country(buf, sizeof(buf)) : NULL;
Expand Down Expand Up @@ -5436,36 +5453,42 @@ void Flow::updateTcpFlags(const struct bpf_timeval *when, u_int8_t flags,
}
} else if(flags_3wh == (TH_SYN | TH_ACK)) {
if((tcp->synAckTime.tv_sec == 0) && (tcp->synTime.tv_sec > 0)) {
struct timeval t;

memcpy(&tcp->synAckTime, when, sizeof(struct timeval));
timeval_diff(&tcp->synTime, (struct timeval *)when, &tcp->serverRTT3WH, 0);
timeval_diff(&tcp->synTime, (struct timeval *)when, &t, 0);
tcp->serverRTT3WH = Utils::timeval2ms(&t);

/* Coherence check */
if(tcp->serverRTT3WH.tv_sec > 5)
memset(&tcp->serverRTT3WH, 0, sizeof(tcp->serverRTT3WH));
if(tcp->serverRTT3WH > 5000 /* 5 sec */ )
tcp->serverRTT3WH = 0;
else if(srv_host)
srv_host->updateNetworkRTT(Utils::timeval2ms(&tcp->serverRTT3WH));
srv_host->updateNetworkRTT(tcp->serverRTT3WH);
}
} else if((flags_3wh == TH_ACK) ||
(flags_3wh == (TH_ACK | TH_PUSH)) /* TCP Fast Open may contain data and PSH
in the final TWH ACK */) {
if((tcp->ackTime.tv_sec == 0) && (tcp->synAckTime.tv_sec > 0)) {
struct timeval t;

memcpy(&tcp->ackTime, when, sizeof(struct timeval));
timeval_diff(&tcp->synAckTime, (struct timeval *)when, &tcp->clientRTT3WH, 0);
timeval_diff(&tcp->synAckTime, (struct timeval *)when, &t, 0);

tcp->clientRTT3WH = Utils::timeval2ms(&t);
#ifdef DEBUG
ntop->getTrace()->traceEvent(TRACE_WARNING, "Client RTT: %.1f ms", toMs(&tcp->clientRTT3WH));
ntop->getTrace()->traceEvent(TRACE_WARNING, "Client RTT: %.1f ms", tcp->clientRTT3WH);
#endif

/* Coherence check */
if(tcp->clientRTT3WH.tv_sec > 5)
memset(&tcp->clientRTT3WH, 0, sizeof(tcp->clientRTT3WH));
if(tcp->clientRTT3WH > 5000 /* 5 sec */)
tcp->clientRTT3WH = 0;
else if(cli_host)
cli_host->updateNetworkRTT(Utils::timeval2ms(&tcp->clientRTT3WH));
cli_host->updateNetworkRTT(tcp->clientRTT3WH);

#ifdef DEBUG
ntop->getTrace()->traceEvent(TRACE_WARNING, "Server RTT: %.1f ms", toMs(&tcp->serverRTT3WH));
ntop->getTrace()->traceEvent(TRACE_WARNING, "Server RTT: %.1f ms", tcp->serverRTT3WH);
#endif

setRTT();
iface->getTcpFlowStats()->incEstablished();
}
Expand Down Expand Up @@ -6810,7 +6833,7 @@ void Flow::setPacketsBytes(time_t now, u_int32_t s2d_pkts, u_int32_t d2s_pkts,
s2d_pkts, d2s_pkts,
s2d_pkts_delta, d2s_pkts_delta,
get_bytes_cli2srv(), get_bytes_srv2cli(),
s2d_bytes, d2s_bytes,
s2d_bytes, d2s_bytes,
s2d_bytes_delta, d2s_bytes_delta);
}
#endif
Expand All @@ -6836,7 +6859,7 @@ void Flow::setPacketsBytes(time_t now, u_int32_t s2d_pkts, u_int32_t d2s_pkts,
the conntrack handler, and thus the flow is still alive.
*/
last_conntrack_update = now;

if(s2d_set) {
static_cast<NetfilterInterface *>(iface)->incStatsConntrack(isIngress2EgressDirection(),
now, eth_proto, getStatsProtocol(),
Expand Down Expand Up @@ -7778,10 +7801,10 @@ void Flow::lua_get_tcp_info(lua_State *vm) const {
? true
: false);

lua_push_float_table_entry(vm, "tcp.nw_latency.3wh_client_rtt", toMs(&tcp->clientRTT3WH));
lua_push_float_table_entry(vm, "tcp.nw_latency.3wh_server_rtt", toMs(&tcp->serverRTT3WH));

lua_push_float_table_entry(vm, "tcp.nw_latency.3wh_client_rtt", tcp->clientRTT3WH);
lua_push_float_table_entry(vm, "tcp.nw_latency.3wh_server_rtt", tcp->serverRTT3WH);

lua_push_float_table_entry(vm, "tcp.appl_latency", applLatencyMsec);
lua_push_float_table_entry(vm, "tcp.max_thpt.cli2srv", getCli2SrvMaxThpt());
lua_push_float_table_entry(vm, "tcp.max_thpt.srv2cli", getSrv2CliMaxThpt());
Expand Down
Loading

0 comments on commit 35f7594

Please sign in to comment.