Skip to content

Commit

Permalink
[INLONG-10096][Manager] Add doc
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 committed Apr 28, 2024
1 parent 67f9bec commit 17fd548
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest;
import org.apache.inlong.manager.service.cmd.shell.ShellExecutor;
import org.apache.inlong.manager.service.cmd.shell.SimpleTracker;
import org.apache.inlong.manager.service.cmd.shell.ShellExecutorImpl;
import org.apache.inlong.manager.service.cmd.shell.ShellTrackerImpl;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -54,19 +54,19 @@ private String join(String... strings) {

@Override
public CommandResult exec(String cmd) throws Exception {
SimpleTracker shellTracker = new SimpleTracker();
ShellExecutor shellExecutor = new ShellExecutor(shellTracker);
ShellTrackerImpl shellTracker = new ShellTrackerImpl();
ShellExecutorImpl shellExecutor = new ShellExecutorImpl(shellTracker);
shellExecutor.syncExec("sh", "-c", cmd);
String cmdMsg = join("sh", "-c", cmd);
LOG.debug("run command : " + cmdMsg);
CommandResult commandResult = new CommandResult();
commandResult.setCode(shellTracker.getCode());
commandResult.setStdout(join(shellTracker.getResult()));
commandResult.setResult(join(shellTracker.getResult()));
if (commandResult.getCode() != 0) {
throw new Exception("command " + cmdMsg + " exec failed, code = " +
commandResult.getCode() + ", output = " + commandResult.getStdout());
commandResult.getCode() + ", output = " + commandResult.getResult());
}
LOG.debug(commandResult.getStdout());
LOG.debug(commandResult.getResult());
return commandResult;
}

Expand All @@ -83,19 +83,19 @@ public CommandResult execRemote(AgentClusterNodeRequest clusterNodeRequest, Stri
String cmdMsg = join(cmdShell, ip, user, password, remoteCommandTimeout, cmd, port);
LOG.info("run remote command : " + cmdMsg);

SimpleTracker shellTracker = new SimpleTracker();
ShellExecutor shellExecutor = new ShellExecutor(shellTracker);
ShellTrackerImpl shellTracker = new ShellTrackerImpl();
ShellExecutorImpl shellExecutor = new ShellExecutorImpl(shellTracker);
shellExecutor.syncExec(cmdShell, ip, user, password, remoteCommandTimeout, cmd, port);

CommandResult commandResult = new CommandResult();
commandResult.setCode(shellTracker.getCode());
commandResult.setStdout(join(shellTracker.getResult()));
commandResult.setResult(join(shellTracker.getResult()));

LOG.debug(commandResult.getStdout());
LOG.debug(commandResult.getResult());
if (commandResult.getCode() != 0) {
throw new Exception(
"remote command " + cmdMsg + " exec failed, code = " + commandResult.getCode() + ", output = "
+ commandResult.getStdout());
+ commandResult.getResult());
}
return commandResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package org.apache.inlong.manager.service.cmd;

