Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate latency, receive uplink RSSI, calculate bandwidth congestion and packet rate #492

Merged
merged 20 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f6e6186
Echo timestamps to calculate latency
gemenerik Sep 25, 2024
ed096ac
Merge branch 'master' into rik/latency
gemenerik Sep 25, 2024
fda63c1
Refactor latency measurement into dedicated Latency class
gemenerik Sep 26, 2024
bb2e55c
Undo redundant radiodriver changes
gemenerik Sep 26, 2024
1b1f1b1
🐍 Python Formatting Fiasco: The Quest for Perfect Quotation!
gemenerik Sep 26, 2024
a6a9215
Enhance Latency class to allow restarting the ping thread
gemenerik Sep 26, 2024
a80e670
Add Caller to indicate updated latency + example how to use
gemenerik Sep 26, 2024
4136a69
" -> '
gemenerik Sep 30, 2024
da4fcbb
Receive uplink RSSI
gemenerik Oct 3, 2024
4435c7c
add congestion statistics
knmcguire Oct 3, 2024
73963e0
Move bandwidth congestion and packet rate into signal health class
gemenerik Oct 3, 2024
c0186ba
Only try to callback signal health when set
gemenerik Oct 4, 2024
f91dcf4
Merge branch 'master' into rik/uplink_rssi
gemenerik Oct 11, 2024
716071f
Merge branch 'master' into rik/latency
gemenerik Oct 22, 2024
8264b02
Merge branch 'master' into rik/uplink_rssi
gemenerik Oct 22, 2024
45c537d
Merge branch 'rik/uplink_rssi' into rik/link_quality
gemenerik Oct 22, 2024
81bfd4a
Refactor SignalHealth to RadioLinkStatistics for clarity
gemenerik Oct 22, 2024
31ff8e4
Rename latency update callback, reduce ping interval to 0.1 seconds
gemenerik Nov 12, 2024
fba6170
Move radio link statistics into (universal) link statistics object
gemenerik Dec 5, 2024
965f3d2
Enable deprecation warnings lib-wide
gemenerik Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cflib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
cf.close_link()
```
"""
import warnings

warnings.simplefilter('always', DeprecationWarning) # Enbable DeprecationWarnings

__pdoc__ = {}
__pdoc__['cflib.crtp.cflinkcppdriver'] = False
__pdoc__['cflib.cpx.transports'] = False
54 changes: 43 additions & 11 deletions cflib/crazyflie/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import datetime
import logging
import time
import warnings
from collections import namedtuple
from threading import current_thread
from threading import Lock
Expand All @@ -44,6 +45,7 @@
from .commander import Commander
from .console import Console
from .extpos import Extpos
from .link_statistics import LinkStatistics
from .localization import Localization
from .log import Log
from .mem import Memory
Expand Down Expand Up @@ -99,8 +101,6 @@ def __init__(self, link=None, ro_cache=None, rw_cache=None):
self.packet_received = Caller()
# Called for every packet sent
self.packet_sent = Caller()
# Called when the link driver updates the link quality measurement
self.link_quality_updated = Caller()

self.state = State.DISCONNECTED

Expand All @@ -109,7 +109,7 @@ def __init__(self, link=None, ro_cache=None, rw_cache=None):
rw_cache=rw_cache)

self.incoming = _IncomingPacketHandler(self)
self.incoming.setDaemon(True)
self.incoming.daemon = True
if self.link:
self.incoming.start()

Expand All @@ -123,6 +123,7 @@ def __init__(self, link=None, ro_cache=None, rw_cache=None):
self.mem = Memory(self)
self.platform = PlatformService(self)
self.appchannel = Appchannel(self)
self.link_statistics = LinkStatistics(self)

self.link_uri = ''

Expand Down Expand Up @@ -155,6 +156,22 @@ def __init__(self, link=None, ro_cache=None, rw_cache=None):
self.fully_connected.add_callback(
lambda uri: logger.info('Callback->Connection completed [%s]', uri))

self.connected.add_callback(
lambda uri: self.link_statistics.start())
self.disconnected.add_callback(
lambda uri: self.link_statistics.stop())

@property
def link_quality_updated(self):
# Issue a deprecation warning when the deprecated attribute is accessed
warnings.warn(
'link_quality_updated is deprecated and will be removed soon. '
'Please use link_statistics.link_quality_updated directly and/or update your client.',
DeprecationWarning,
stacklevel=2 # To point to the caller's code
)
return self.link_statistics.link_quality_updated

def _disconnected(self, link_uri):
""" Callback when disconnected."""
self.connected_ts = None
Expand Down Expand Up @@ -208,10 +225,6 @@ def _link_error_cb(self, errmsg):
self.disconnected_link_error.call(self.link_uri, errmsg)
self.state = State.DISCONNECTED

def _link_quality_cb(self, percentage):
"""Called from link driver to report link quality"""
self.link_quality_updated.call(percentage)

def _check_for_initial_packet_cb(self, data):
"""
Called when first packet arrives from Crazyflie.
Expand All @@ -233,7 +246,7 @@ def open_link(self, link_uri):
self.link_uri = link_uri
try:
self.link = cflib.crtp.get_link_driver(
link_uri, self._link_quality_cb, self._link_error_cb)
link_uri, self.link_statistics.radio_link_statistics_callback, self._link_error_cb)

if not self.link:
message = 'No driver found or malformed URI: {}' \
Expand Down Expand Up @@ -288,6 +301,14 @@ def remove_port_callback(self, port, cb):
"""Remove the callback cb on port"""
self.incoming.remove_port_callback(port, cb)

def add_header_callback(self, cb, port, channel, port_mask=0xFF, channel_mask=0xFF):
"""Add a callback to cb on port and channel"""
self.incoming.add_header_callback(cb, port, channel, port_mask, channel_mask)

def remove_header_callback(self, cb, port, channel, port_mask=0xFF, channel_mask=0xFF):
"""Remove the callback cb on port and channel"""
self.incoming.remove_header_callback(cb, port, channel, port_mask, channel_mask)

def _no_answer_do_retry(self, pk, pattern):
"""Resend packets that we have not gotten answers to"""
logger.info('Resending for pattern %s', pattern)
Expand Down Expand Up @@ -384,9 +405,7 @@ def add_port_callback(self, port, cb):
def remove_port_callback(self, port, cb):
"""Remove a callback for data that comes on a specific port"""
logger.debug('Removing callback on port [%d] to [%s]', port, cb)
for port_callback in self.cb:
if port_callback.port == port and port_callback.callback == cb:
self.cb.remove(port_callback)
self.remove_header_callback(cb, port, 0, 0xff, 0x0)

def add_header_callback(self, cb, port, channel, port_mask=0xFF,
channel_mask=0xFF):
Expand All @@ -398,6 +417,19 @@ def add_header_callback(self, cb, port, channel, port_mask=0xFF,
self.cb.append(_CallbackContainer(port, port_mask,
channel, channel_mask, cb))

def remove_header_callback(self, cb, port, channel, port_mask=0xFF,
channel_mask=0xFF):
"""
Remove a callback for a specific port/header callback with the
possibility to add a mask for channel and port for multiple
hits for same callback.
"""
for port_callback in self.cb:
if port_callback.port == port and port_callback.port_mask == port_mask and \
port_callback.channel == channel and port_callback.channel_mask == channel_mask and \
port_callback.callback == cb:
self.cb.remove(port_callback)

def run(self):
while True:
if self.cf.link is None:
Expand Down
234 changes: 234 additions & 0 deletions cflib/crazyflie/link_statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
#
# ,---------, ____ _ __
# | ,-^-, | / __ )(_) /_______________ _____ ___
# | ( O ) | / __ / / __/ ___/ ___/ __ `/_ / / _ \
# | / ,--' | / /_/ / / /_/ /__/ / / /_/ / / /_/ __/
# +------` /_____/_/\__/\___/_/ \__,_/ /___/\___/
#
# Copyright (C) 2024 Bitcraze AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, in version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This module provides tools for tracking statistics related to the communication
link between the Crazyflie and the lib. Currently, it focuses on tracking latency
but is designed to be extended with additional link statistics in the future.
"""
import struct
import time
from threading import Event
from threading import Thread

import numpy as np

from cflib.crtp.crtpstack import CRTPPacket
from cflib.crtp.crtpstack import CRTPPort
from cflib.utils.callbacks import Caller

__author__ = 'Bitcraze AB'
__all__ = ['LinkStatistics']

PING_HEADER = 0x0
ECHO_CHANNEL = 0


class LinkStatistics:
"""
LinkStatistics class manages the collection of various statistics related to the
communication link between the Crazyflie and the lib.

