Skip to content

Commit

Permalink
Improved error management and info upon launch for mqtt agent
Browse files Browse the repository at this point in the history
  • Loading branch information
pbosetti committed Jun 14, 2024
1 parent eff0c5b commit 1929010
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions src/plugin/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MQTTBridge : public Source<json>, public mosquittopp {
int port = _params["broker_port"];

lib_init();
reinitialise("MQTT2MADS-bridge", true);
connect(host.c_str(), port, 60);
subscribe(NULL, topic.c_str(), 0);

Expand All @@ -64,7 +65,14 @@ class MQTTBridge : public Source<json>, public mosquittopp {
// }

void on_message(const struct mosquitto_message *message) override {
_data = json::parse((char *)(message->payload));
_data.clear();
try {
_data = json::parse((char *)(message->payload));
_error = "No error";
} catch (json::parse_error &e) {
_error = e.what();
_data["message"] = "Error parsing invalid JSON received from MQTT";
}
_topic = message->topic;
return;
}
Expand All @@ -80,18 +88,19 @@ class MQTTBridge : public Source<json>, public mosquittopp {
*/
return_type get_output(json *out, std::vector<unsigned char> *blob = nullptr) override {
if (setup() != return_type::success) return return_type::critical;
// while (_data.empty()) {
loop();
this_thread::sleep_for(chrono::microseconds(500));
// }
loop();
if(_data.is_null() || _data.empty()) {
_data.clear();
return return_type::retry;
}
(*out)["payload"] = _data;
(*out)["topic"] = _topic;
_data.clear();
return return_type::success;
this_thread::sleep_for(chrono::microseconds(500));
if (_error != "No error")
return return_type::error;
else
return return_type::success;
}

void set_params(void *params) override {
Expand All @@ -100,7 +109,10 @@ class MQTTBridge : public Source<json>, public mosquittopp {
}

map<string, string> info() override {
return {};
return {
{"Broker:", _params.value("broker_host", "unset") + ":" + to_string(_params["broker_port"])},
{"Topic:", _params["topic"]}
};
};

private:
Expand Down Expand Up @@ -135,14 +147,13 @@ int main(int argc, char const *argv[]) {
json output, params;
params["broker_host"] = "localhost";
params["broker_port"] = 1883;
params["topic"] = "mads/#";
params["topic"] = "capture/#";

// Set parameters
bridge.set_params(&params);

// Process data
while (true)
{
while (true) {
if (bridge.get_output(&output) == return_type::success)
cout << "MQTT: " << output << endl;
}
Expand Down

0 comments on commit 1929010

Please sign in to comment.