Skip to content

Commit

Permalink
[INLONG-10035][Agent] Report audit data using the new SDK interface (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Apr 22, 2024
1 parent fc5ce9d commit cd2a7a8
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION;

/**
* Handle List of Proxy Message, which belong to the same stream id.
Expand Down Expand Up @@ -77,6 +78,7 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String
dataTime = instanceProfile.getSinkDataTime();
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
extraMap.put(AUDIT_VERSION, taskId);
}

public void generateExtraMap(String dataKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_KEY_PROXYS;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_ENABLE;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AUDIT_PROXYS;
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;

/**
* AuditUtils
Expand All @@ -42,6 +44,7 @@ public class AuditUtils {
public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
public static final int AUDIT_ID_AGENT_READ_FAILED = 10003;
public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 30001;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 30002;
Expand All @@ -54,6 +57,7 @@ public class AuditUtils {
public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
public static final int AUDIT_ID_AGENT_READ_FAILED_REAL_TIME = 30012;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
public static final int AUDIT_ID_AGENT_TRY_SEND = 30020;
Expand Down Expand Up @@ -93,11 +97,17 @@ public static void initAudit() {
* Add audit metric
*/
public static void add(int auditID, String inlongGroupId, String inlongStreamId,
long logTime, int count, long size) {
long logTime, int count, long size, long version) {
if (!IS_AUDIT) {
return;
}
AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
AuditOperator.getInstance()
.add(auditID, DEFAULT_AUDIT_TAG, inlongGroupId, inlongStreamId, logTime, count, size, version);
}

public static void add(int auditID, String inlongGroupId, String inlongStreamId,
long logTime, int count, long size) {
add(auditID, inlongGroupId, inlongStreamId, logTime, count, size, DEFAULT_AUDIT_VERSION);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class InstanceManager extends AbstractDaemon {
private final int instanceLimit;
private final AgentConfiguration agentConf;
private final String taskId;
private long auditVersion;
private volatile boolean runAtLeastOneTime = false;
private volatile boolean running = false;
private final double reserveCoefficient = 0.8;
Expand Down Expand Up @@ -122,6 +123,7 @@ public String toString() {
*/
public InstanceManager(String taskId, int instanceLimit, Db basicDb, TaskProfileDb taskProfileDb) {
this.taskId = taskId;
this.auditVersion = Long.parseLong(taskId);
instanceDb = new InstanceDb(basicDb);
this.taskProfileDb = taskProfileDb;
this.agentConf = AgentConfiguration.getAgentConf();
Expand Down Expand Up @@ -171,7 +173,7 @@ private Runnable coreThread() {
String inlongGroupId = taskFromDb.getInlongGroupId();
String inlongStreamId = taskFromDb.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, inlongStreamId,
AgentUtils.getCurrentTime(), 1, 1);
AgentUtils.getCurrentTime(), 1, 1, auditVersion);
} catch (Throwable ex) {
LOGGER.error("coreThread error: ", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
Expand Down Expand Up @@ -387,7 +389,7 @@ private void deleteFromDb(String instanceId) {
LOGGER.info("delete instance from db: taskId {} instanceId {} result {}", taskId,
instanceId, instanceDb.getInstance(taskId, instanceId));
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, inlongGroupId, inlongStreamId,
profile.getSinkDataTime(), 1, 1);
profile.getSinkDataTime(), 1, 1, auditVersion);
}

private void deleteFromMemory(String instanceId) {
Expand All @@ -403,7 +405,7 @@ private void deleteFromMemory(String instanceId) {
instanceMap.remove(instanceId);
LOGGER.info("delete instance from memory: taskId {} instanceId {}", taskId, instance.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM, inlongGroupId, inlongStreamId,
instance.getProfile().getSinkDataTime(), 1, 1);
instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
}

private void addToDb(InstanceProfile profile, boolean addNew) {
Expand All @@ -413,7 +415,7 @@ private void addToDb(InstanceProfile profile, boolean addNew) {
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_DB, inlongGroupId, inlongStreamId,
profile.getSinkDataTime(), 1, 1);
profile.getSinkDataTime(), 1, 1, auditVersion);
}
}

Expand All @@ -430,7 +432,7 @@ private void addToMemory(InstanceProfile instanceProfile) {
LOGGER.error("old instance {} should not exist, try stop it first",
instanceProfile.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL, inlongGroupId, inlongStreamId,
instanceProfile.getSinkDataTime(), 1, 1);
instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
}
LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
try {
Expand All @@ -445,12 +447,12 @@ private void addToMemory(InstanceProfile instanceProfile) {
instance.getInstanceId(), instanceMap.size(), EXECUTOR_SERVICE.getTaskCount(),
EXECUTOR_SERVICE.getActiveCount());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, inlongGroupId, inlongStreamId,
instanceProfile.getSinkDataTime(), 1, 1);
instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
} else {
LOGGER.error(
"add instance to memory init failed instanceId {}", instance.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED, inlongGroupId, inlongStreamId,
instanceProfile.getSinkDataTime(), 1, 1);
instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
}
} catch (Throwable t) {
LOGGER.error("add instance error {}", t.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ public abstract class CommonInstance extends Instance {
private volatile int checkFinishCount = 0;
private int heartbeatcheckCount = 0;
private long heartBeatStartTime = AgentUtils.getCurrentTime();
protected long auditVersion;

@Override
public boolean init(Object srcManager, InstanceProfile srcProfile) {
try {
instanceManager = (InstanceManager) srcManager;
profile = srcProfile;
auditVersion = Long.parseLong(getTaskId());
setInodeInfo(profile);
LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(),
profile.getInstanceId(), profile.toJsonStr());
Expand Down Expand Up @@ -153,7 +155,7 @@ private void doRun() {
private void heartbeatStatic() {
if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(),
profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1);
profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion);
heartbeatcheckCount = 0;
heartBeatStartTime = AgentUtils.getCurrentTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ public class SenderManager {
private volatile boolean resendRunning = false;
private volatile boolean started = false;
private static final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
private long auditVersion;

public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) {
this.profile = profile;
auditVersion = Long.parseLong(profile.getTaskId());
managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
proxySend = profile.getBoolean(TASK_PROXY_SEND, DEFAULT_TASK_PROXY_SEND);
totalAsyncBufSize = profile
Expand Down Expand Up @@ -233,10 +235,10 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) {
AgentSenderCallback cb = new AgentSenderCallback(message, retry);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND, message.getGroupId(),
message.getStreamId(), message.getDataTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME, message.getGroupId(),
message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(),
message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(),
maxSenderTimeout, TimeUnit.SECONDS, message.getExtraMap(), proxySend);
Expand All @@ -246,10 +248,10 @@ private void sendBatchWithRetryCount(SenderMessage message, int retry) {
} catch (Exception exception) {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION, message.getGroupId(),
message.getStreamId(), message.getDataTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME, message.getGroupId(),
message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
suc = false;
if (retry > maxSenderRetry) {
if (retry % 10 == 0) {
Expand Down Expand Up @@ -291,10 +293,10 @@ private Runnable flushResendQueue() {
SenderMessage message = callback.message;
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(),
message.getStreamId(), message.getDataTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, message.getGroupId(),
message.getStreamId(), AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
message.getTotalSize(), auditVersion);
sendBatchWithRetryCount(callback.message, callback.retry + 1);
}
} catch (Exception ex) {
Expand Down Expand Up @@ -353,18 +355,18 @@ public void onMessageAck(SendResult result) {
message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId,
dataTime, message.getMsgCnt(), message.getTotalSize());
dataTime, message.getMsgCnt(), message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, groupId, streamId,
AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize());
AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize(), auditVersion);
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result);
getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(msgCnt);
putInResendQueue(new AgentSenderCallback(message, retry));
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, groupId, streamId,
dataTime, message.getMsgCnt(), message.getTotalSize());
dataTime, message.getMsgCnt(), message.getTotalSize(), auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, groupId, streamId,
AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize());
AgentUtils.getCurrentTime(), message.getMsgCnt(), message.getTotalSize(), auditVersion);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,16 @@ private long readLines(RandomAccessFile reader, long pos, List<byte[]> lines, in
if (overLen) {
LOGGER.warn("readLines over len finally string len {}",
new String(baos.toByteArray()).length());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId,
inlongStreamId, AgentUtils.getCurrentTime(), 1, maxPackSize);
long auditTime = 0;
if (isRealTime) {
auditTime = AgentUtils.getCurrentTime();
} else {
auditTime = profile.getSinkDataTime();
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED, inlongGroupId, inlongStreamId,
auditTime, 1, maxPackSize, auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_FAILED_REAL_TIME, inlongGroupId,
inlongStreamId, AgentUtils.getCurrentTime(), 1, maxPackSize, auditVersion);
}
baos.reset();
overLen = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ protected class SourceData {
protected volatile boolean runnable = true;
protected volatile boolean running = false;
protected String taskId;
protected long auditVersion;
protected String instanceId;
protected InstanceProfile profile;
private ExtendedHandler extendedHandler;
private boolean isRealTime = false;
protected boolean isRealTime = false;
protected volatile long emptyCount = 0;
protected int maxPackSize;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
Expand All @@ -118,6 +119,7 @@ protected class SourceData {
public void init(InstanceProfile profile) {
this.profile = profile;
taskId = profile.getTaskId();
auditVersion = Long.parseLong(taskId);
instanceId = profile.getInstanceId();
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
Expand Down Expand Up @@ -333,9 +335,9 @@ private Message createMessage(SourceData sourceData) {
auditTime = profile.getSinkDataTime();
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
auditTime, 1, sourceData.getData().length);
auditTime, 1, sourceData.getData().length, auditVersion);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
AgentUtils.getCurrentTime(), 1, sourceData.getData().length);
AgentUtils.getCurrentTime(), 1, sourceData.getData().length, auditVersion);
Message finalMsg = new DefaultMessage(sourceData.getData(), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ public abstract class AbstractTask extends Task {
protected volatile boolean running = false;
protected boolean initOK = false;
protected long lastPrintTime = 0;
protected long auditVersion;

@Override
public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException {
taskManager = (TaskManager) srcManager;
this.taskProfile = taskProfile;
this.basicDb = basicDb;
auditVersion = Long.parseLong(taskProfile.getTaskId());
instanceManager = new InstanceManager(taskProfile.getTaskId(), taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
basicDb, taskManager.getTaskDb());
try {
Expand Down Expand Up @@ -132,7 +134,7 @@ protected void doRun() {

protected void taskHeartbeat() {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, taskProfile.getInlongGroupId(),
taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1);
taskProfile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1, auditVersion);

}

Expand Down

0 comments on commit cd2a7a8

Please sign in to comment.