diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java index 08a7b10d33..9216461579 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java @@ -39,6 +39,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS; +import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION; /** * Handle List of Proxy Message, which belong to the same stream id. @@ -77,6 +78,7 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String dataTime = instanceProfile.getSinkDataTime(); extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false"); extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields())); + extraMap.put(AUDIT_VERSION, taskId); } public void generateExtraMap(String dataKey) { diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index e4cc7691c8..7b5edb5a0b 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -30,6 +30,8 @@ import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_KEY_PROXYS; import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE; import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_PROXYS; +import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG; +import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION; /** * AuditUtils @@ -42,6 +44,7 @@ public class AuditUtils { public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000; public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3; public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4; + public static final int AUDIT_ID_AGENT_READ_FAILED = 10003; public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004; public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 30001; public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 30002; @@ -54,6 +57,7 @@ public class AuditUtils { public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009; public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010; public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011; + public static final int AUDIT_ID_AGENT_READ_FAILED_REAL_TIME = 30012; public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013; public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014; public static final int AUDIT_ID_AGENT_TRY_SEND = 30020; @@ -93,11 +97,17 @@ public static void initAudit() { * Add audit metric */ public static void add(int auditID, String inlongGroupId, String inlongStreamId, - long logTime, int count, long size) { + long logTime, int count, long size, long version) { if (!IS_AUDIT) { return; } - AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size); + AuditOperator.getInstance() + .add(auditID, DEFAULT_AUDIT_TAG, inlongGroupId, inlongStreamId, logTime, count, size, version); + } + + public static void add(int auditID, String inlongGroupId, String inlongStreamId, + long logTime, int count, long size) { + add(auditID, inlongGroupId, inlongStreamId, logTime, count, size, DEFAULT_AUDIT_VERSION); } /** diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 2c9bd721d8..5545039f5e 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -80,6 +80,7 @@ public class InstanceManager extends AbstractDaemon { private final int instanceLimit; private final AgentConfiguration agentConf; private final String taskId; + private long auditVersion; private volatile boolean runAtLeastOneTime = false; private volatile boolean running = false; private final double reserveCoefficient = 0.8; @@ -122,6 +123,7 @@ public String toString() { */ public InstanceManager(String taskId, int instanceLimit, Db basicDb, TaskProfileDb taskProfileDb) { this.taskId = taskId; + this.auditVersion = Long.parseLong(taskId); instanceDb = new InstanceDb(basicDb); this.taskProfileDb = taskProfileDb; this.agentConf = AgentConfiguration.getAgentConf(); @@ -171,7 +173,7 @@ private Runnable coreThread() { String inlongGroupId = taskFromDb.getInlongGroupId(); String inlongStreamId = taskFromDb.getInlongStreamId(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, inlongStreamId, - AgentUtils.getCurrentTime(), 1, 1); + AgentUtils.getCurrentTime(), 1, 1, auditVersion); } catch (Throwable ex) { LOGGER.error("coreThread error: ", ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); @@ -387,7 +389,7 @@ private void deleteFromDb(String instanceId) { LOGGER.info("delete instance from db: taskId {} instanceId {} result {}", taskId, instanceId, instanceDb.getInstance(taskId, instanceId)); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, inlongGroupId, inlongStreamId, - profile.getSinkDataTime(), 1, 1); + profile.getSinkDataTime(), 1, 1, auditVersion); } private void deleteFromMemory(String instanceId) { @@ -403,7 +405,7 @@ private void deleteFromMemory(String instanceId) { instanceMap.remove(instanceId); LOGGER.info("delete instance from memory: taskId {} instanceId {}", taskId, instance.getInstanceId()); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM, inlongGroupId, inlongStreamId, - instance.getProfile().getSinkDataTime(), 1, 1); + instance.getProfile().getSinkDataTime(), 1, 1, auditVersion); } private void addToDb(InstanceProfile profile, boolean addNew) { @@ -413,7 +415,7 @@ private void addToDb(InstanceProfile profile, boolean addNew) { String inlongGroupId = profile.getInlongGroupId(); String inlongStreamId = profile.getInlongStreamId(); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_DB, inlongGroupId, inlongStreamId, - profile.getSinkDataTime(), 1, 1); + profile.getSinkDataTime(), 1, 1, auditVersion); } } @@ -430,7 +432,7 @@ private void addToMemory(InstanceProfile instanceProfile) { LOGGER.error("old instance {} should not exist, try stop it first", instanceProfile.getInstanceId()); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL, inlongGroupId, inlongStreamId, - instanceProfile.getSinkDataTime(), 1, 1); + instanceProfile.getSinkDataTime(), 1, 1, auditVersion); } LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr()); try { @@ -445,12 +447,12 @@ private void addToMemory(InstanceProfile instanceProfile) { instance.getInstanceId(), instanceMap.size(), EXECUTOR_SERVICE.getTaskCount(), EXECUTOR_SERVICE.getActiveCount()); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, inlongGroupId, inlongStreamId, - instanceProfile.getSinkDataTime(), 1, 1); + instanceProfile.getSinkDataTime(), 1, 1, auditVersion); } else { LOGGER.error( "add instance to memory init failed instanceId {}", instance.getInstanceId()); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED, inlongGroupId, inlongStreamId, - instanceProfile.getSinkDataTime(), 1, 1); + instanceProfile.getSinkDataTime(), 1, 1, auditVersion); } } catch (Throwable t) { LOGGER.error("add instance error {}", t.getMessage()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java index d381c88bd2..e8d848b36b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java @@ -59,12 +59,14 @@ public abstract class CommonInstance extends Instance { private volatile int checkFinishCount = 0; private int heartbeatcheckCount = 0; private long heartBeatStartTime = AgentUtils.getCurrentTime(); + protected long auditVersion; @Override public boolean init(Object srcManager, InstanceProfile srcProfile) { try { instanceManager = (InstanceManager) srcManager; profile = srcProfile; + auditVersion = Long.parseLong(getTaskId()); setInodeInfo(profile); LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(), profile.getInstanceId(), profile.toJsonStr()); @@ -153,7 +155,7 @@ private void doRun() { private void heartbeatStatic() { if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) { AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(), - profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1); + profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion); heartbeatcheckCount = 0; heartBeatStartTime = AgentUtils.getCurrentTime(); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 4e5c07ca26..0eba81e4d2 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -106,9 +106,11 @@ public class SenderManager { private volatile boolean resendRunning = false; private volatile boolean started = false; private static final AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); + private long auditVersion; public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) { this.profile = profile; + auditVersion = Long.parseLong(profile.getTaskId()); managerAddr = agentConf.get(AGENT_MANAGER_ADDR); proxySend = profile.getBoolean(TASK_PROXY_SEND, DEFAULT_TASK_PROXY_SEND); totalAsyncBufSize = profile @@ -233,10 +235,10 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { AgentSenderCallback cb = new AgentSenderCallback(message, retry); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND, message.getGroupId(), message.getStreamId(), message.getDataTime(), message.getMsgCnt(), - message.getTotalSize()); + message.getTotalSize(), auditVersion); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME, message.getGroupId(), message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(), - message.getTotalSize()); + message.getTotalSize(), auditVersion); asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(), message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS, message.getExtraMap(), proxySend); @@ -246,10 +248,10 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) { } catch (Exception exception) { AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION, message.getGroupId(), message.getStreamId(), message.getDataTime(), message.getMsgCnt(), - message.getTotalSize()); + message.getTotalSize(), auditVersion); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME, message.getGroupId(), message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(), - message.getTotalSize()); + message.getTotalSize(), auditVersion); suc = false; if (retry > maxSenderRetry) { if (retry % 10 == 0) { @@ -291,10 +293,10 @@ private Runnable flushResendQueue() { SenderMessage message = callback.message; AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(), message.getStreamId(), message.getDataTime(), message.getMsgCnt(), - message.getTotalSize()); + message.getTotalSize(), auditVersion); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, message.getGroupId(), message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(), - message.getTotalSize()); + message.getTotalSize(), auditVersion); sendBatchWithRetryCount(callback.message, callback.retry + 1); } } catch (Exception ex) { @@ -353,18 +355,18 @@ public void onMessageAck(SendResult result) { message.getOffsetAckList().forEach(ack -> ack.setHasAck(true)); getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, - dataTime, message.getMsgCnt(), message.getTotalSize()); + dataTime, message.getMsgCnt(), message.getTotalSize(), auditVersion); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, groupId, streamId, - AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize()); + AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize(), auditVersion); } else { LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, " + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result); getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(msgCnt); putInResendQueue(new AgentSenderCallback(message, retry)); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, groupId, streamId, - dataTime, message.getMsgCnt(), message.getTotalSize()); + dataTime, message.getMsgCnt(), message.getTotalSize(), auditVersion); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, groupId, streamId, - AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize()); + AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize(), auditVersion); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 09d200a742..dd6d9ce146 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -246,8 +246,16 @@ private long readLines(RandomAccessFile reader, long pos, List lines, in if (overLen) { LOGGER.warn("readLines over len finally string len {}", new String(baos.toByteArray()).length()); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, - inlongStreamId, AgentUtils.getCurrentTime(), 1, maxPackSize); + long auditTime = 0; + if (isRealTime) { + auditTime = AgentUtils.getCurrentTime(); + } else { + auditTime = profile.getSinkDataTime(); + } + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED, inlongGroupId, inlongStreamId, + auditTime, 1, maxPackSize, auditVersion); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED_REAL_TIME, inlongGroupId, + inlongStreamId, AgentUtils.getCurrentTime(), 1, maxPackSize, auditVersion); } baos.reset(); overLen = false; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 192756abf8..2449d065e4 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -101,10 +101,11 @@ protected class SourceData { protected volatile boolean runnable = true; protected volatile boolean running = false; protected String taskId; + protected long auditVersion; protected String instanceId; protected InstanceProfile profile; private ExtendedHandler extendedHandler; - private boolean isRealTime = false; + protected boolean isRealTime = false; protected volatile long emptyCount = 0; protected int maxPackSize; private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( @@ -118,6 +119,7 @@ protected class SourceData { public void init(InstanceProfile profile) { this.profile = profile; taskId = profile.getTaskId(); + auditVersion = Long.parseLong(taskId); instanceId = profile.getInstanceId(); inlongGroupId = profile.getInlongGroupId(); inlongStreamId = profile.getInlongStreamId(); @@ -333,9 +335,9 @@ private Message createMessage(SourceData sourceData) { auditTime = profile.getSinkDataTime(); } AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - auditTime, 1, sourceData.getData().length); + auditTime, 1, sourceData.getData().length, auditVersion); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - AgentUtils.getCurrentTime(), 1, sourceData.getData().length); + AgentUtils.getCurrentTime(), 1, sourceData.getData().length, auditVersion); Message finalMsg = new DefaultMessage(sourceData.getData(), header); // if the message size is greater than max pack size,should drop it. if (finalMsg.getBody().length > maxPackSize) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java index 56a786c6e5..b288ff1946 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java @@ -48,12 +48,14 @@ public abstract class AbstractTask extends Task { protected volatile boolean running = false; protected boolean initOK = false; protected long lastPrintTime = 0; + protected long auditVersion; @Override public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException { taskManager = (TaskManager) srcManager; this.taskProfile = taskProfile; this.basicDb = basicDb; + auditVersion = Long.parseLong(taskProfile.getTaskId()); instanceManager = new InstanceManager(taskProfile.getTaskId(), taskProfile.getInt(TaskConstants.FILE_MAX_NUM), basicDb, taskManager.getTaskDb()); try { @@ -132,7 +134,7 @@ protected void doRun() { protected void taskHeartbeat() { AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, taskProfile.getInlongGroupId(), - taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1); + taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion); }