/**
* Created by florianfan on 2015/11/9.
* Command result
*/
public class CommandResult {

private int code = 0;
private String stdout;
private String stderr;
private String result;
private String errMsg;

public int getCode() {
return code;
Expand All @@ -34,28 +34,28 @@ public void setCode(int code) {
this.code = code;
}

public String getStdout() {
return stdout;
public String getResult() {
return result;
}

public void setStdout(String stdout) {
this.stdout = stdout;
public void setResult(String result) {
this.result = result;
}

public String getStderr() {
return stderr;
public String getErrMsg() {
return errMsg;
}

public void setStderr(String stderr) {
this.stderr = stderr;
public void setErrMsg(String errMsg) {
this.errMsg = errMsg;
}

@Override
public String toString() {
return "CommandResult{" +
"code=" + code +
", stdout='" + stdout + '\'' +
", stderr='" + stderr + '\'' +
", stdout='" + result + '\'' +
", stderr='" + errMsg + '\'' +
'}';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,215 +17,14 @@

package org.apache.inlong.manager.service.cmd.shell;

import lombok.extern.slf4j.Slf4j;
public interface ShellExecutor {

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Execute shell commands
*
* @param shellPath shell path
* @param params params
*/
public void syncExec(String shellPath, String... params);

@Slf4j
public class ShellExecutor implements IExecutor {

private static final String[] EXCEPTION_REG = new String[]{"(.*)Caused by: (.*)Exception(.*)",
"(.*)java.net.UnknownHostException(.*)",
"(.*)Copy failed: java.io.IOException: Job failed!(.*)"};
private ShellTracker tracker;

public ShellExecutor(ShellTracker tracker) {
this.tracker = tracker;
}

private static long getPid(Process process) {
try {
Field f = process.getClass().getDeclaredField("pid");
f.setAccessible(true);
long procHandle = f.getLong(process);
return procHandle;
} catch (Exception e) {
e.printStackTrace();
return -1;
}
}

private static String[] merge(String shellPath, String[] paths) {
List<String> cmds = new ArrayList<String>();
cmds.add(shellPath);
for (String path : paths) {
if (path.equals("")) {
continue;
}
cmds.add(path);
}
String[] strings = new String[cmds.size()];
cmds.toArray(strings);
return strings;
}

private static String arrayToString(Object[] array, String split) {
if (array == null || array.length == 0) {
return "";
}
StringBuilder str = new StringBuilder();
for (int i = 0, length = array.length; i < length; i++) {
if (i != 0) {
str.append(split);
}
str.append(array[i]);
}
return str.toString();
}

private static boolean HasException(String str) {
for (String reg : EXCEPTION_REG) {
Pattern pattern = Pattern.compile(reg);
Matcher matcher = pattern.matcher(str);
if (matcher.find()) {
return true;
}
}
return false;
}

public void asynExec(String shellPath, String... params) {
this.tracker.beforeStart();
AsyncShellRunnable asyncShell = new AsyncShellRunnable(shellPath, this.tracker, params);
Thread thread = new Thread(asyncShell);
thread.start();
}

public void syncExec(String shellPath, String... params) {
List<String> result = new ArrayList<String>();
String[] cmds = merge(shellPath, params);
try {
tracker.start();
Process ps = Runtime.getRuntime().exec(cmds);
long pid = getPid(ps);
tracker.setProcessId(pid);
BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
String line;
boolean hasException = false;
while ((line = br.readLine()) != null) {
if (HasException(line)) {
hasException = true;
}
result.add(line);
tracker.setRunInfo(arrayToString(result.toArray(), "\n"));
tracker.lineChange(line);
}
if (hasException) {
tracker.lineChange("Java exception exist in output");
tracker.fail(-1);
return;
}
ps.waitFor();
int exitValue = ps.exitValue();
if (exitValue != 0) {
tracker.fail(exitValue);
return;
}
tracker.success();
} catch (Exception e) {
e.printStackTrace();
result.add(e.getMessage());
tracker.setRunInfo(arrayToString(result.toArray(), "\n"));
tracker.lineChange(e.getMessage());
tracker.fail(-1);
}
}

public void syncScriptExec(String script, String[] envConfig) {
List<String> result = new ArrayList<String>();
try {
tracker.start();
Process ps = Runtime.getRuntime().exec("bash +x " + script, envConfig);
long pid = getPid(ps);
tracker.setProcessId(pid);
BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
String line;
boolean hasException = false;
while ((line = br.readLine()) != null) {
if (HasException(line)) {
hasException = true;
}
result.add(line);
tracker.setRunInfo(arrayToString(result.toArray(), "\n"));
tracker.lineChange(line);
}
if (hasException) {
tracker.lineChange("Java exception exist in output");
tracker.fail(-1);
return;
}
ps.waitFor();
int exitValue = ps.exitValue();
if (exitValue != 0) {
tracker.fail(exitValue);
return;
}
tracker.success();
} catch (Exception e) {
e.printStackTrace();
result.add(e.getMessage());
tracker.setRunInfo(arrayToString(result.toArray(), "\n"));
tracker.lineChange(e.getMessage());
tracker.fail(-1);
}
}

public static class AsyncShellRunnable implements Runnable {

private String shellPath;
private String[] params;
private List<String> result = new ArrayList<String>();
private ShellTracker tracker;

public AsyncShellRunnable(String shellPath, ShellTracker tracker, String... params) {
this.shellPath = shellPath;
this.params = params;
this.tracker = tracker;
}

public void run() {
String[] cmds = merge(shellPath, params);
try {
tracker.start();
Process ps = Runtime.getRuntime().exec(cmds);
long pid = getPid(ps);
tracker.setProcessId(pid);
BufferedReader br = new BufferedReader(new InputStreamReader(ps.getInputStream()));
String line;
boolean hasException = false;
while ((line = br.readLine()) != null) {
if (HasException(line)) {
hasException = true;
}
result.add(line);
tracker.setRunInfo(arrayToString(result.toArray(), "\n"));
tracker.lineChange(line);
}
if (hasException) {
tracker.lineChange("Java exception exist in output");
tracker.fail(-1);
return;
}
ps.waitFor();
int exitValue = ps.exitValue();
if (exitValue != 0) {
tracker.fail(exitValue);
return;
}
tracker.success();
} catch (Exception e) {
e.printStackTrace();
result.add(e.getMessage());
tracker.setRunInfo(arrayToString(result.toArray(), "\n"));
tracker.lineChange(e.getMessage());
tracker.fail(-1);
}
}
}
}
Loading

0 comments on commit 17fd548

Please sign in to comment.