Skip to content

Commit

Permalink
Add querier write filtering support
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 28, 2025
1 parent 29cdaa3 commit a97ae91
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 43 deletions.
9 changes: 4 additions & 5 deletions include/zenoh-pico/net/filtering.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@ typedef struct _z_write_filter_t {
_z_writer_filter_ctx_t *ctx;
} _z_write_filter_t;

typedef struct _z_publisher_t _z_publisher_t;

z_result_t _z_write_filter_create(_z_publisher_t *pub);
z_result_t _z_write_filter_destroy(_z_publisher_t *pub);
bool _z_write_filter_active(const _z_publisher_t *pub);
z_result_t _z_write_filter_create(_z_session_t *zn, _z_write_filter_t *filter, _z_keyexpr_t keyexpr,
uint8_t interest_flag);
z_result_t _z_write_filter_destroy(_z_session_t *zn, _z_write_filter_t *filter);
bool _z_write_filter_active(const _z_write_filter_t *filter);

#ifdef __cplusplus
}
Expand Down
4 changes: 4 additions & 0 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/net/filtering.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/keyexpr.h"
Expand Down Expand Up @@ -66,6 +67,9 @@ typedef struct _z_querier_t {
z_reliability_t reliability;
bool _is_express;
uint64_t _timeout_ms;
#if Z_FEATURE_INTEREST == 1
_z_write_filter_t _filter;
#endif
} _z_querier_t;

