Skip to content

Commit

Permalink
[INLONG-9415][Agent] Increase audit reports related to instance maint…
Browse files Browse the repository at this point in the history
…enance (#9416)
  • Loading branch information
justinwwhuang authored Dec 5, 2023
1 parent a7cfe7d commit f01af15
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@

import java.util.List;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_ClUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_TOPIC;
Expand Down Expand Up @@ -105,6 +109,14 @@ public String getPredefineFields() {
return get(TaskConstants.PREDEFINE_FIELDS, "");
}

public String getInlongGroupId() {
return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
}

public String getInlongStreamId() {
return get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
}

@Override
public boolean allRequiredKeyExist() {
return hasKey(TaskConstants.FILE_UPDATE_TIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public class AuditUtils {
public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 48;
public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 49;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 50;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 51;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 52;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 10028;

private static boolean IS_AUDIT = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.InstanceDb;
import org.apache.inlong.agent.db.TaskProfileDb;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Instance;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
Expand Down Expand Up @@ -218,7 +219,7 @@ private void doCleanDbInstance(List<InstanceProfile> instances) {
LOGGER.info("instance has expired, delete from db dataTime {} taskId {} instanceId {}",
instanceFromDb.getSourceDataTime(), instanceFromDb.getTaskId(),
instanceFromDb.getInstanceId());
instanceDb.deleteInstance(instanceFromDb.getTaskId(), instanceFromDb.getInstanceId());
deleteFromDb(instanceFromDb.getInstanceId());
iterator.remove();
}
}
Expand Down Expand Up @@ -347,14 +348,14 @@ private void addInstance(InstanceProfile profile) {
profile.getInstanceId());
return;
}
addToDb(profile);
addToDb(profile, true);
addToMemory(profile);
}

private void finishInstance(InstanceProfile profile) {
profile.setState(InstanceStateEnum.FINISHED);
profile.setModifyTime(AgentUtils.getCurrentTime());
addToDb(profile);
addToDb(profile, false);
deleteFromMemory(profile.getInstanceId());
LOGGER.info("finished instance state {} taskId {} instanceId {}", profile.getState(),
profile.getTaskId(), profile.getInstanceId());
Expand All @@ -366,9 +367,14 @@ private void deleteInstance(String instanceId) {
}

private void deleteFromDb(String instanceId) {
InstanceProfile profile = instanceDb.getInstance(taskId, instanceId);
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
instanceDb.deleteInstance(taskId, instanceId);
LOGGER.info("delete instance from db: taskId {} instanceId {} result {}", taskId,
instanceId, instanceDb.getInstance(taskId, instanceId));
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, inlongGroupId, inlongStreamId,
profile.getSinkDataTime(), 1, 1);
}

private void deleteFromMemory(String instanceId) {
Expand All @@ -378,26 +384,40 @@ private void deleteFromMemory(String instanceId) {
instanceId);
return;
}
String inlongGroupId = instance.getProfile().getInlongGroupId();
String inlongStreamId = instance.getProfile().getInlongStreamId();
instance.destroy();
instanceMap.remove(instanceId);
LOGGER.info("delete instance from memory: taskId {} instanceId {}", taskId, instance.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM, inlongGroupId, inlongStreamId,
instance.getProfile().getSinkDataTime(), 1, 1);
}

private void addToDb(InstanceProfile profile) {
private void addToDb(InstanceProfile profile, boolean addNew) {
LOGGER.info("add instance to db state {} instanceId {}", profile.getState(), profile.getInstanceId());
instanceDb.storeInstance(profile);
if (addNew) {
String inlongGroupId = profile.getInlongGroupId();
String inlongStreamId = profile.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_DB, inlongGroupId, inlongStreamId,
profile.getSinkDataTime(), 1, 1);
}
}

/**
* add instance to memory, if there is a record refer to the instance id exist we need to destroy it first.
*/
private void addToMemory(InstanceProfile instanceProfile) {
String inlongGroupId = instanceProfile.getInlongGroupId();
String inlongStreamId = instanceProfile.getInlongStreamId();
Instance oldInstance = instanceMap.get(instanceProfile.getInstanceId());
if (oldInstance != null) {
oldInstance.destroy();
instanceMap.remove(instanceProfile.getInstanceId());
LOGGER.error("old instance {} should not exist, try stop it first",
instanceProfile.getInstanceId());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL, inlongGroupId, inlongStreamId,
instanceProfile.getSinkDataTime(), 1, 1);
}
LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
try {
Expand All @@ -410,6 +430,8 @@ private void addToMemory(InstanceProfile instanceProfile) {
"add instance to memory instanceId {} instanceMap size {}, runningPool instance total {}, runningPool instance active {}",
instance.getInstanceId(), instanceMap.size(), EXECUTOR_SERVICE.getTaskCount(),
EXECUTOR_SERVICE.getActiveCount());
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, inlongGroupId, inlongStreamId,
instanceProfile.getSinkDataTime(), 1, 1);
} catch (Throwable t) {
LOGGER.error("add instance error {}", t.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
Expand Down Expand Up @@ -72,8 +68,8 @@ public void setSourceName(String sourceFileName) {
public void init(InstanceProfile profile) {
this.profile = profile;
jobInstanceId = profile.getInstanceId();
inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
cache = new ProxyMessageCache(this.profile, inlongGroupId, inlongStreamId);
batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
Expand All @@ -48,8 +44,8 @@ public abstract class AbstractSource implements Source {

@Override
public void init(InstanceProfile profile) {
inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();
// register metric
this.dimensions = new HashMap<>();
dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
Expand All @@ -59,8 +55,8 @@ public abstract class AbstractReader implements Reader {

@Override
public void init(InstanceProfile profile) {
inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
inlongGroupId = profile.getInlongGroupId();
inlongStreamId = profile.getInlongStreamId();

this.dimensions = new HashMap<>();
dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
Expand Down

0 comments on commit f01af15

Please sign in to comment.