diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java index db41243fbf2..acc0fbfee37 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java @@ -40,6 +40,20 @@ public InstanceDb(Db db) { this.db = db; } + /** + * list all instance from db. + * + * @return list of task + */ + public List listAllInstances() { + List result = this.db.findAll(""); + List instanceList = new ArrayList<>(); + for (KeyValueEntity entity : result) { + instanceList.add(entity.getAsInstanceProfile()); + } + return instanceList; + } + /** * get instance list from db. * diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 5545039f5e1..d388f2293e8 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -22,22 +22,18 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.InstanceDb; import org.apache.inlong.agent.db.TaskProfileDb; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Instance; import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.enums.InstanceStateEnum; -import org.apache.inlong.common.enums.TaskStateEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -45,7 +41,6 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB; @@ -57,11 +52,9 @@ public class InstanceManager extends AbstractDaemon { private static final Logger LOGGER = LoggerFactory.getLogger(InstanceManager.class); private static final int ACTION_QUEUE_CAPACITY = 100; - public static final int CLEAN_INSTANCE_ONCE_LIMIT = 10; public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000; - public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 10000; - private long lastCleanTime = 0; - public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3"; + public static final int INSTANCE_PRINT_INTERVAL_MS = 10000; + private long lastPrintTime = 0; // instance in db private final InstanceDb instanceDb; private TaskProfileDb taskProfileDb; @@ -167,7 +160,7 @@ private Runnable coreThread() { while (isRunnable()) { try { AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); - cleanDbInstance(); + printInstanceState(); dealWithActionQueue(actionQueue); keepPaceWithDb(); String inlongGroupId = taskFromDb.getInlongGroupId(); @@ -184,10 +177,10 @@ private Runnable coreThread() { }; } - private void cleanDbInstance() { - if (AgentUtils.getCurrentTime() - lastCleanTime > INSTANCE_DB_CLEAN_INTERVAL_MS) { + private void printInstanceState() { + long currentTime = AgentUtils.getCurrentTime(); + if (currentTime - lastPrintTime > INSTANCE_PRINT_INTERVAL_MS) { List instances = instanceDb.getInstances(taskId); - doCleanDbInstance(instances); InstancePrintStat stat = new InstancePrintStat(); for (int i = 0; i < instances.size(); i++) { InstanceProfile instance = instances.get(i); @@ -196,45 +189,7 @@ private void cleanDbInstance() { LOGGER.info( "instanceManager running! taskId {} mem {} db total {} {} action count {}", taskId, instanceMap.size(), instances.size(), stat, actionQueue.size()); - lastCleanTime = AgentUtils.getCurrentTime(); - } - } - - private void doCleanDbInstance(List instances) { - AtomicInteger cleanCount = new AtomicInteger(); - Iterator iterator = instances.iterator(); - while (iterator.hasNext()) { - if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) { - return; - } - InstanceProfile instanceFromDb = iterator.next(); - if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) { - return; - } - TaskProfile taskFromDb = taskProfileDb.getTask(taskId); - if (taskFromDb != null) { - if (taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { - return; - } - if (taskFromDb.isRetry()) { - if (taskFromDb.getState() != TaskStateEnum.RETRY_FINISH) { - return; - } - } else { - if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) { - return; - } - } - } - long expireTime = DateTransUtils.calcOffset(DB_INSTANCE_EXPIRE_CYCLE_COUNT + taskFromDb.getCycleUnit()); - if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > expireTime) { - cleanCount.getAndIncrement(); - LOGGER.info("instance has expired, delete from db dataTime {} taskId {} instanceId {}", - instanceFromDb.getSourceDataTime(), instanceFromDb.getTaskId(), - instanceFromDb.getInstanceId()); - deleteFromDb(instanceFromDb.getInstanceId()); - iterator.remove(); - } + lastPrintTime = currentTime; } } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java index fca223b8730..41dc95ca950 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java @@ -20,16 +20,25 @@ import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.OffsetProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.InstanceDb; import org.apache.inlong.agent.db.OffsetDb; +import org.apache.inlong.agent.db.TaskProfileDb; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.agent.utils.ThreadUtils; +import org.apache.inlong.common.enums.InstanceStateEnum; +import org.apache.inlong.common.enums.TaskStateEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** * used to store instance offset to db @@ -39,14 +48,17 @@ public class OffsetManager extends AbstractDaemon { private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class); public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000; + public static final int CLEAN_INSTANCE_ONCE_LIMIT = 100; + public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3"; private static volatile OffsetManager offsetManager = null; private final OffsetDb offsetDb; - // instance in db private final InstanceDb instanceDb; + private final TaskProfileDb taskProfileDb; - private OffsetManager(Db offsetBasicDb, Db instanceBasicDb) { - this.offsetDb = new OffsetDb(offsetBasicDb); + private OffsetManager(Db taskBasicDb, Db instanceBasicDb, Db offsetBasicDb) { + taskProfileDb = new TaskProfileDb(taskBasicDb); instanceDb = new InstanceDb(instanceBasicDb); + offsetDb = new OffsetDb(offsetBasicDb); } /** @@ -60,18 +72,8 @@ private Runnable coreThread() { while (isRunnable()) { try { AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); - List offsets = offsetDb.listAllOffsets(); - offsets.forEach(offset -> { - String taskId = offset.getTaskId(); - String instanceId = offset.getInstanceId(); - InstanceProfile instanceProfile = instanceDb.getInstance(taskId, instanceId); - if (instanceProfile == null) { - deleteOffset(taskId, instanceId); - LOGGER.info("instance not found, delete offset taskId {} instanceId {}", taskId, - instanceId); - } - }); - LOGGER.info("offsetManager running! offsets count {}", offsets.size()); + cleanDbInstance(); + cleanDbOffset(); } catch (Throwable ex) { LOGGER.error("offset-manager-core: ", ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); @@ -83,11 +85,11 @@ private Runnable coreThread() { /** * task position manager singleton, can only generated by agent manager */ - public static void init(Db offsetBasicDb, Db instanceBasicDb) { + public static void init(Db taskBasicDb, Db instanceBasicDb, Db offsetBasicDb) { if (offsetManager == null) { synchronized (OffsetManager.class) { if (offsetManager == null) { - offsetManager = new OffsetManager(offsetBasicDb, instanceBasicDb); + offsetManager = new OffsetManager(taskBasicDb, instanceBasicDb, offsetBasicDb); } } } @@ -115,6 +117,63 @@ public OffsetProfile getOffset(String taskId, String instanceId) { return offsetDb.getOffset(taskId, instanceId); } + private void cleanDbOffset() { + List offsets = offsetDb.listAllOffsets(); + offsets.forEach(offset -> { + String taskId = offset.getTaskId(); + String instanceId = offset.getInstanceId(); + InstanceProfile instanceProfile = instanceDb.getInstance(taskId, instanceId); + if (instanceProfile == null) { + deleteOffset(taskId, instanceId); + LOGGER.info("instance not found, delete offset taskId {} instanceId {}", taskId, + instanceId); + } + }); + LOGGER.info("offsetManager running! offsets count {}", offsets.size()); + } + + private void cleanDbInstance() { + AtomicInteger cleanCount = new AtomicInteger(); + Iterator iterator = instanceDb.listAllInstances().listIterator(); + while (iterator.hasNext()) { + if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) { + return; + } + InstanceProfile instanceFromDb = iterator.next(); + String taskId = instanceFromDb.getTaskId(); + String instanceId = instanceFromDb.getInstanceId(); + if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) { + continue; + } + TaskProfile taskFromDb = taskProfileDb.getTask(taskId); + if (taskFromDb != null) { + if (taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { + continue; + } + if (taskFromDb.isRetry()) { + if (taskFromDb.getState() != TaskStateEnum.RETRY_FINISH) { + continue; + } + } else { + if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) { + continue; + } + } + } + long expireTime = DateTransUtils.calcOffset(DB_INSTANCE_EXPIRE_CYCLE_COUNT + taskFromDb.getCycleUnit()); + if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > expireTime) { + cleanCount.getAndIncrement(); + LOGGER.info("instance has expired, delete from db dataTime {} taskId {} instanceId {}", + instanceFromDb.getSourceDataTime(), taskId, instanceId); + instanceDb.deleteInstance(taskId, instanceId); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, instanceFromDb.getInlongGroupId(), + instanceFromDb.getInlongStreamId(), instanceFromDb.getSinkDataTime(), 1, 1, + Long.parseLong(taskId)); + iterator.remove(); + } + } + } + @Override public void start() throws Exception { submitWorker(coreThread()); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java index e3815f725e4..eca7f6b25f4 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java @@ -132,7 +132,7 @@ public TaskManager() { agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE)); offsetBasicDb = initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET)); - OffsetManager.init(offsetBasicDb, instanceBasicDb); + OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb); this.runningPool = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index a9d683750c3..1923dc5f1c3 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -57,6 +57,8 @@ public class TestLogFileSource { private static final Gson GSON = new Gson(); private static final String[] check = {"hello line-end-symbol aa", "world line-end-symbol", "agent line-end-symbol"}; + // task basic db + private static Db taskBasicDb; // instance basic db private static Db instanceBasicDb; // offset basic db @@ -64,12 +66,12 @@ public class TestLogFileSource { @BeforeClass public static void setup() { - helper = new AgentBaseTestsHelper(TestLogFileSource.class.getName()).setupAgentHome(); + taskBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK); instanceBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE); offsetBasicDb = TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET); - OffsetManager.init(offsetBasicDb, instanceBasicDb); + OffsetManager.init(taskBasicDb, instanceBasicDb, offsetBasicDb); } private LogFileSource getSource(int taskId, long offset) {