Skip to content

Commit

Permalink
refactor: change unknown network handling
Browse files Browse the repository at this point in the history
  • Loading branch information
davidgiga1993 committed May 13, 2024
1 parent 2171cca commit a885699
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 29 deletions.
61 changes: 33 additions & 28 deletions pollect/sources/K8sNamespaceTrafficSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ def __init__(self, config):
self._namespace_label = config.get('namespaceLabel', 'namespace')
self._traffic_log_mode = config.get('trafficLog')

self._dest_networks: List[NamedNetworks] = []
self.known_networks: List[NamedNetworks] = []
for network in config.get('networks', []):
name = network['name']
self._dest_networks.append(NamedNetworks(name, network['cidrs']))
self.known_networks.append(NamedNetworks(name, network['cidrs']))

# Add catch-any as last item
self._dest_networks.append(NamedNetworks('other', ['0.0.0.0/0']))
self._metrics = NamespacesMetrics(self._dest_networks)
self.known_networks.append(NamedNetworks('other', ['0.0.0.0/0'], catch_all=True))
self._metrics = NamespacesMetrics(self.known_networks)

def setup_source(self, global_conf):
src_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'bpf', 'tcp.c')
Expand Down Expand Up @@ -61,16 +61,16 @@ def _probe(self) -> Optional[ValueSet] or List[ValueSet]:
for data, collected_bytes in ipv4_send_bytes.items_lookup_and_delete_batch():
meta = to_ipv4_key(data)
namespace_metrics = self._metrics.get_namespace_metrics(meta.localAddr)
self._log_traffic(namespace_metrics, meta, 'sent')
namespace_metrics.add_traffic(meta.remoteAddr, collected_bytes,
lambda m, data_count: m.add_transmitted(data_count))
dest_network = namespace_metrics.add_traffic(meta.remoteAddr, collected_bytes,
lambda m, data_count: m.add_transmitted(data_count))
self._log_traffic(namespace_metrics, meta, dest_network, 'sent')

for data, collected_bytes in ipv4_recv_bytes.items_lookup_and_delete_batch():
meta = to_ipv4_key(data)
namespace_metrics = self._metrics.get_namespace_metrics(meta.localAddr)
self._log_traffic(namespace_metrics, meta, 'received')
namespace_metrics.add_traffic(meta.remoteAddr, collected_bytes,
lambda m, data_count: m.add_received(data_count))
dest_network = namespace_metrics.add_traffic(meta.remoteAddr, collected_bytes,
lambda m, data_count: m.add_received(data_count))
self._log_traffic(namespace_metrics, meta, dest_network, 'received')

# Now export the data
values = ValueSet(labels=[self._namespace_label, 'dest_network', 'direction'])
Expand All @@ -84,17 +84,20 @@ def _probe(self) -> Optional[ValueSet] or List[ValueSet]:

return values

def _log_traffic(self, namespace_metrics: NamespaceNetworkMetric, meta: TCPSessionKey, direction: str):
def _log_traffic(self, namespace_metrics: NamespaceNetworkMetric, meta: TCPSessionKey, dest_network: NamedNetworks,
direction: str):
if self._traffic_log_mode is None:
return

if self._traffic_log_mode == 'unknown' and not namespace_metrics.is_catch_all():
return
if self._traffic_log_mode == 'unknown':
is_unknown_traffic = namespace_metrics.is_catch_all() or dest_network.catch_all
if not is_unknown_traffic:
return

self.log.info(f'Unknown traffic: local {ipaddress.IPv4Network(meta.localAddr)}, '
self.log.info(f'Traffic: local {ipaddress.IPv4Network(meta.localAddr)}, '
f'remote {ipaddress.IPv4Network(meta.remoteAddr)}, '
f'direction {direction}, '
f'from process {meta.name}')
f'from process {meta.name}, catch all {namespace_metrics.is_catch_all()}')


class TCPSessionKey(NamedTuple):
Expand Down Expand Up @@ -130,9 +133,9 @@ def swap32(x: int) -> int:


