Skip to content

Commit

Permalink
Querier implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 23, 2025
1 parent 4322864 commit 5788439
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 30 deletions.
31 changes: 31 additions & 0 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,37 @@ z_result_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsr
#endif

#if Z_FEATURE_QUERY == 1
/**
* Declare a :c:type:`_z_querier_t` for the given resource key.
*
* Parameters:
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to query. The callee gets the ownership of any
* allocated value.
* consolidation_mode: The kind of consolidation that should be applied on replies.
* congestion_control: The congestion control to apply when routing the querier queries.
* target: The kind of queryables that should be target of this query.
* priority: The priority of the query.
* is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* timeout_ms: The timeout value of this query.
* Returns:
* The created :c:type:`_z_querier_t` (in null state if the declaration failed)..
*/
_z_querier_t _z_declare_querier(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
z_consolidation_mode_t consolidation_mode, z_congestion_control_t congestion_control,
z_query_target_t target, z_priority_t priority, bool is_express, uint64_t timeout_ms);

/**
* Undeclare a :c:type:`_z_querier_t`.
*
* Parameters:
* querier: The :c:type:`_z_querier_t` to undeclare. The callee releases the
* querier upon successful return.
* Returns:
* 0 if success, or a negative value identifying the error.
*/
z_result_t _z_undeclare_querier(_z_querier_t *querier);

/**
* Query data from the matching queryables in the system.
*
Expand Down
7 changes: 5 additions & 2 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,21 @@ typedef struct _z_querier_t {
_z_zint_t _id;
_z_session_weak_t _zn;
_z_encoding_t _encoding;
z_consolidation_mode_t _consolidation_mode;
z_query_target_t _target;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
z_reliability_t reliability;
bool _is_express;
uint64_t _timeout_ms;
} _z_querier_t;

#if Z_FEATURE_QUERY == 1
// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_querier_t _z_querier_null(void) { return (_z_querier_t){0}; }
static inline bool _z_querier_check(const _z_querier_t *querier) { return !_Z_RC_IS_NULL(&querier->_zn); }
void _z_querier_clear(_z_querier_t *pub);
void _z_querier_free(_z_querier_t **pub);
void _z_querier_clear(_z_querier_t *querier);
void _z_querier_free(_z_querier_t **querier);
#endif

#ifdef __cplusplus
Expand Down
121 changes: 96 additions & 25 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1206,60 +1206,131 @@ z_result_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr
}

void _z_querier_drop(_z_querier_t *querier) {
(void)querier;
// TODO(sashacmc): Implement
// _z_undeclare_querier(pub);
// _z_querier_clear(pub);
_z_undeclare_querier(querier);
_z_querier_clear(querier);
}

_Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_querier_t, querier, _z_querier_check, _z_querier_null, _z_querier_drop)

#ifdef Z_FEATURE_UNSTABLE_API
void z_querier_get_options_default(z_querier_get_options_t *options) {
options->encoding = NULL;
// TODO(sashacmc): Implement
options->attachment = NULL;
options->payload = NULL;
}

void z_querier_options_default(z_querier_options_t *options) {
options->target = z_query_target_default();
options->consolidation = z_query_consolidation_default();
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->is_express = false;
// TODO(sashacmc): Implement
options->timeout_ms = Z_GET_TIMEOUT_DEFAULT;
}

z_result_t z_declare_querier(const z_loaned_session_t *zs, z_owned_querier_t *querier,
const z_loaned_keyexpr_t *keyexpr, z_querier_options_t *options) {
(void)zs;
(void)querier;
(void)keyexpr;
(void)options;
// TODO(sashacmc): Implement
_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
_z_keyexpr_t key = keyexpr_aliased;

querier->_val = _z_querier_null();
// TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition
// lacks a way to convey them to later-joining nodes. Thus, in the current version automatic
// resource declarations are only performed on unicast transports.
if (_Z_RC_IN_VAL(zs)->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
_z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
if (r == NULL) {
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
key = _z_keyexpr_from_string(id, &keyexpr_aliased._suffix);
}
}
// Set options
z_querier_options_t opt;
z_querier_options_default(&opt);
if (options != NULL) {
opt = *options;
}

// Set querier
_z_querier_t int_querier = _z_declare_querier(zs, key, opt.consolidation.mode, opt.congestion_control, opt.target,
opt.priority, opt.is_express, opt.timeout_ms);

querier->_val = int_querier;
return _Z_RES_OK;
}

z_result_t z_undeclare_querier(z_moved_querier_t *querier) {
(void)querier;
// TODO(sashacmc): Implement

return _Z_RES_OK;
z_result_t ret = _z_undeclare_querier(&querier->_this._val);
_z_querier_clear(&querier->_this._val);
return ret;
}

z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *parameters, z_moved_closure_reply_t *callback,
z_querier_get_options_t *options) {
(void)querier;
(void)parameters;
(void)callback;
(void)options;
z_result_t ret = _Z_RES_OK;

// TODO(sashacmc): Implement
void *ctx = callback->_this._val.context;
callback->_this._val.context = NULL;

return _Z_RES_OK;
z_querier_get_options_t opt;
z_querier_get_options_default(&opt);
if (options != NULL) {
opt = *options;
}

_z_encoding_t encoding;
if (opt.encoding == NULL) {
_Z_RETURN_IF_ERR(_z_encoding_copy(&encoding, &querier->_encoding));
} else {
encoding = _z_encoding_steal(&opt.encoding->_this._val);
}
// Remove potentially redundant ke suffix
_z_keyexpr_t querier_keyexpr = _z_keyexpr_alias_from_user_defined(querier->_key, true);

_z_session_t *session = NULL;
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&querier->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
session = _Z_RC_IN_VAL(&sess_rc);
} else {
ret = _Z_ERR_SESSION_CLOSED;
}

z_consolidation_mode_t consolidation_mode = querier->_consolidation_mode;
if (consolidation_mode == Z_CONSOLIDATION_MODE_AUTO) {
const char *lp = (parameters == NULL) ? "" : parameters;
if (strstr(lp, Z_SELECTOR_TIME) != NULL) {
consolidation_mode = Z_CONSOLIDATION_MODE_NONE;
} else {
consolidation_mode = Z_CONSOLIDATION_MODE_LATEST;
}
}

if (session != NULL) {
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(&opt.payload->_this),
.encoding = _z_encoding_from_owned(&opt.encoding->_this)};

ret = _z_query(session, querier_keyexpr, parameters, querier->_target, consolidation_mode, value,
callback->_this._val.call, callback->_this._val.drop, ctx, querier->_timeout_ms,
_z_bytes_from_owned_bytes(&opt.attachment->_this), querier->_congestion_control,
querier->_priority, querier->_is_express);
} else {
ret = _Z_ERR_SESSION_CLOSED;
}

_z_session_rc_drop(&sess_rc);

// Clean-up
z_bytes_drop(opt.payload);
z_encoding_drop(opt.encoding);
z_bytes_drop(opt.attachment);
z_internal_closure_reply_null(
&callback->_this); // call and drop passed to _z_query, so we nullify the closure here
return ret;
}

const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier) {
(void)querier;
// TODO(sashacmc): Implement

return NULL;
return (const z_loaned_keyexpr_t *)&querier->_key;
}
#endif // Z_FEATURE_UNSTABLE_API

Expand Down
32 changes: 30 additions & 2 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,33 @@ z_result_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsr
#endif

#if Z_FEATURE_QUERY == 1
/*------------------ Querier Declaration ------------------*/
_z_querier_t _z_declare_querier(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
z_consolidation_mode_t consolidation_mode, z_congestion_control_t congestion_control,
z_query_target_t target, z_priority_t priority, bool is_express, uint64_t timeout_ms) {
// Allocate querier
_z_querier_t ret;
// Fill querier
ret._key = _z_keyexpr_duplicate(&keyexpr);
ret._id = _z_get_entity_id(_Z_RC_IN_VAL(zn));
ret._consolidation_mode = consolidation_mode;
ret._congestion_control = congestion_control;
ret._target = target;
ret._priority = priority;
ret._is_express = is_express;
ret._timeout_ms = timeout_ms;
ret._zn = _z_session_rc_clone_as_weak(zn);
return ret;
}

