Skip to content

Commit

Permalink
[INLONG-9457][Agent] Add task and instance heartbeat audit (#9458)
Browse files Browse the repository at this point in the history
* [INLONG-9457][Agent] Add task and instance heartbeat audit

* [INLONG-9457][Agent] Add task and instance heartbeat audit

(cherry picked from commit 0dac3fa)
  • Loading branch information
justinwwhuang authored and vernedeng committed Dec 11, 2023
1 parent a6ce6ef commit f4148e6
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import java.text.ParseException;
import java.util.TimeZone;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;

Expand Down Expand Up @@ -89,6 +93,14 @@ public void setTaskClass(String className) {
set(TaskConstants.TASK_CLASS, className);
}

public String getInlongGroupId() {
return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
}

public String getInlongStreamId() {
return get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
}

/**
* parse json string to configuration instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,19 @@ public class AuditUtils {
public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 47;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 48;
public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 49;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 50;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 51;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 52;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 10028;
public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 30001;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 30002;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 30003;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 30004;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 30005;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 30006;
public static final int AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT = 30007;
public static final int AUDIT_ID_AGENT_TASK_HEARTBEAT = 30008;
public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;

private static boolean IS_AUDIT = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public class InstanceManager extends AbstractDaemon {
public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
// instance in db
private final InstanceDb instanceDb;
TaskProfileDb taskProfileDb;
private TaskProfileDb taskProfileDb;
private TaskProfile taskFromDb;
// task in memory
private final ConcurrentHashMap<String, Instance> instanceMap;
// instance profile queue.
Expand Down Expand Up @@ -161,6 +162,10 @@ private Runnable coreThread() {
cleanDbInstance();
dealWithActionQueue(actionQueue);
keepPaceWithDb();
String inlongGroupId = taskFromDb.getInlongGroupId();
String inlongStreamId = taskFromDb.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1);
} catch (Throwable ex) {
LOGGER.error("coreThread {}", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
Expand Down Expand Up @@ -323,6 +328,7 @@ public void waitForTerminate() {
}

private void restoreFromDb() {
taskFromDb = taskProfileDb.getTask(taskId);
List<InstanceProfile> profileList = instanceDb.getInstances(taskId);
profileList.forEach((profile) -> {
InstanceStateEnum state = profile.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.RocksDbImp;
import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Task;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
Expand Down Expand Up @@ -197,6 +198,8 @@ private Runnable coreThread() {
printTaskDetail();
dealWithConfigQueue(configQueue);
dealWithActionQueue(actionQueue);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT, "", "",
AgentUtils.getCurrentTime(), 1, 1);
} catch (Throwable ex) {
LOGGER.error("exception caught", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.inlong.agent.core.instance.InstanceAction;
import org.apache.inlong.agent.core.instance.InstanceManager;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Sink;
Expand Down Expand Up @@ -109,6 +110,10 @@ public void run() {
checkFinishCount = 0;
}
AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1);
} else {
sink.write(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.inlong.agent.core.task.TaskAction;
import org.apache.inlong.agent.core.task.file.TaskManager;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Task;
import org.apache.inlong.agent.plugin.task.filecollect.FileScanner.BasicFileInfo;
import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
Expand Down Expand Up @@ -262,6 +263,10 @@ public void run() {
} else {
runForNormal();
}
String inlongGroupId = taskProfile.getInlongGroupId();
String inlongStreamId = taskProfile.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, inlongGroupId, inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1);
}
running = false;
}
Expand Down
15 changes: 14 additions & 1 deletion inlong-manager/manager-web/sql/changes-1.10.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,20 @@ VALUES ('audit_sort_mysql_binlog_input', 'MYSQL_BINLOG', 0, '29'),
('audit_sort_pulsar_input', 'PULSAR', 0, '31'),
('audit_sort_pulsar_output', 'PULSAR', 1, '32'),
('audit_sort_tube_input', 'TUBEMQ', 0, '33'),
('audit_sort_tube_output', 'TUBEMQ', 1, '34');
('audit_sort_tube_output', 'TUBEMQ', 1, '34'),
('audit_agent_sent_failed', 'AGENT', 2, '10004'),
('audit_agent_read_realtime', 'AGENT', 3, '30001'),
('audit_agent_send_realtime', 'AGENT', 4, '30002'),
('audit_agent_add_instance_mem', 'AGENT', 5, '30003'),
('audit_agent_del_instance_mem', 'AGENT', 6, '30004'),
('audit_agent_add_instance_db', 'AGENT', 7, '30005'),
('audit_agent_del_instance_db', 'AGENT', 8, '30006'),
('audit_agent_task_mgr_heartbeat', 'AGENT', 9, '30007'),
('audit_agent_task_heartbeat', 'AGENT', 10, '30008'),
('audit_agent_instance_mgr_heartbeat', 'AGENT', 11, '30009'),
('audit_agent_instance_heartbeat', 'AGENT', 12, '30010'),
('audit_agent_sent_failed_realtime', 'AGENT', 13, '30011'),
('audit_agent_del_instance_mem_unusual', 'AGENT', 14, '30014');

ALTER TABLE `operation_log`
ADD COLUMN `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id';
Expand Down

0 comments on commit f4148e6

Please sign in to comment.