Skip to content

Commit

Permalink
Merge pull request #302 from eclipse/thomas/mqtt-fix
Browse files Browse the repository at this point in the history
SensorThings MQTT: Queue notifications to keep their order
  • Loading branch information
timothyjward authored Jan 11, 2024
2 parents 1112d93 + 8c06435 commit a9b7118
Showing 1 changed file with 67 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.sensinact.core.command.GatewayThread;
import org.eclipse.sensinact.core.notification.AbstractResourceNotification;
Expand Down Expand Up @@ -80,6 +85,11 @@ public class SensorthingsMqttNorthbound extends AbstractInterceptHandler

private Server mqttBroker;

/**
* Executor to queue notifications publications
*/
private ExecutorService executor;

public @interface Config {

String host() default "0.0.0.0";
Expand All @@ -106,37 +116,56 @@ public class SensorthingsMqttNorthbound extends AbstractInterceptHandler

@Activate
void start(Config config) throws IOException {
mqttBroker = new Server();
// Setup the queue handler
executor = Executors.newSingleThreadExecutor();

Properties props = new Properties();
props.setProperty(HOST_PROPERTY_NAME, config.host());
if (config.port() >= 0) {
props.setProperty(PORT_PROPERTY_NAME, String.valueOf(config.port()));
}
if (!config.keystore_file().isBlank()) {
props.setProperty(SSL_PORT_PROPERTY_NAME, String.valueOf(config.secure_port()));
if (config.websocket_enable()) {
props.setProperty(WSS_PORT_PROPERTY_NAME, String.valueOf(config.websocket_secure_port()));
}
try {
mqttBroker = new Server();

props.setProperty(JKS_PATH_PROPERTY_NAME, config.keystore_file());
props.setProperty(KEY_STORE_TYPE, config.keystore_type());
props.setProperty(KEY_STORE_PASSWORD_PROPERTY_NAME, config._keystore_password());
props.setProperty(KEY_MANAGER_PASSWORD_PROPERTY_NAME, config._keymanager_password());
}
if (config.websocket_enable() && config.websocket_port() >= 0) {
props.setProperty(WEB_SOCKET_PORT_PROPERTY_NAME, String.valueOf(config.websocket_port()));
}
Properties props = new Properties();
props.setProperty(HOST_PROPERTY_NAME, config.host());
if (config.port() >= 0) {
props.setProperty(PORT_PROPERTY_NAME, String.valueOf(config.port()));
}
if (!config.keystore_file().isBlank()) {
props.setProperty(SSL_PORT_PROPERTY_NAME, String.valueOf(config.secure_port()));
if (config.websocket_enable()) {
props.setProperty(WSS_PORT_PROPERTY_NAME, String.valueOf(config.websocket_secure_port()));
}

props.setProperty(JKS_PATH_PROPERTY_NAME, config.keystore_file());
props.setProperty(KEY_STORE_TYPE, config.keystore_type());
props.setProperty(KEY_STORE_PASSWORD_PROPERTY_NAME, config._keystore_password());
props.setProperty(KEY_MANAGER_PASSWORD_PROPERTY_NAME, config._keymanager_password());
}
if (config.websocket_enable() && config.websocket_port() >= 0) {
props.setProperty(WEB_SOCKET_PORT_PROPERTY_NAME, String.valueOf(config.websocket_port()));
}

IConfig serverConfig = new MemoryConfig(props);
IConfig serverConfig = new MemoryConfig(props);

mqttBroker.startServer(serverConfig, List.of(this));
mqttBroker.startServer(serverConfig, List.of(this));
} catch (IOException e) {
executor.shutdownNow();
executor = null;
throw e;
}
}

@Deactivate
void stop(Config config) {
if (executor != null) {
executor.shutdownNow();
try {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("Queued notifications not sent in time");
}
}

mqttBroker.stopServer();
mqttBroker = null;
executor = null;
}

@Override
Expand Down Expand Up @@ -169,15 +198,28 @@ public void onUnsubscribe(InterceptUnsubscribeMessage msg) {

@Override
public void notify(String topic, AbstractResourceNotification event) {
List<SensorthingsMapper<?>> list;
final List<SensorthingsMapper<?>> listeners;
synchronized (lock) {
list = List.copyOf(subscriptions.values());
listeners = List.copyOf(subscriptions.values());
if (listeners.isEmpty()) {
return;
}
}

for (SensorthingsMapper<?> l : listeners) {
executor.execute(() -> {
try {
for (Object payload : l.toPayload(event).getValue().collect(Collectors.toList())) {
notifyListeners(l.getTopicFilter(), payload);
}
} catch (InvocationTargetException | InterruptedException e) {
LOG.error("Error while preparing MQTT payload", e);
}
});
}
list.forEach(s -> s.toPayload(event).onSuccess(l -> l.forEach(o -> notifyListeners(s.getTopicFilter(), o))));
}

private void notifyListeners(String topic, Object data) {

try {
ByteBuf payload = Unpooled.wrappedBuffer(mapper.writeValueAsBytes(data));

Expand Down

0 comments on commit a9b7118

Please sign in to comment.