Skip to content

Commit

Permalink
Merge pull request #43 from Kapim/ros2
Browse files Browse the repository at this point in the history
Ros2
  • Loading branch information
ZdenekM authored Mar 5, 2024
2 parents cad6f1a + 04f4e38 commit 9359679
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 11 deletions.
2 changes: 1 addition & 1 deletion era_5g_relay_network_application/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ python_distribution(
sdist=True,
provides=setup_py(
name="era_5g_relay_network_application",
version="1.2.0",
version="1.3.0",
description="Relay Network Application",
author="Michal Kapinus",
author_email="[email protected]",
Expand Down
28 changes: 23 additions & 5 deletions era_5g_relay_network_application/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from cv_bridge import CvBridge # pants: no-infer-dep
from rclpy.executors import MultiThreadedExecutor # pants: no-infer-dep
from rclpy.node import Node # pants: no-infer-dep
from rclpy.parameter import Parameter # pants: no-infer-dep
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy # pants: no-infer-dep
from rosbridge_library.internal import ros_loader # pants: no-infer-dep

from era_5g_client.client import NetAppClient
from era_5g_client.client_base import NetAppClientBase
Expand All @@ -27,6 +27,7 @@
load_entities_list,
load_transform_list,
)
from era_5g_relay_network_application.worker_clock import WorkerClock
from era_5g_relay_network_application.worker_image_publisher import WorkerImagePublisher
from era_5g_relay_network_application.worker_image_subscriber import WorkerImageSubscriber
from era_5g_relay_network_application.worker_publisher import WorkerPublisher
Expand All @@ -45,6 +46,9 @@
# ip address or hostname of the computer, where the netapp is deployed
NETAPP_ADDRESS = os.getenv("NETAPP_ADDRESS", "http://localhost:5896")

# flag to generate and send /clock topic to the server
SEND_CLOCK = os.getenv("SEND_CLOCK", "false").lower() in ("true", "1", "t")

# parameters for register method
WAIT_UNTIL_AVAILABLE = os.getenv("WAIT_UNTIL_AVAILABLE", "false").lower() in ("true", "1")
WAIT_TIMEOUT = int(os.getenv("WAIT_UNTIL_AVAILABLE_TO", -1))
Expand All @@ -60,6 +64,9 @@
# middleware robot id (robot id)
MIDDLEWARE_ROBOT_ID = os.getenv("MIDDLEWARE_ROBOT_ID", "00000000-0000-0000-0000-000000000000")


USE_SIM_TIME = os.getenv("USE_SIM_TIME", "false").lower() in ("true", "1", "t")

QUEUE_LENGTH_TOPICS = int(os.getenv("QUEUE_LENGTH_TOPICS", 1))
QUEUE_LENGTH_SERVICES = int(os.getenv("QUEUE_LENGTH_SERVICES", 1))
QUEUE_LENGTH_TF = int(os.getenv("QUEUE_LENGTH_TF", 1))
Expand All @@ -73,6 +80,7 @@
services_workers: Dict[str, WorkerServiceServer] = dict()
socketio_workers: List[WorkerSocketIO] = list()


node: Optional[Node] = None

BEST_EFFORT = QoSProfile(
Expand Down Expand Up @@ -208,6 +216,8 @@ def main(args=None) -> None:
rclpy.init(args=args)
global node
node = rclpy.create_node("relay_client")
use_sim_time = Parameter("use_sim_time", Parameter.Type.BOOL, USE_SIM_TIME)
node.set_parameters([use_sim_time])
node.get_logger().set_level(logging.DEBUG)
executor = MultiThreadedExecutor()
executor.add_node(node)
Expand All @@ -226,6 +236,7 @@ def main(args=None) -> None:

node.get_logger().debug(f"Loaded outgoing topics: {topics_outgoing_list}")
node.get_logger().debug(f"Loaded incoming topics: {topics_incoming_list}")

node.get_logger().debug(f"Loaded outgoing services: {services}")

global client
Expand Down Expand Up @@ -292,12 +303,19 @@ def main(args=None) -> None:
client = NetAppClientBase(callbacks_info, extended_measuring=EXTENDED_MEASURING)
client.register(f"{NETAPP_ADDRESS}", {"subscribe_results": True}, WAIT_UNTIL_AVAILABLE, WAIT_TIMEOUT)

if SEND_CLOCK:
send_function_clock: SendFunctionProtocol = partial(
client.send_data,
event="topic//clock",
channel_type=ChannelType.JSON,
can_be_dropped=True,
)
worker_clock = WorkerClock(send_function_clock, node)
worker_clock.daemon = True
worker_clock.start()

# create socketio workers for all topics and services to be sent to the relay server
for topic_out in topics_outgoing_list:
topic_type_class = ros_loader.get_message_instance(topic_out.type)

logger.info(f"Topic class is {topic_type_class}")

subscriber_queue: Queue = Queue(QUEUE_LENGTH_TOPICS)
channel_type = get_channel_type(topic_out.compression, topic_out.type)

Expand Down
7 changes: 4 additions & 3 deletions era_5g_relay_network_application/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Any, Dict, Optional, Tuple, Union

import rclpy # pants: no-infer-dep
from rclpy.parameter import Parameter # pants: no-infer-dep
from rclpy.qos import QoSProfile # pants: no-infer-dep

from era_5g_interface.channels import CallbackInfoServer, Channels, ChannelType
Expand Down Expand Up @@ -43,7 +44,7 @@
QUEUE_LENGTH_TOPICS = int(os.getenv("QUEUE_LENGTH_TOPICS", 1))
QUEUE_LENGTH_SERVICES = int(os.getenv("QUEUE_LENGTH_SERVICES", 1))
QUEUE_LENGTH_TF = int(os.getenv("QUEUE_LENGTH_TF", 1))

USE_SIM_TIME = os.getenv("USE_SIM_TIME", "false").lower() in ("true", "1", "t")
EXTENDED_MEASURING = bool(os.getenv("EXTENDED_MEASURING", False))


Expand Down Expand Up @@ -304,8 +305,6 @@ def json_callback(self, sid: str, data: Dict, queue: Queue):
data (Dict): The json data.
queue (Queue): The queue to pass the data to the publisher worker.
"""

print(f"json_callback: {sid}, {data}")
try:
queue.put_nowait((data, Channels.get_timestamp_from_data(data)))
except Full:
Expand Down Expand Up @@ -397,6 +396,8 @@ def main(args=None) -> None:

rclpy.init(args=args)
node = rclpy.create_node("relay_netapp")
use_sim_time = Parameter("use_sim_time", Parameter.Type.BOOL, USE_SIM_TIME)
node.set_parameters([use_sim_time])
node.get_logger().set_level(logging.DEBUG)
node.get_logger().debug(f"Loaded outgoing topics: {topics_outgoing_list}")
node.get_logger().debug(f"Loaded incoming topics: {topics_incoming_list}")
Expand Down
3 changes: 2 additions & 1 deletion era_5g_relay_network_application/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Dict, List, Optional, Tuple, Union

from rclpy.qos import QoSPresetProfiles, QoSProfile # pants: no-infer-dep
from ros2bag.api import interpret_dict_as_qos_profile # pants: no-infer-dep
from rosbridge_library.internal import ros_loader # pants: no-infer-dep
from sensor_msgs.msg import Image # pants: no-infer-dep

Expand Down Expand Up @@ -116,7 +117,7 @@ def load_entities_list(env_name: str = "TOPICS_TO_SERVER") -> List[EntityConfig]
try:
ec.qos = QoSPresetProfiles.get_from_short_key(entity["qos"]["preset"])
except KeyError:
ec.qos = QoSProfile(**entity["qos"])
ec.qos = interpret_dict_as_qos_profile(entity["qos"])

ret.append(ec)

Expand Down
42 changes: 42 additions & 0 deletions era_5g_relay_network_application/worker_clock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
from threading import Event, Thread
from typing import Any

from rclpy.node import Node # pants: no-infer-dep
from rosbridge_library.internal.message_conversion import extract_values # pants: no-infer-dep
from rosgraph_msgs.msg import Clock # pants: no-infer-dep

from era_5g_interface.exceptions import BackPressureException
from era_5g_interface.utils.rate_timer import RateTimer
from era_5g_relay_network_application import SendFunctionProtocol


class WorkerClock(Thread):
"""Worker object for sending data over socket io."""

def __init__(self, send_function: SendFunctionProtocol, node: Node, **kw):
super().__init__(**kw)
self.stop_event = Event()
self.send_function = send_function
self.rt = RateTimer(20)
self.node = node

def stop(self) -> None:
self.stop_event.set()

def run(self) -> None:
logging.debug(f"{self.name} thread is running.")

while not self.stop_event.is_set():
cl = Clock()
cl.clock = self.node.get_clock().now().to_msg()
data = extract_values(cl)
self.send_data(data)
self.rt.sleep()

def send_data(self, data: Any) -> None:
assert self.send_function is not None
try:
self.send_function(data)
except BackPressureException:
logging.warning("Backpressure applied.")
2 changes: 1 addition & 1 deletion era_5g_relay_network_application/worker_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def callback(self, data: Any):
if isinstance(data, PointCloud2) and self.compression == Compressions.DRACO:
np_arr = read_points_numpy(data, field_names=["x", "y", "z"], skip_nans=True) # drop intensity, etc....
cpc = DracoPy.encode(np_arr, compression_level=1)
msg = cpc
msg["data"] = cpc

try:
if self.action_topic_variant == ActionTopicVariant.ACTION_FEEDBACK:
Expand Down
6 changes: 6 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,9 @@ ignore_missing_imports = True

[mypy-example_interfaces.*]
ignore_missing_imports = True

[mypy-ros2bag.*]
ignore_missing_imports = True

[mypy-rosgraph_msgs.*]
ignore_missing_imports = True

0 comments on commit 9359679

Please sign in to comment.