This class serves as a high-level manager, initializing and coordinating multiple
statistics trackers, such as Latency. It allows starting and stopping all
statistics trackers simultaneously. Future statistics can be added to extend
the class's functionality.

Attributes:
_cf (Crazyflie): A reference to the Crazyflie instance.
latency (Latency): An instance of the Latency class that tracks latency statistics.
"""

def __init__(self, crazyflie):
self._cf = crazyflie

# Flag to track if the statistics are active
self._is_active = False

# Universal statistics
self.latency = Latency(self._cf)

# Proxy for latency callback
self.latency_updated = self.latency.latency_updated

# Callers for radio link statistics
self.link_quality_updated = Caller()
self.uplink_rssi_updated = Caller()
self.uplink_rate_updated = Caller()
self.downlink_rate_updated = Caller()
self.uplink_congestion_updated = Caller()
self.downlink_congestion_updated = Caller()

def start(self):
"""
Start collecting all statistics.
"""
self._is_active = True
self.latency.start()

def stop(self):
"""
Stop collecting all statistics.
"""
self._is_active = False
self.latency.stop()

def radio_link_statistics_callback(self, radio_link_statistics):
"""
This callback is called by the RadioLinkStatistics class after it
processes the data provided by the radio driver.
"""
if not self._is_active:
return # Skip processing if link statistics are stopped

if 'link_quality' in radio_link_statistics:
self.link_quality_updated.call(radio_link_statistics['link_quality'])
if 'uplink_rssi' in radio_link_statistics:
self.uplink_rssi_updated.call(radio_link_statistics['uplink_rssi'])
if 'uplink_rate' in radio_link_statistics:
self.uplink_rate_updated.call(radio_link_statistics['uplink_rate'])
if 'downlink_rate' in radio_link_statistics:
self.downlink_rate_updated.call(radio_link_statistics['downlink_rate'])
if 'uplink_congestion' in radio_link_statistics:
self.uplink_congestion_updated.call(radio_link_statistics['uplink_congestion'])
if 'downlink_congestion' in radio_link_statistics:
self.downlink_congestion_updated.call(radio_link_statistics['downlink_congestion'])


class Latency:
"""
The Latency class measures and tracks the latency of the communication link
between the Crazyflie and the lib.

