Skip to content

Commit

Permalink
Added first implmentation of subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
hannemn committed Feb 20, 2025
1 parent c2a9bfb commit ca276d7
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 515 deletions.
40 changes: 11 additions & 29 deletions lang/c/core/include/ecal_c/pubsub/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,13 @@
#include <ecal_c/export.h>
#include <ecal_c/types.h>

#include <ecal_c/callback.h>

#ifdef __cplusplus
extern "C"
{
#endif /*__cplusplus*/
typedef struct eCAL_Publisher eCAL_Publisher;
typedef struct eCAL_PayloadWriter eCAL_PayloadWriter;

struct eCAL_SDataTypeInformation
{
const char* name;
const char* encoding;
const char* descriptor;
size_t descriptor_len;
};

enum eCAL_TransportLayer_eType
{
eCAL_TransportLayer_eType_none,
Expand Down Expand Up @@ -90,25 +80,19 @@ extern "C"
size_t layer_priority_remote_length;
};


typedef uint64_t eCAL_EntityIdT;

struct eCAL_SEntityId
enum eCAL_ePublisherEvent
{
eCAL_EntityIdT entity_id;
int32_t process_id;
const char* host_name;
};

struct eCAL_STopicId
{
struct eCAL_SEntityId topic_id;
const char* topic_name;
eCAL_ePublisherEvent_none,
eCAL_ePublisherEvent_connected,
eCAL_ePublisherEvent_disconnected,
eCAL_ePublisherEvent_dropped
};

struct eCAL_SPubEventCallbackData
{
int __placeholder;
eCAL_ePublisherEvent event_type;
long long event_time;
struct eCAL_SDataTypeInformation subscriber_datatype;
};

typedef void (*eCAL_PubEventCallbackT)(const struct eCAL_STopicId*, const struct eCAL_SPubEventCallbackData*);
Expand All @@ -121,7 +105,6 @@ extern "C"
size_t (*GetSize)();
};


ECALC_API eCAL_Publisher* eCAL_Publisher_New(const char* topic_name_, const struct eCAL_SDataTypeInformation* data_type_information_, const struct eCAL_Publisher_Configuration* publisher_configuration_);
ECALC_API eCAL_Publisher* eCAL_Publisher_New2(const char* topic_name_, const struct eCAL_SDataTypeInformation* data_type_information_, const eCAL_PubEventCallbackT pub_event_callback, const struct eCAL_Publisher_Configuration* publisher_configuration_);

Expand All @@ -135,14 +118,13 @@ extern "C"
ECALC_API char* eCAL_Publisher_GetTopicName(eCAL_Publisher* publisher_);

ECALC_API struct eCAL_STopicId* eCAL_Publisher_GetTopicId(eCAL_Publisher* publisher_);

ECALC_API struct eCAL_SDataTypeInformation* eCAL_Publisher_GetDataTypeInformation(eCAL_Publisher* publisher_);
ECALC_API void eCAL_STopicId_Free(struct eCAL_STopicId* topic_id_);


ECALC_API void eCAL_STopicId_Free(struct eCAL_STopicId* topic_id_);
ECALC_API struct eCAL_SDataTypeInformation* eCAL_Publisher_GetDataTypeInformation(eCAL_Publisher* publisher_);
ECALC_API void eCAL_SDataTypeInformation_Free(struct eCAL_SDataTypeInformation* data_type_information_);

ECALC_API struct eCAL_Publisher_Configuration eCAL_GetPublisherConfiguration();
ECALC_API struct eCAL_Publisher_Configuration* eCAL_GetPublisherConfiguration();
ECALC_API void eCAL_Publisher_Configuration_Free(eCAL_Publisher_Configuration* publisher_configuration_);