class NamespaceNetworkMetric:
def __init__(self, name: str, dest_networks: List[NamedNetworks], catch_all: bool = False):
def __init__(self, name: str, known_networks: List[NamedNetworks], catch_all: bool = False):
self.namespace: str = name
self._dest_networks: List[NamedNetworks] = dest_networks
self._known_networks: List[NamedNetworks] = known_networks

self.metrics: Dict[NamedNetworks, NetworkMetrics] = dict()
"""
Expand All @@ -141,21 +144,23 @@ def __init__(self, name: str, dest_networks: List[NamedNetworks], catch_all: boo

self._is_catch_all = catch_all

def add_traffic(self, remote_addr: int, data_bytes: int, assign: Callable[[NetworkMetrics, int], None]):
def add_traffic(self, remote_addr: int, data_bytes: int,
assign: Callable[[NetworkMetrics, int], None]) -> Optional[NamedNetworks]:
"""
Adds traffic metrics for the given remote address to this namespace
:param remote_addr: Remote ip address
:param data_bytes: Number of bytes to add
:param assign: Lambda for assigning the traffic to the correct metric field
:return:
:return: The network that matched the destination
"""
for dest_network in self._dest_networks:
for dest_network in self._known_networks:
if not dest_network.contains(remote_addr):
continue
if dest_network not in self.metrics:
self.metrics[dest_network] = NetworkMetrics()
assign(self.metrics[dest_network], data_bytes)
return
return dest_network
return None

def is_catch_all(self) -> bool:
"""
Expand All @@ -168,12 +173,12 @@ def is_catch_all(self) -> bool:
class NamespacesMetrics:
CATCH_ALL_NAME = 'unknown'

def __init__(self, dest_networks: List[NamedNetworks]):
def __init__(self, known_networks: List[NamedNetworks]):
self.metrics: Dict[str, NamespaceNetworkMetric] = {
self.CATCH_ALL_NAME: NamespaceNetworkMetric(self.CATCH_ALL_NAME, dest_networks, catch_all=True)
self.CATCH_ALL_NAME: NamespaceNetworkMetric(self.CATCH_ALL_NAME, known_networks, catch_all=True)
}

self._dest_networks: List[NamedNetworks] = dest_networks
self._known_networks: List[NamedNetworks] = known_networks

"""
Holds the send/received bytes grouped by namespace
Expand All @@ -185,12 +190,12 @@ def get_namespace_metrics(self, local_address: int) -> NamespaceNetworkMetric:
if network is None:
# The local networks is not known to k8s, maybe the source is one of the known networks?
# This happens for example for cross-node traffic
network = self._get_dest_network(local_address)
network = self._get_known_network(local_address)
if network is None: # No idea what this traffic is
return self.metrics[self.CATCH_ALL_NAME]

if network.name not in self.metrics:
self.metrics[network.name] = NamespaceNetworkMetric(network.name, self._dest_networks)
self.metrics[network.name] = NamespaceNetworkMetric(network.name, self._known_networks)
return self.metrics[network.name]

def update_networks(self):
Expand All @@ -209,8 +214,8 @@ def _get_container_network(self, local_address: int) -> Optional[NamedNetworks]:
return network
return None

def _get_dest_network(self, local_address: int) -> Optional[NamedNetworks]:
for network in self._dest_networks:
def _get_known_network(self, local_address: int) -> Optional[NamedNetworks]:
for network in self._known_networks:
if network.contains(local_address):
return network
return None
3 changes: 2 additions & 1 deletion pollect/sources/helper/NetworkStats.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ def contains(self, ip: int):

class NamedNetworks:

def __init__(self, name: str, subnets: List[str]):
def __init__(self, name: str, subnets: List[str], catch_all: bool = False):
self.name = name
self._subnets: List[Subnet] = []
for subnet in subnets:
self._subnets.append(Subnet(subnet))
self.catch_all = catch_all

def contains(self, ip: int):
for net in self._subnets:
Expand Down

0 comments on commit a885699

Please sign in to comment.