Skip to content

Commit

Permalink
Add a way to hook traffic
Browse files Browse the repository at this point in the history
It is needed for In meter gateway
  • Loading branch information
GwendalRaoul committed Feb 4, 2025
1 parent fc94c51 commit 468ee17
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 7 deletions.
12 changes: 6 additions & 6 deletions example/linux/gw-example/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <pthread.h>

#include "wpc_proto.h"
#include "meter_hook.h"

#include "MQTTClient.h"

Expand Down Expand Up @@ -77,6 +78,7 @@ static MQTTClient m_client = NULL;
static void signal_handler(int signum)
{
running = false;
Meter_Hook_stop();
}

static bool MQTT_publish(char * topic, uint8_t * payload, size_t payload_size, bool retained)
Expand Down Expand Up @@ -120,7 +122,6 @@ static bool MQTT_publish(char * topic, uint8_t * payload, size_t payload_size, b
pthread_cond_signal(&m_pub_queue_not_empty_cond);
m_pub_queue_empty = false;
ret = true;

}
}

Expand Down Expand Up @@ -360,7 +361,7 @@ static bool MQTT_connect(uint32_t timeout_s,

MQTTClient_init_options global_init_options = MQTTClient_init_options_initializer;
global_init_options.do_openssl_init = true;

snprintf(topic_all_requests, sizeof(topic_all_requests), "gw-request/+/%s/#", m_gateway_id);

MQTTClient_global_init(&global_init_options);
Expand Down Expand Up @@ -560,10 +561,9 @@ int main(int argc, char * argv[])

LOGI("Starting gw with id %s on host %s\n", gateway_id, mqtt_host);

while (running)
{
sleep(2);
}
Meter_Hook_init(123456);
// Next call will return only once Meter_Hook_stop is called
Meter_Hook_start();

LOGI("Clean exit requested\n");
mqtt_unsubscribe_topics();
Expand Down
3 changes: 3 additions & 0 deletions example/linux/gw-example/makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ TARGET_APP := $(BUILDPREFIX)$(MAIN_APP)

# Add Api header
CFLAGS += -I$(MESH_LIB_FOLDER)api
# Add platform api too
CFLAGS += -I$(MESH_LIB_FOLDER)platform
# Add pthtread lib as needed by Mesh Lib
LDFLAGS += -pthread
# Add paho shared library
Expand All @@ -34,6 +36,7 @@ CFLAGS += -D_REENTRANT

# Specific sources for main
SOURCES:= $(SOURCEPREFIX)main.c
SOURCES+= $(SOURCEPREFIX)meter_hook.c

OBJECTS := $(patsubst $(SOURCEPREFIX)%, \
$(BUILDPREFIX)%, \
Expand Down
127 changes: 127 additions & 0 deletions example/linux/gw-example/meter_hook.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/* Wirepas Oy licensed under Apache License, Version 2.0
*
* See file LICENSE for full license details.
*
*/
#include <stdio.h>
#include <sys/msg.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <stdint.h>

#include "wpc.h"

#define LOG_MODULE_NAME "Meter Hook"
#define MAX_LOG_LEVEL DEBUG_LOG_LEVEL
#include "logger.h"

#define MAX_PAYLOAD_SIZE 1500

#define UPLINK_QUEUE_KEY 12345
#define DOWNLINK_QUEUE_KEY 54321

static int m_queue_uplink_id;
static int m_queue_downlink_id;

static bool m_meter_hook_running = false;
static uint32_t m_meter_address = 0;

static app_addr_t m_sink_add = 0;

/* HACK: platform.h should be part of public API too */
extern unsigned long long Platform_get_timestamp_ms_epoch();

/* Define the message structure */
typedef struct {
long int mtype;
uint8_t src_ep;
uint8_t dst_ep;
uint16_t len;
uint8_t payload[MAX_PAYLOAD_SIZE];
} message_t;

static bool onDownlinkTrafficReceived_cb(app_message_t * data_p)
{
int ret;

// Check if data is for our meter
if (data_p->dst_addr == m_meter_address)
{
// Message is for our meter, propagate it
message_t msg;
msg.src_ep = data_p->src_ep;
msg.dst_ep = data_p->dst_ep;

memcpy(&msg.payload, data_p->bytes, data_p->num_bytes);
msg.len = data_p->num_bytes;
msg.mtype = 1;

/* Send the message */
LOGI("Sending buffer of len %d of type %d\n", msg.len, msg.mtype);
ret = msgsnd(m_queue_downlink_id, &msg, sizeof(message_t) - sizeof(long), 0);
if (ret < 0)
{
LOGE("Cannot send %d, %s\n", ret, strerror(errno));
}
return true;
}
return false;
}
void Meter_Hook_init(uint32_t meter_address)
{
m_queue_uplink_id = msgget(UPLINK_QUEUE_KEY, IPC_CREAT | 0666);
m_queue_downlink_id = msgget(DOWNLINK_QUEUE_KEY, IPC_CREAT | 0666);

LOGD("Tx key = %d, Rx Key = %d\n", m_queue_uplink_id, m_queue_downlink_id);

m_meter_address = meter_address;

// Get the sink address to set the target address
WPC_get_node_address(&m_sink_add);

LOGI("m_meter_address=%d sink address = %d\n", m_meter_address, m_sink_add);
}

void Meter_Hook_start(void)
{
int ret;
message_t msg;
m_meter_hook_running = true;
app_res_e wpc_res;

WPC_register_downlink_data_hook(onDownlinkTrafficReceived_cb);

while (m_meter_hook_running)
{
LOGD("Waiting on queue\n");
ret = msgrcv(m_queue_uplink_id, &msg, sizeof(message_t) - sizeof(long), 1, 0);
if (ret < 0) {
LOGE("Cannot receive message %d, %s", ret, strerror(errno));
}
LOGI("Data to send of len %d %d->%d\n", msg.len, msg.src_ep, msg.dst_ep);

wpc_res = WPC_inject_uplink_data(
msg.payload,
msg.len,
m_meter_address,
m_sink_add,
APP_QOS_HIGH,
msg.src_ep,
msg.dst_ep,
0, // No travel time
1, // 1 hop count
Platform_get_timestamp_ms_epoch()
);

LOGI("Message injected : %d\n", wpc_res);
}
}

void Meter_Hook_stop(void)
{
m_meter_hook_running = false;
// TODO unblock the msgrcv

WPC_unregister_downlink_data_hook();
}
17 changes: 17 additions & 0 deletions example/linux/gw-example/meter_hook.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/* Wirepas Oy licensed under Apache License, Version 2.0
*
* See file LICENSE for full license details.
*
*/
#ifndef METER_HOOK_H_
#define METER_HOOK_H_

#include "stdint.h"

void Meter_Hook_init(uint32_t meter_address);

void Meter_Hook_start(void);

void Meter_Hook_stop(void);

#endif // METER_HOOK_H_
34 changes: 34 additions & 0 deletions lib/api/wpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -1045,4 +1045,38 @@ app_res_e WPC_register_for_stack_status(onStackStatusReceived_cb_f onStackStatus
*/
app_res_e WPC_unregister_from_stack_status();

/**
* \brief Inject uplink traffic as if it was received from Wirepas Network
*/
app_res_e WPC_inject_uplink_data(const uint8_t * bytes,
size_t num_bytes,
app_addr_t src_addr,
app_addr_t dst_addr,
app_qos_e qos,
uint8_t src_ep,
uint8_t dst_ep,
uint32_t travel_time,
int8_t hop_count,
unsigned long long timestamp_ms_epoch);

/**
* \brief Callback definition to register for stack status event
* \param data
* The downlink traffic
* \return True if data was intercepted, false otherwise
*/
typedef bool (*onDownlinkTrafficReceived_cb_f)(app_message_t * data_p);

/**
* \brief Register for being able to intercept downlink traffic
* before being sent to wirepas network
* \param onDownlinkDataCb
* The callback to be called when downlink traffic is received
* \note The callback will decide if message must be propagated or not.
* If not propagated, up to the interceptor to consume it
*/
app_res_e WPC_register_downlink_data_hook(onDownlinkTrafficReceived_cb_f onDownlinkDataCb);

app_res_e WPC_unregister_downlink_data_hook();

#endif
74 changes: 74 additions & 0 deletions lib/wpc/dsap.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ static onDataReceived_cb_f data_cb_table[MAX_NUMBER_EP];
static onDataReceived_cb_f m_data_cb;
#endif

static onDownlinkTrafficReceived_cb_f m_data_hook_downlink_cb;

typedef struct
{
onDataSent_cb_f cb;
Expand Down Expand Up @@ -165,6 +167,30 @@ int dsap_data_tx_request(const uint8_t * buffer,
static uint16_t packet_id = 0;
uint8_t max_data_pdu_size = WPC_Int_get_mtu();

// Check if a hook is present and offer it first
if (m_data_hook_downlink_cb)
{
app_message_t message = (app_message_t) {
.bytes = buffer,
.dst_addr = dest_add,
.on_data_sent_cb = NULL,
.buffering_delay = buffering_delay,
.pdu_id = pdu_id,
.num_bytes = len,
.src_ep = src_ep,
.dst_ep = dest_ep,
.hop_limit = hop_limit,
.qos = qos,
.is_unack_csma_ca = is_unack_csma_ca
};
if (m_data_hook_downlink_cb(&message))
{
// Message was hooked, call the callback directly
LOGI("Message hooked\n");
return APP_RES_OK;
}
}

if (len > MAX_FULL_PACKET_SIZE)
{
// Not very clean, but reuse dualmcu 6 error code for sending data
Expand Down Expand Up @@ -387,6 +413,42 @@ void dsap_data_rx_frag_indication_handler(dsap_data_rx_frag_ind_pl_t * payload,

}

void dsap_data_inject_uplink_data(const uint8_t * bytes,
size_t num_bytes,
app_addr_t src_addr,
app_addr_t dst_addr,
app_qos_e qos,
uint8_t src_ep,
uint8_t dst_ep,
uint32_t travel_time,
int8_t hop_count,
unsigned long long timestamp_ms_epoch)
{
onDataReceived_cb_f cb;
#ifdef REGISTER_DATA_PER_ENDPOINT
cb = data_cb_table[payload->dest_endpoint];
#else
cb = m_data_cb;
#endif
if (cb == NULL)
{
// No cb registered
return;
}

// Call the registered callback
cb(bytes,
num_bytes,
src_addr,
dst_addr,
qos,
src_ep,
dst_ep,
travel_time,
hop_count,
timestamp_ms_epoch);
}

void dsap_data_rx_indication_handler(dsap_data_rx_ind_pl_t * payload,
unsigned long long timestamp_ms_epoch)
{
Expand Down Expand Up @@ -482,6 +544,18 @@ bool dsap_unregister_for_data()
}
#endif

bool dsap_register_downlink_data_hook(onDownlinkTrafficReceived_cb_f onDownlinkDataCb)
{
m_data_hook_downlink_cb = onDownlinkDataCb;
return true;
}

bool dsap_unregister_downlink_data_hook()
{
m_data_hook_downlink_cb = NULL;
return true;
}

bool dsap_set_max_fragment_duration(unsigned int fragment_max_duration_s)
{
reassembly_set_max_fragment_duration(fragment_max_duration_s);
Expand Down
16 changes: 16 additions & 0 deletions lib/wpc/include/dsap.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,29 @@ bool dsap_register_for_data(onDataReceived_cb_f onDataReceived);
bool dsap_unregister_for_data();
#endif

bool dsap_register_downlink_data_hook(onDownlinkTrafficReceived_cb_f onDownlinkDataCb);

bool dsap_unregister_downlink_data_hook();

/**
* \brief Set maximum duration to keep fragment in our buffer until packet is full
* \param fragment_max_duration_s
* Maximum time in s to keep fragments from incomplete packets inside our buffers
*/
bool dsap_set_max_fragment_duration(unsigned int fragment_max_duration_s);


void dsap_data_inject_uplink_data(const uint8_t * bytes,
size_t num_bytes,
app_addr_t src_addr,
app_addr_t dst_addr,
app_qos_e qos,
uint8_t src_ep,
uint8_t dst_ep,
uint32_t travel_time,
int8_t hop_count,
unsigned long long timestamp_ms_epoch);

/**
* \brief Initialize the dsap module
*/
Expand Down
Loading

0 comments on commit 468ee17

Please sign in to comment.