This class periodically sends ping requests to the Crazyflie and tracks
the round-trip time (latency). It calculates and stores the 95th percentile
latency over a rolling window of recent latency measurements.

Attributes:
_cf (Crazyflie): A reference to the Crazyflie instance.
latency (float): The current calculated 95th percentile latency in milliseconds.
_stop_event (Event): An event object to control the stopping of the ping thread.
_ping_thread_instance (Thread): Thread instance for sending ping requests at intervals.
"""

def __init__(self, crazyflie):
self._cf = crazyflie
self._cf.add_header_callback(self._ping_response, CRTPPort.LINKCTRL, 0)
self._stop_event = Event()
self._ping_thread_instance = None
self.latency = 0
self.latency_updated = Caller()

def start(self):
"""
Start the latency tracking process.

This method initiates a background thread that sends ping requests
at regular intervals to measure and track latency statistics.
"""
if self._ping_thread_instance is None or not self._ping_thread_instance.is_alive():
self._stop_event.clear()
self._ping_thread_instance = Thread(target=self._ping_thread)
self._ping_thread_instance.start()

def stop(self):
"""
Stop the latency tracking process.

This method stops the background thread and ceases sending further
ping requests, halting latency measurement.
"""
self._stop_event.set()
if self._ping_thread_instance is not None:
self._ping_thread_instance.join()
self._ping_thread_instance = None

def _ping_thread(self, interval: float = 0.1) -> None:
"""
Background thread method that sends a ping to the Crazyflie at regular intervals.

This method runs in a separate thread and continues to send ping requests
until the stop event is set.

Args:
interval (float): The time (in seconds) to wait between ping requests. Default is 0.1 seconds.
"""
while not self._stop_event.is_set():
self.ping()
time.sleep(interval)

def ping(self) -> None:
"""
Send a ping request to the Crazyflie to measure latency.

A ping packet is sent to the Crazyflie with the current timestamp and a
header identifier to differentiate it from other echo responses. The latency
is calculated upon receiving the response.
"""
ping_packet = CRTPPacket()
ping_packet.set_header(CRTPPort.LINKCTRL, ECHO_CHANNEL)

# Pack the current time as the ping timestamp
current_time = time.time()
ping_packet.data = struct.pack('<Bd', PING_HEADER, current_time)
self._cf.send_packet(ping_packet)

def _ping_response(self, packet):
"""
Callback method for processing the echo response received from the Crazyflie.

This method is called when a ping response is received. It checks the header
to verify that it matches the sent ping header before calculating the latency
based on the timestamp included in the ping request.

Args:
packet (CRTPPacket): The packet received from the Crazyflie containing
the echo response data.
"""
received_header, received_timestamp = struct.unpack('<Bd', packet.data)
if received_header != PING_HEADER:
return
self.latency = self._calculate_p95_latency(received_timestamp)
self.latency_updated.call(self.latency)

def _calculate_p95_latency(self, timestamp):
"""
Calculate the 95th percentile latency based on recent ping measurements.

This method records the round-trip time for a ping response and maintains
a rolling window of latency values to compute the 95th percentile.

Args:
timestamp (float): The timestamp from the sent ping packet to calculate
the round-trip time.

Returns:
float: The updated 95th percentile latency in milliseconds.
"""
if not hasattr(self, '_latencies'):
self._latencies = []

instantaneous_latency = (time.time() - timestamp) * 1000
self._latencies.append(instantaneous_latency)
if len(self._latencies) > 100:
self._latencies.pop(0)
p95_latency = np.percentile(self._latencies, 95)
return p95_latency
2 changes: 1 addition & 1 deletion cflib/crazyflie/param.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ class _ParamUpdater(Thread):
def __init__(self, cf, useV2, updated_callback):
"""Initialize the thread"""
Thread.__init__(self)
self.setDaemon(True)
self.daemon = True
self.wait_lock = Lock()
self.cf = cf
self._useV2 = useV2
Expand Down
Loading
Loading