#ifdef __cplusplus
Expand Down
245 changes: 48 additions & 197 deletions lang/c/core/include/ecal_c/pubsub/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,203 +34,54 @@
extern "C"
{
#endif /*__cplusplus*/
/**
* @brief Instance a subscriber.
*
* @return Handle to new subscriber or NULL if failed.
**/
ECALC_API ECAL_HANDLE eCAL_Sub_New();

/**
* @brief Create a subscriber.
*
* @param handle_ Publisher handle.
* @param topic_name_ Unique topic name.
* @param topic_type_name_ Topic type name (like 'string', 'person').
* @param topic_type_encoding_ Topic type encoding (like 'base', 'proto').
* @param topic_desc_ Topic type description.
* @param topic_desc_len_ Topic type description length.
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Sub_Create(ECAL_HANDLE handle_, const char* topic_name_, const char* topic_type_name_, const char* topic_type_encoding_, const char* topic_desc_, int topic_desc_len_);

/**
* @brief Destroy a subscriber.
*
* @param handle_ Subscriber handle.
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Sub_Destroy(ECAL_HANDLE handle_);

/**
* @brief Set a set of id's to prefiltering topics (see eCAL_Pub_SetID).
*
* @param handle_ Subscriber handle.
* @param id_array_ Array of id's (Use nullptr to reset id's).
* @param id_num_ Number of id's.
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Sub_SetID(ECAL_HANDLE handle_, const long long* id_array_, const int id_num_);

/**
* @brief Sets subscriber attribute.
*
* @param handle_ Subscriber handle.
* @param attr_name_ Attribute name.
* @param attr_name_len_ Attribute name length.
* @param attr_value_ Attribute value.
* @param attr_value_len_ Attribute value length.
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Sub_SetAttribute(ECAL_HANDLE handle_, const char* attr_name_, int attr_name_len_, const char* attr_value_, int attr_value_len_);

/**
* @brief Removes subscriber attribute.
*
* @param handle_ Subscriber handle.
* @param attr_name_ Attribute name.
* @param attr_name_len_ Attribute name length.
*
* @return None zero if succeeded.
* @experimental
**/
ECALC_API int eCAL_Sub_ClearAttribute(ECAL_HANDLE handle_, const char* attr_name_, int attr_name_len_);

/**
* @brief Receive a message from the publisher in a preallocated buffer.
*
* @param handle_ Subscriber handle.
* @param buf_ Buffer to store the received message content.
* @param buf_len_ Length of the receive buffer.
* @param [out] time_ Time from publisher in us.
* @param rcv_timeout_ Maximum time before receive operation returns (in milliseconds, -1 means infinite).
*
* @return Length of received buffer.
**/
ECALC_API int eCAL_Sub_Receive_ToBuffer(ECAL_HANDLE handle_, void* buf_, int buf_len_, long long* time_, int rcv_timeout_);

/**
* @brief Receive a message from the publisher and let eCAL allocate the memory.
*
* @param handle_ Subscriber handle.
* @param [out] buf_ Buffer to store the pointer to the received message content.
* You need to free the memory finally calling eCAL_FreeMem.
* @param [out] time_ Time from publisher in us.
* @param rcv_timeout_ Maximum time before receive operation returns (in milliseconds, -1 means infinite).
*
* @return Length of received buffer.
**/
ECALC_API int eCAL_Sub_Receive_Alloc(ECAL_HANDLE handle_, void** buf_, long long* time_, int rcv_timeout_);

/**
* @brief Receive a message from the publisher and let eCAL allocate the memory (able to process zero length buffer).
*
* @param handle_ Subscriber handle.
* @param [out] buf_ Buffer to store the received message content.
* @param [out] buf_len_ Length of allocated buffer,
* eCAL is allocating the buffer for you, use ecal_free_mem to free the buffer finally.
* You need to free the memory finally calling eCAL_FreeMem.
* @param [out] time_ Time from publisher in us.
* @param rcv_timeout_ Maximum time before receive operation returns (in milliseconds, -1 means infinite).
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Sub_Receive_Buffer_Alloc(ECAL_HANDLE handle_, void** buf_, int* buf_len_, long long* time_, int rcv_timeout_);

/**
* @brief Add callback function for incoming receives.
* @since eCAL 5.10.0
*
* @param handle_ Subscriber handle.
* @param callback_ The callback function to add.
* @param par_ User defined context that will be forwarded to the callback function.
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Sub_AddReceiveCallback(ECAL_HANDLE handle_, ReceiveCallbackCT callback_, void* par_);

/**
* @brief Remove callback function for incoming receives.
*
* @param handle_ Subscriber handle.
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Sub_RemReceiveCallback(ECAL_HANDLE handle_);

/**
* @brief Add callback function for subscriber events.
*
* @param handle_ Subscriber handle.
* @param type_ The event type to react on.
* @param callback_ The callback function to add.
* @param par_ User defined context that will be forwarded to the callback function.
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Sub_AddEventCallback(ECAL_HANDLE handle_, enum eCAL_Subscriber_Event type_, SubEventCallbackCT callback_, void* par_);

/**
* @brief Remove callback function for subscriber events.
*
* @param handle_ Subscriber handle.
* @param type_ The event type to remove.
*
* @return None zero if succeeded.
**/
ECALC_API int eCAL_Sub_RemEventCallback(ECAL_HANDLE handle_, enum eCAL_Subscriber_Event type_);

/**
* @brief Gets type name of the connected topic.
*
* @param handle_ Subscriber handle.
* @param [out] buf_ Pointer to store the subscriber type name string.
* @param buf_len_ Length of allocated buffer or ECAL_ALLOCATE_4ME if
* eCAL should allocate the buffer for you (see eCAL_FreeMem).
*
* @return Type name buffer length or zero if failed.
**/
ECALC_API int eCAL_Sub_GetTypeName(ECAL_HANDLE handle_, void* buf_, int buf_len_);

/**
* @brief Gets encoding of the connected topic.
*
* @param handle_ Subscriber handle.
* @param [out] buf_ Pointer to store the subscriber encoding string.
* @param buf_len_ Length of allocated buffer or ECAL_ALLOCATE_4ME if
* eCAL should allocate the buffer for you (see eCAL_FreeMem).
*
* @return Encoding buffer length or zero if failed.
**/
ECALC_API int eCAL_Sub_GetEncoding(ECAL_HANDLE handle_, void* buf_, int buf_len_);

/**
* @brief Gets description of the connected topic.
*
* @param handle_ Subscriber handle.
* @param [out] buf_ Pointer to store the subscriber description string.
* @param buf_len_ Length of allocated buffer or ECAL_ALLOCATE_4ME if
* eCAL should allocate the buffer for you (see eCAL_FreeMem).
*
* @return Description buffer length or zero if failed.
**/
ECALC_API int eCAL_Sub_GetDescription(ECAL_HANDLE handle_, void* buf_, int buf_len_);

/**
* @brief Dump the whole class state into a string buffer.
*
* @param handle_ Publisher handle.
* @param [out] buf_ Pointer to store the monitoring information.
* @param buf_len_ Length of allocated buffer or ECAL_ALLOCATE_4ME if
* eCAL should allocate the buffer for you (see eCAL_FreeMem).
*
* @return Dump buffer length or zero if failed.
**/
ECALC_API int eCAL_Sub_Dump(ECAL_HANDLE handle_, void* buf_, int buf_len_);

typedef struct eCAL_Subscriber eCAL_Subscriber;

struct eCAL_SReceiveCallbackData
{
const void* buffer;
size_t buffer_size;
int64_t send_timestamp;
int64_t send_clock;
};

enum eCAL_eSubscriberEvent
{
eCAL_eSubscriberEvent_none,
eCAL_eSubscriberEvent_connected,
eCAL_eSubscriberEvent_disconnected,
eCAL_eSubscriberEvent_dropped
};

struct eCAL_SSubEventCallbackData
{
eCAL_eSubscriberEvent event_type;
long long event_time;
struct eCAL_SDataTypeInformation publisher_datatype;
};

typedef void (*eCAL_ReceiveCallbackT)(const struct eCAL_STopicId*, const struct eCAL_SDataTypeInformation*, const struct eCAL_SReceiveCallbackData*);
typedef void (*eCAL_SubEventCallbackT)(const struct eCAL_STopicId*, const struct eCAL_SSubEventCallbackData*);

ECALC_API eCAL_Subscriber* eCAL_Subscriber_New(const char* topic_name_, const struct eCAL_SDataTypeInformation* data_type_information_, const struct eCAL_Subscriber_Configuration* subscriber_configuration_);
ECALC_API eCAL_Subscriber* eCAL_Subscriber_New2(const char* topic_name_, const struct eCAL_SDataTypeInformation* data_type_information_, const struct eCAL_SubEventCallbackT sub_event_callback, const struct eCAL_Subscriber_Configuration* subscriber_configuration_);
ECALC_API void eCAL_Subscriber_Delete(eCAL_Subscriber* subscriber_);

ECALC_API int eCAL_Subscriber_SetReceiveCallback(eCAL_Subscriber* subscriber_, eCAL_ReceiveCallbackT callback_);
ECALC_API int eCAL_Subscriber_RemoveReceiveCallback(eCAL_Subscriber* subscriber_);

ECALC_API size_t eCAL_Subscriber_GetPublisherCount(eCAL_Subscriber* subscriber_);

ECALC_API const char* eCAL_Subscriber_GetTopicName(eCAL_Subscriber* subscriber_);

ECALC_API struct eCAL_STopicId* eCAL_Subscriber_GetTopicId(eCAL_Subscriber* subscriber_);
//ECALC_API void eCAL_STopicId_Free(struct eCAL_STopicId* topic_id_);

ECALC_API struct eCAL_SDataTypeInformation* eCAL_Subscriber_GetDataTypeInformation(eCAL_Subscriber* subscriber_);
//ECALC_API void eCAL_SDataTypeInformation_Free(struct eCAL_SDataTypeInformation* data_type_information_);

ECALC_API struct eCAL_Subscriber_Configuration* eCAL_GetSubscriberConfiguration();
ECALC_API void eCAL_Subscriber_Configuration_Free(eCAL_Subscriber_Configuration* subscriber_configuration_);
#ifdef __cplusplus
}
#endif /*__cplusplus*/
Expand Down
24 changes: 24 additions & 0 deletions lang/c/core/include/ecal_c/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,30 @@

#include <stdint.h>


struct eCAL_SDataTypeInformation
{
const char* name;
const char* encoding;
const void* descriptor;
size_t descriptor_len;
};


typedef uint64_t eCAL_EntityIdT;

struct eCAL_SEntityId
{
eCAL_EntityIdT entity_id;
int32_t process_id;
const char* host_name;
};

struct eCAL_STopicId
{
struct eCAL_SEntityId topic_id;
const char* topic_name;
};
/**
* @brief Flag to indicate eCAL to allocate/deallocate memory.
**/
Expand Down
Loading

0 comments on commit ca276d7

Please sign in to comment.