z_result_t _z_undeclare_querier(_z_querier_t *querier) {
if (querier == NULL || _Z_RC_IS_NULL(&querier->_zn)) {
return _Z_ERR_ENTITY_UNKNOWN;
}
_z_undeclare_resource(_Z_RC_IN_VAL(&querier->_zn), querier->_key._id);
return _Z_RES_OK;
}

/*------------------ Query ------------------*/
z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target,
const z_consolidation_mode_t consolidation, _z_value_t value, _z_closure_reply_callback_t callback,
Expand All @@ -532,7 +559,7 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete
pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr);
pq->_target = target;
pq->_consolidation = consolidation;
pq->_anykey = (strstr(parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
pq->_anykey = (parameters == NULL || strstr(parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
pq->_callback = callback;
pq->_dropper = dropper;
pq->_pending_replies = NULL;
Expand All @@ -542,7 +569,8 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete

ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session
if (ret == _Z_RES_OK) {
_z_slice_t params = _z_slice_alias_buf((uint8_t *)parameters, strlen(parameters));
_z_slice_t params =
(parameters == NULL) ? _z_slice_null() : _z_slice_alias_buf((uint8_t *)parameters, strlen(parameters));
_z_zenoh_message_t z_msg = _z_msg_make_query(&keyexpr, &params, pq->_id, pq->_consolidation, &value,
timeout_ms, attachment, cong_ctrl, priority, is_express);

Expand Down
21 changes: 20 additions & 1 deletion src/net/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,28 @@
#include "zenoh-pico/net/query.h"

#include "zenoh-pico/net/session.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/transport/common/tx.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_QUERY == 1
void _z_querier_clear(_z_querier_t *querier) {
_z_keyexpr_clear(&querier->_key);
_z_session_weak_drop(&querier->_zn);
_z_encoding_clear(&querier->_encoding);
*querier = _z_querier_null();
}

void _z_querier_free(_z_querier_t **querier) {
_z_querier_t *ptr = *querier;

if (ptr != NULL) {
_z_querier_clear(ptr);

z_free(ptr);
*querier = NULL;
}
}

static void _z_query_clear_inner(_z_query_t *q) {
_z_keyexpr_clear(&q->_key);
_z_value_clear(&q->_value);
Expand Down Expand Up @@ -56,6 +74,7 @@ void _z_query_free(_z_query_t **query) {
*query = NULL;
}
}
#endif

#if Z_FEATURE_QUERYABLE == 1
void _z_queryable_clear(_z_queryable_t *qbl) {
Expand Down

0 comments on commit 5788439

Please sign in to comment.