From 056810bac05dea399aed37aadbdda5f8faf81f82 Mon Sep 17 00:00:00 2001 From: Paolo Bosetti Date: Mon, 15 Jul 2024 15:02:37 +0200 Subject: [PATCH] Added support for message topic in sink and filter plugins. Plugin protocol version 4 --- src/common.hpp | 2 ++ src/filter.hpp | 5 +++-- src/plugin/echoj.cpp | 2 +- src/plugin/running_avg.cpp | 2 +- src/plugin/to_console.cpp | 4 ++-- src/sink.hpp | 5 +++-- src/source.hpp | 2 +- 7 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/common.hpp b/src/common.hpp index 169a3fc..b32114a 100644 --- a/src/common.hpp +++ b/src/common.hpp @@ -1,6 +1,8 @@ #ifndef COMMON_HPP #define COMMON_HPP +#define PLUGIN_PROTOCOL_VERSION 4 + /*! * @file common.hpp * @brief Common definitions for the pugg library diff --git a/src/filter.hpp b/src/filter.hpp index 1f549e1..222063f 100644 --- a/src/filter.hpp +++ b/src/filter.hpp @@ -64,9 +64,10 @@ class Filter { * data was loaded successfully, and false otherwise. * * @param data The input data + * @param topic The topic of the data * @return True if the data was loaded successfully, and false otherwise */ - virtual return_type load_data(Tin const &data) = 0; + virtual return_type load_data(Tin const &data, std::string topic = "") = 0; /*! * Processes the input data @@ -124,7 +125,7 @@ class Filter { /*! * Returns the plugin protocol version. */ - static const int version = 3; + static const int version = PLUGIN_PROTOCOL_VERSION; /*! * Returns the plugin server name. diff --git a/src/plugin/echoj.cpp b/src/plugin/echoj.cpp index 3f75c12..fd7c3d2 100644 --- a/src/plugin/echoj.cpp +++ b/src/plugin/echoj.cpp @@ -25,7 +25,7 @@ using json = nlohmann::json; class Echo : public Filter { public: string kind() override { return PLUGIN_NAME; } - return_type load_data(json const &d) override { + return_type load_data(json const &d, string topic = "") override { _data = d; return return_type::success; } diff --git a/src/plugin/running_avg.cpp b/src/plugin/running_avg.cpp index 5c68479..e9f7a6a 100644 --- a/src/plugin/running_avg.cpp +++ b/src/plugin/running_avg.cpp @@ -29,7 +29,7 @@ class RunningAverage : public Filter { // We expect to have a dictionary of values in the input, and we feed them // into a map of double-ended queues (deques) to keep track of the last N // values for each key. - return_type load_data(json const &input) override { + return_type load_data(json const &input, string topic = "") override { if (input[_params["field"]].is_object() == false) { return return_type::error; } diff --git a/src/plugin/to_console.cpp b/src/plugin/to_console.cpp index 5c15ea6..b973752 100644 --- a/src/plugin/to_console.cpp +++ b/src/plugin/to_console.cpp @@ -25,9 +25,9 @@ using json = nlohmann::json; class ToConsole : public Sink { public: string kind() override { return PLUGIN_NAME; } - return_type load_data(json const &d) override { + return_type load_data(json const &d, string topic = "") override { _data = d; - cout << "Data: " << _data << endl; + cout << "[" << topic << "] Data: " << _data << endl; return return_type::success; } diff --git a/src/sink.hpp b/src/sink.hpp index 53303ee..6243bc2 100644 --- a/src/sink.hpp +++ b/src/sink.hpp @@ -63,9 +63,10 @@ class Sink { * data was loaded successfully, and false otherwise. * * @param data The input data + * @param topic The topic of the data * @return True if the data was loaded successfully, and false otherwise */ - virtual return_type load_data(Tin const &data) = 0; + virtual return_type load_data(Tin const &data, std::string topic = "") = 0; /*! * Sets the parameters @@ -112,7 +113,7 @@ class Sink { /*! * Returns the plugin protocol version. */ - static const int version = 3; + static const int version = PLUGIN_PROTOCOL_VERSION; /*! * Returns the plugin server name. diff --git a/src/source.hpp b/src/source.hpp index adbfeb4..ec91813 100644 --- a/src/source.hpp +++ b/src/source.hpp @@ -118,7 +118,7 @@ class Source { bool dummy; - static const int version = 3; + static const int version = PLUGIN_PROTOCOL_VERSION; /*! * Returns the plugin server name.