#if Z_FEATURE_QUERY == 1
Expand Down
26 changes: 20 additions & 6 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "zenoh-pico/net/sample.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/interest.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/resource.h"
Expand Down Expand Up @@ -971,7 +972,8 @@ z_result_t z_declare_publisher(const z_loaned_session_t *zs, z_owned_publisher_t
_z_publisher_t int_pub = _z_declare_publisher(zs, key, opt.encoding == NULL ? NULL : &opt.encoding->_this._val,
opt.congestion_control, opt.priority, opt.is_express, reliability);
// Create write filter
z_result_t res = _z_write_filter_create(&int_pub);
z_result_t res =
_z_write_filter_create(_Z_RC_IN_VAL(zs), &int_pub._filter, keyexpr_aliased, _Z_INTEREST_FLAG_SUBSCRIBERS);
if (res != _Z_RES_OK) {
if (key._id != Z_RESOURCE_ID_NONE) {
_z_undeclare_resource(_Z_RC_IN_VAL(zs), key._id);
Expand Down Expand Up @@ -1037,7 +1039,7 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay
_z_bytes_t attachment_bytes = _z_bytes_from_owned_bytes(&opt.attachment->_this);

// Check if write filter is active before writing
if (!_z_write_filter_active(pub)) {
if (!_z_write_filter_active(&pub->_filter)) {
// Write value
ret = _z_write(session, pub_keyexpr, payload_bytes, &encoding, Z_SAMPLE_KIND_PUT, pub->_congestion_control,
pub->_priority, pub->_is_express, opt.timestamp, attachment_bytes, reliability);
Expand Down Expand Up @@ -1255,6 +1257,15 @@ z_result_t z_declare_querier(const z_loaned_session_t *zs, z_owned_querier_t *qu
_z_querier_t int_querier = _z_declare_querier(zs, key, opt.consolidation.mode, opt.congestion_control, opt.target,
opt.priority, opt.is_express, opt.timeout_ms);

// Create write filter
z_result_t res =
_z_write_filter_create(_Z_RC_IN_VAL(zs), &int_querier._filter, keyexpr_aliased, _Z_INTEREST_FLAG_QUERYABLES);
if (res != _Z_RES_OK) {
if (key._id != Z_RESOURCE_ID_NONE) {
_z_undeclare_resource(_Z_RC_IN_VAL(zs), key._id);
}
return res;
}
querier->_val = int_querier;
return _Z_RES_OK;
}
Expand Down Expand Up @@ -1314,10 +1325,13 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(&opt.payload->_this),
.encoding = _z_encoding_from_owned(&opt.encoding->_this)};

ret = _z_query(session, querier_keyexpr, parameters, querier->_target, consolidation_mode, value,
callback->_this._val.call, callback->_this._val.drop, ctx, querier->_timeout_ms,
_z_bytes_from_owned_bytes(&opt.attachment->_this), querier->_congestion_control,
querier->_priority, querier->_is_express);
// Check if write filter is active before quering
if (!_z_write_filter_active(&querier->_filter)) {
ret = _z_query(session, querier_keyexpr, parameters, querier->_target, consolidation_mode, value,
callback->_this._val.call, callback->_this._val.drop, ctx, querier->_timeout_ms,
_z_bytes_from_owned_bytes(&opt.attachment->_this), querier->_congestion_control,
querier->_priority, querier->_is_express);
}
} else {
ret = _Z_ERR_SESSION_CLOSED;
}
Expand Down
56 changes: 25 additions & 31 deletions src/net/filtering.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,8 @@
#include <stdint.h>
#include <string.h>

#include "zenoh-pico/api/types.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/net/primitives.h"
#include "zenoh-pico/net/query.h"
#include "zenoh-pico/protocol/codec/core.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_INTEREST == 1
static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) {
Expand Down Expand Up @@ -72,9 +62,10 @@ static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) {
}
}

z_result_t _z_write_filter_create(_z_publisher_t *pub) {
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED |
_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
z_result_t _z_write_filter_create(_z_session_t *zn, _z_write_filter_t *filter, _z_keyexpr_t keyexpr,
uint8_t interest_flag) {
uint8_t flags = interest_flag | _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT |
_Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
_z_writer_filter_ctx_t *ctx = (_z_writer_filter_ctx_t *)z_malloc(sizeof(_z_writer_filter_ctx_t));

if (ctx == NULL) {
Expand All @@ -83,44 +74,47 @@ z_result_t _z_write_filter_create(_z_publisher_t *pub) {
ctx->state = WRITE_FILTER_INIT;
ctx->decl_id = 0;

pub->_filter.ctx = ctx;
pub->_filter._interest_id =
_z_add_interest(_Z_RC_IN_VAL(&pub->_zn), _z_keyexpr_alias_from_user_defined(pub->_key, true),
_z_write_filter_callback, flags, (void *)ctx);
if (pub->_filter._interest_id == 0) {
filter->ctx = ctx;
filter->_interest_id = _z_add_interest(zn, keyexpr, _z_write_filter_callback, flags, (void *)ctx);
if (filter->_interest_id == 0) {
z_free(ctx);
return _Z_ERR_GENERIC;
}
return _Z_RES_OK;
}

z_result_t _z_write_filter_destroy(_z_publisher_t *pub) {
if (pub->_filter.ctx != NULL) {
z_result_t res = _z_remove_interest(_Z_RC_IN_VAL(&pub->_zn), pub->_filter._interest_id);
z_free(pub->_filter.ctx);
pub->_filter.ctx = NULL;
z_result_t _z_write_filter_destroy(_z_session_t *zn, _z_write_filter_t *filter) {
if (filter->ctx != NULL) {
z_result_t res = _z_remove_interest(zn, filter->_interest_id);
z_free(filter->ctx);
filter->ctx = NULL;
return res;
}
return _Z_RES_OK;
}

bool _z_write_filter_active(const _z_publisher_t *pub) {
return pub->_filter.ctx != NULL && pub->_filter.ctx->state == WRITE_FILTER_ACTIVE;
bool _z_write_filter_active(const _z_write_filter_t *filter) {
return filter->ctx != NULL && filter->ctx->state == WRITE_FILTER_ACTIVE;
}

#else
z_result_t _z_write_filter_create(_z_publisher_t *pub) {
_ZP_UNUSED(pub);
z_result_t _z_write_filter_create(_z_session_t *zn, _z_write_filter_t *filter, _z_keyexpr_t keyexpr,
uint8_t interest_flag) {
_ZP_UNUSED(zn);
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(filter);
_ZP_UNUSED(interest_flag);
return _Z_RES_OK;
}

z_result_t _z_write_filter_destroy(_z_publisher_t *pub) {
_ZP_UNUSED(pub);
z_result_t _z_write_filter_destroy(_z_session_t *zn, _z_write_filter_t *filter) {
_ZP_UNUSED(zn);
_ZP_UNUSED(filter);
return _Z_RES_OK;
}

bool _z_write_filter_active(const _z_publisher_t *pub) {
_ZP_UNUSED(pub);
bool _z_write_filter_active(const _z_write_filter_t *filter) {
_ZP_UNUSED(filter);
return false;
}

Expand Down
3 changes: 2 additions & 1 deletion src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ z_result_t _z_undeclare_publisher(_z_publisher_t *pub) {
_z_matching_listener_entity_undeclare(_Z_RC_IN_VAL(&pub->_zn), pub->_id);
#endif
// Clear publisher
_z_write_filter_destroy(pub);
_z_write_filter_destroy(_Z_RC_IN_VAL(&pub->_zn), &pub->_filter);
_z_undeclare_resource(_Z_RC_IN_VAL(&pub->_zn), pub->_key._id);
return _Z_RES_OK;
}
Expand Down Expand Up @@ -541,6 +541,7 @@ z_result_t _z_undeclare_querier(_z_querier_t *querier) {
if (querier == NULL || _Z_RC_IS_NULL(&querier->_zn)) {
return _Z_ERR_ENTITY_UNKNOWN;
}
_z_write_filter_destroy(_Z_RC_IN_VAL(&querier->_zn), &querier->_filter);
_z_undeclare_resource(_Z_RC_IN_VAL(&querier->_zn), querier->_key._id);
return _Z_RES_OK;
}
Expand Down

0 comments on commit a97ae91

Please sign in to comment.