diff --git a/VERSION-API b/VERSION-API index dc39e58..4684374 100644 --- a/VERSION-API +++ b/VERSION-API @@ -1 +1 @@ -1.6 \ No newline at end of file +1.8 \ No newline at end of file diff --git a/conf/jni-config.json b/conf/jni-config.json index 18b5bc4..563d4d6 100644 --- a/conf/jni-config.json +++ b/conf/jni-config.json @@ -13,14 +13,24 @@ { "name":"com.sun.management.internal.DiagnosticCommandInfo[]" }, +{ + "name":"io.seqera.tower.agent.Agent", + "methods":[{"name":"main","parameterTypes":["java.lang.String[]"] }] +}, { "name":"java.lang.ClassLoader", - "methods":[{"name":"getPlatformClassLoader","parameterTypes":[] }] + "methods":[ + {"name":"getPlatformClassLoader","parameterTypes":[] }, + {"name":"loadClass","parameterTypes":["java.lang.String"] } + ] }, { "name":"java.util.Arrays", "methods":[{"name":"asList","parameterTypes":["java.lang.Object[]"] }] }, +{ + "name":"jdk.internal.loader.ClassLoaders$PlatformClassLoader" +}, { "name":"sun.management.VMManagementImpl", "fields":[ diff --git a/conf/reflect-config.json b/conf/reflect-config.json index feb85fb..d888a79 100644 --- a/conf/reflect-config.json +++ b/conf/reflect-config.json @@ -321,6 +321,9 @@ "name":"io.micronaut.http.client.netty.DefaultHttpClient$11", "methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }] }, +{ + "name":"io.micronaut.http.client.netty.DefaultHttpClient$5" +}, { "name":"io.micronaut.http.client.netty.DefaultHttpClient$HttpClientInitializer" }, @@ -339,6 +342,14 @@ "name":"io.micronaut.http.client.netty.ssl.$NettyClientSslBuilder$Definition$Reference", "methods":[{"name":"","parameterTypes":[] }] }, +{ + "name":"io.micronaut.http.client.netty.websocket.NettyWebSocketClientHandler", + "methods":[ + {"name":"channelActive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, + {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }, + {"name":"userEventTriggered","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] } + ] +}, { "name":"io.micronaut.http.codec.$CodecConfiguration$Definition$Reference", "methods":[{"name":"","parameterTypes":[] }] @@ -1233,6 +1244,10 @@ {"name":"read","parameterTypes":["io.netty.channel.ChannelHandlerContext"] } ] }, +{ + "name":"io.netty.channel.ChannelHandlerAdapter", + "methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }] +}, { "name":"io.netty.channel.ChannelInboundHandlerAdapter", "methods":[ @@ -1254,6 +1269,18 @@ {"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] } ] }, +{ + "name":"io.netty.channel.ChannelOutboundHandlerAdapter", + "methods":[ + {"name":"bind","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.net.SocketAddress","io.netty.channel.ChannelPromise"] }, + {"name":"close","parameterTypes":["io.netty.channel.ChannelHandlerContext","io.netty.channel.ChannelPromise"] }, + {"name":"connect","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.net.SocketAddress","java.net.SocketAddress","io.netty.channel.ChannelPromise"] }, + {"name":"deregister","parameterTypes":["io.netty.channel.ChannelHandlerContext","io.netty.channel.ChannelPromise"] }, + {"name":"disconnect","parameterTypes":["io.netty.channel.ChannelHandlerContext","io.netty.channel.ChannelPromise"] }, + {"name":"flush","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, + {"name":"read","parameterTypes":["io.netty.channel.ChannelHandlerContext"] } + ] +}, { "name":"io.netty.channel.CombinedChannelDuplexHandler", "methods":[ @@ -1319,7 +1346,9 @@ { "name":"io.netty.handler.codec.ByteToMessageDecoder", "methods":[ + {"name":"channelInactive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, {"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }, + {"name":"channelReadComplete","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }, {"name":"userEventTriggered","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] } ] }, @@ -1334,6 +1363,10 @@ "name":"io.netty.handler.codec.MessageToMessageDecoder", "methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }] }, +{ + "name":"io.netty.handler.codec.MessageToMessageEncoder", + "methods":[{"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }] +}, { "name":"io.netty.handler.codec.http.HttpClientCodec" }, @@ -1347,6 +1380,12 @@ { "name":"io.netty.handler.codec.http.HttpContentDecompressor" }, +{ + "name":"io.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder" +}, +{ + "name":"io.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder" +}, { "name":"io.netty.handler.ssl.SslHandler", "methods":[ @@ -1418,6 +1457,10 @@ { "name":"io.reactivex.Single" }, +{ + "name":"io.seqera.tower.agent.$Agent$Definition$Reference", + "methods":[{"name":"","parameterTypes":[] }] +}, { "name":"io.seqera.tower.agent.$AgentClientSocket$Intercepted$Definition$Reference", "methods":[{"name":"","parameterTypes":[] }] @@ -1428,6 +1471,18 @@ "allDeclaredMethods":true, "methods":[{"name":"","parameterTypes":[] }] }, +{ + "name":"io.seqera.tower.agent.exchange.AgentMessage", + "allDeclaredFields":true, + "allDeclaredMethods":true, + "allDeclaredConstructors":true +}, +{ + "name":"io.seqera.tower.agent.exchange.HeartbeatMessage", + "allDeclaredFields":true, + "allDeclaredMethods":true, + "allDeclaredConstructors":true +}, { "name":"io.seqera.tower.agent.model.ServiceInfo", "allDeclaredFields":true, @@ -1449,6 +1504,10 @@ { "name":"java.io.FilePermission" }, +{ + "name":"java.io.Serializable", + "allDeclaredMethods":true +}, { "name":"java.lang.Boolean", "fields":[{"name":"TYPE"}] diff --git a/src/main/java/io/seqera/tower/agent/Agent.java b/src/main/java/io/seqera/tower/agent/Agent.java index 3d32693..1aee264 100644 --- a/src/main/java/io/seqera/tower/agent/Agent.java +++ b/src/main/java/io/seqera/tower/agent/Agent.java @@ -19,6 +19,8 @@ import io.micronaut.rxjava2.http.client.websockets.RxWebSocketClient; import io.micronaut.scheduling.TaskScheduler; import io.micronaut.websocket.exceptions.WebSocketClientException; +import io.seqera.tower.agent.exchange.CommandRequest; +import io.seqera.tower.agent.exchange.CommandResponse; import io.seqera.tower.agent.exchange.HeartbeatMessage; import io.seqera.tower.agent.model.ServiceInfoResponse; import io.seqera.tower.agent.utils.VersionProvider; @@ -28,12 +30,16 @@ import picocli.CommandLine.Option; import picocli.CommandLine.Parameters; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.lang.module.ModuleDescriptor; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Command( name = "tw-agent", @@ -51,6 +57,7 @@ optionListHeading = "%nOptions:%n" ) public class Agent implements Runnable { + public static final int HEARTBEAT_MINUTES_INTERVAL = 1; private static Logger logger = LoggerFactory.getLogger(Agent.class); @Parameters(index = "0", paramLabel = "AGENT_CONNECTION_ID", description = "Agent connection ID to identify this agent", arity = "1") @@ -75,27 +82,72 @@ public static void main(String[] args) throws Exception { @Override public void run() { + try { + checkTower(); + connectTower(); + sendPeriodicHeartbeat(); + } catch (Exception e) { + logger.error(e.getMessage()); + System.exit(-1); + } + } - checkTower(); - - final URI uri; + /** + * Connect the agent to Tower using websockets + */ + private void connectTower() { try { - uri = new URI(url + "/agent/" + agentKey + "/connect"); + final URI uri = new URI(url + "/agent/" + agentKey + "/connect"); final MutableHttpRequest req = HttpRequest.GET(uri).bearerAuth(token); final RxWebSocketClient webSocketClient = ctx.getBean(RxWebSocketClient.class); - agentClient = webSocketClient.connect(AgentClientSocket.class, req).blockingFirst(); - logger.info("Connected"); - - sendPeriodicHeartbeat(); + agentClient = webSocketClient.connect(AgentClientSocket.class, req) + .timeout(5, TimeUnit.SECONDS) + .blockingFirst(); + agentClient.setConnectCallback(this::connectTower); + agentClient.setCommandRequestCallback(this::execCommand); } catch (URISyntaxException e) { - logger.error(String.format("Invalid URI: %s/agent/%s/connect - %s", url, agentKey, e.getMessage())); - System.exit(-1); + logger.error("Invalid URI: {}/agent/{}/connect - {}", url, agentKey, e.getMessage()); } catch (WebSocketClientException e) { - logger.error(String.format("Connection error - %s", e.getMessage())); - System.exit(-1); - } catch (Throwable e) { - e.printStackTrace(); - System.exit(-1); + logger.error("Connection error - {}", e.getMessage()); + } catch (Exception e) { + if (e.getCause() instanceof TimeoutException) { + logger.error("Connection timeout [trying to reconnect in {} minutes]", HEARTBEAT_MINUTES_INTERVAL); + } else { + logger.error("Unknown problem"); + e.printStackTrace(); + } + } + } + + /** + * Executes a command request and sends the response back to Tower + * + * @param message Command request message + */ + private void execCommand(CommandRequest message) { + try { + logger.info("Execute: {}", message.getCommand()); + Process process = new ProcessBuilder().command("sh", "-c", message.getCommand()).start(); + int exitStatus = process.waitFor(); + // read the stdout + BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + StringBuilder builder = new StringBuilder(); + String line = null; + while ((line = reader.readLine()) != null) { + builder.append(line); + builder.append("\n"); + } + String result = builder.toString(); + + // send result + CommandResponse response = new CommandResponse(message.getId(), result.getBytes(), exitStatus); + logger.info("Sending response --> {}", response); + agentClient.send(response); + logger.info("Response sent"); + } catch (Exception e) { + // send result + CommandResponse response = new CommandResponse(message.getId(), e.getMessage().getBytes(), -1); + agentClient.send(response); } } @@ -104,10 +156,15 @@ public void run() { */ private void sendPeriodicHeartbeat() { TaskScheduler scheduler = ctx.getBean(TaskScheduler.class); - - scheduler.scheduleAtFixedRate(null, Duration.ofMinutes(1), () -> { - System.out.println("Sending heartbeat"); - agentClient.send(new HeartbeatMessage()); + Duration interval = Duration.ofMinutes(HEARTBEAT_MINUTES_INTERVAL); + scheduler.scheduleAtFixedRate(interval, interval, () -> { + if (agentClient.isOpen()) { + logger.info("Sending heartbeat"); + agentClient.send(new HeartbeatMessage()); + } else { + logger.info("Trying to reconnect"); + connectTower(); + } }); } @@ -150,6 +207,12 @@ private void checkTower() { } } + /** + * Minimum API required version + * + * @return Required API version + * @throws IOException On reading properties file + */ private String getVersionApi() throws IOException { Properties properties = new Properties(); properties.load(this.getClass().getResourceAsStream("/META-INF/build-info.properties")); diff --git a/src/main/java/io/seqera/tower/agent/AgentClientSocket.java b/src/main/java/io/seqera/tower/agent/AgentClientSocket.java index 08ec7e0..aca5b0b 100644 --- a/src/main/java/io/seqera/tower/agent/AgentClientSocket.java +++ b/src/main/java/io/seqera/tower/agent/AgentClientSocket.java @@ -11,7 +11,7 @@ package io.seqera.tower.agent; -import io.micronaut.http.HttpRequest; +import io.micronaut.websocket.CloseReason; import io.micronaut.websocket.WebSocketSession; import io.micronaut.websocket.annotation.ClientWebSocket; import io.micronaut.websocket.annotation.OnClose; @@ -19,40 +19,46 @@ import io.micronaut.websocket.annotation.OnOpen; import io.seqera.tower.agent.exchange.AgentMessage; import io.seqera.tower.agent.exchange.CommandRequest; -import io.seqera.tower.agent.exchange.CommandResponse; import io.seqera.tower.agent.exchange.HeartbeatMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.InputStreamReader; import java.time.Duration; import java.time.Instant; +import java.util.function.Consumer; /** * @author Jordi Deu-Pons */ @ClientWebSocket abstract class AgentClientSocket implements AutoCloseable { + private static Logger logger = LoggerFactory.getLogger(AgentClientSocket.class); private WebSocketSession session; - private Instant openingTime; + // Callback to reconnect the agent + private Runnable connectCallback; + + // Callback to manage a command request + private Consumer commandRequestCallback; + @OnOpen - void onOpen(WebSocketSession session, HttpRequest request) { + void onOpen(WebSocketSession session) { this.session = session; this.openingTime = Instant.now(); - System.out.println("Client opened connection"); + logger.info("Client opened connection"); } @OnMessage void onMessage(AgentMessage message) { if (message instanceof HeartbeatMessage) { - System.out.println("Received heartbeat"); + logger.info("Received heartbeat"); return; } - if (message instanceof CommandRequest) { - execCommand((CommandRequest) message); + if (message instanceof CommandRequest && commandRequestCallback != null) { + commandRequestCallback.accept((CommandRequest) message); return; } @@ -60,38 +66,42 @@ void onMessage(AgentMessage message) { } @OnClose - void onClose() { - if (openingTime != null) { - System.out.println("Closed after " + Duration.between(openingTime, Instant.now())); + void onClose(CloseReason reason) { + + // Duplicated agent + if (reason.getCode() == 4000) { + logger.error("There is an active agent for this user and connection ID. Please close it before starting a new one."); + System.exit(-1); } - } - private void execCommand(CommandRequest message) { - try { - System.out.println("Execute command: " + message.getCommand()); - Process process = new ProcessBuilder().command("sh", "-c", message.getCommand()).start(); - int exitStatus = process.waitFor(); - // read the stdout - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); - StringBuilder builder = new StringBuilder(); - String line = null; - while ((line = reader.readLine()) != null) { - builder.append(line); - builder.append("\n"); + if (reason.getCode() == 4001) { + logger.info("Closing to reauthenticate the session"); + if (connectCallback != null) { + connectCallback.run(); } - String result = builder.toString(); - - // send result - CommandResponse response = new CommandResponse(message.getId(), result.getBytes(), exitStatus); - System.out.println("Sending response --> " + response); - session.sendSync(response); - System.out.println("Response sent"); - } catch (Throwable e) { - // send result - CommandResponse response = new CommandResponse(message.getId(), e.getMessage().getBytes(), -1); - session.sendSync(response); + return; + } + + if (openingTime != null) { + Duration d = Duration.between(openingTime, Instant.now()); + String duration = String.format("%sd %sh %sm %ss", d.toDaysPart(), d.toHoursPart(), d.toMinutesPart(), d.toSecondsPart()); + logger.info("Closed after {}. [trying to reconnect in {} minutes]", duration, Agent.HEARTBEAT_MINUTES_INTERVAL); } } abstract void send(AgentMessage message); + + public boolean isOpen() { + return session.isOpen(); + } + + public void setConnectCallback(Runnable connectCallback) { + this.connectCallback = connectCallback; + } + + public void setCommandRequestCallback(Consumer callback) { + this.commandRequestCallback = callback; + } + + }