Skip to content

Commit

Permalink
[INLONG-11591][Agent] Reduce duplicate code for log collection type t…
Browse files Browse the repository at this point in the history
…asks
  • Loading branch information
justinwwhuang committed Dec 10, 2024
1 parent 36a9017 commit d3d5354
Show file tree
Hide file tree
Showing 35 changed files with 391 additions and 421 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.utils.file.FileUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;

Expand All @@ -40,12 +41,24 @@
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;

/**
* job profile which contains details describing properties of one job.
*/
public class InstanceProfile extends AbstractConfiguration implements Comparable<InstanceProfile> {

public static final String DEFAULT_FILE_INSTANCE = "org.apache.inlong.agent.plugin.instance.FileInstance";
public static final String DEFAULT_COS_INSTANCE = "org.apache.inlong.agent.plugin.instance.COSInstance";
public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance";
public static final String DEFAULT_MONGODB_INSTANCE = "org.apache.inlong.agent.plugin.instance.MongoDBInstance";
public static final String DEFAULT_MQTT_INSTANCE = "org.apache.inlong.agent.plugin.instance.MqttInstance";
public static final String DEFAULT_ORACLE_INSTANCE = "org.apache.inlong.agent.plugin.instance.OracleInstance";
public static final String DEFAULT_POSTGRES_INSTANCE = "org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
public static final String DEFAULT_PULSAR_INSTANCE = "org.apache.inlong.agent.plugin.instance.PulsarInstance";
public static final String DEFAULT_REDIS_INSTANCE = "org.apache.inlong.agent.plugin.instance.RedisInstance";
public static final String DEFAULT_SQLSERVER_INSTANCE = "org.apache.inlong.agent.plugin.instance.SQLServerInstance";

private static final Logger LOGGER = LoggerFactory.getLogger(InstanceProfile.class);
private static final Gson GSON = new Gson();

Expand All @@ -64,12 +77,40 @@ public String toJsonStr() {
return GSON.toJson(getConfigStorage());
}

public void setInstanceClass(String className) {
set(TaskConstants.INSTANCE_CLASS, className);
public String getInstanceClass() {
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, TaskTypeEnum.FILE.getType()));
return getInstanceClassByTaskType(taskType);
}

public String getInstanceClass() {
return get(TaskConstants.INSTANCE_CLASS);
public static String getInstanceClassByTaskType(TaskTypeEnum taskType) {
if (taskType == null) {
return null;
}
switch (taskType) {
case FILE:
return DEFAULT_FILE_INSTANCE;
case KAFKA:
return DEFAULT_KAFKA_INSTANCE;
case PULSAR:
return DEFAULT_PULSAR_INSTANCE;
case POSTGRES:
return DEFAULT_POSTGRES_INSTANCE;
case ORACLE:
return DEFAULT_ORACLE_INSTANCE;
case SQLSERVER:
return DEFAULT_SQLSERVER_INSTANCE;
case MONGODB:
return DEFAULT_MONGODB_INSTANCE;
case REDIS:
return DEFAULT_REDIS_INSTANCE;
case MQTT:
return DEFAULT_MQTT_INSTANCE;
case COS:
return DEFAULT_COS_INSTANCE;
default:
LOGGER.error("invalid task type {}", taskType);
return null;
}
}

public String getTaskId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.inlong.agent.utils.DateTransUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.enums.TaskStateEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;

import com.google.gson.Gson;
Expand All @@ -32,18 +33,32 @@
import java.text.ParseException;
import java.util.TimeZone;

import static java.util.Objects.requireNonNull;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_TYPE;

/**
* job profile which contains details describing properties of one job.
*/
public class TaskProfile extends AbstractConfiguration {

public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
public static final String DEFAULT_COS_TASK = "org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_ORACLE_TASK = "org.apache.inlong.agent.plugin.task.OracleTask";
public static final String DEFAULT_REDIS_TASK = "org.apache.inlong.agent.plugin.task.RedisTask";
public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask";
public static final String DEFAULT_SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask";
public static final String DEFAULT_MOCK_TASK = "org.apache.inlong.agent.plugin.task.MockTask";

private static final Gson GSON = new Gson();
private static final Logger logger = LoggerFactory.getLogger(TaskProfile.class);

Expand All @@ -57,6 +72,37 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
return TaskProfileDto.convertToTaskProfile(dataConfig);
}

public String getTaskClass() {
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, TaskTypeEnum.FILE.getType()));
switch (requireNonNull(taskType)) {
case FILE:
return DEFAULT_FILE_TASK;
case KAFKA:
return DEFAULT_KAFKA_TASK;
case PULSAR:
return DEFAULT_PULSAR_TASK;
case POSTGRES:
return DEFAULT_POSTGRESQL_TASK;
case ORACLE:
return DEFAULT_ORACLE_TASK;
case SQLSERVER:
return DEFAULT_SQLSERVER_TASK;
case MONGODB:
return DEFAULT_MONGODB_TASK;
case REDIS:
return DEFAULT_REDIS_TASK;
case MQTT:
return DEFAULT_MQTT_TASK;
case COS:
return DEFAULT_COS_TASK;
case MOCK:
return DEFAULT_MOCK_TASK;
default:
logger.error("invalid task type {}", taskType);
return null;
}
}

public String getTaskId() {
return get(TaskConstants.TASK_ID);
}
Expand All @@ -81,14 +127,6 @@ public boolean isRetry() {
return getBoolean(TASK_RETRY, false);
}

public String getTaskClass() {
return get(TaskConstants.TASK_CLASS);
}

public void setTaskClass(String className) {
set(TaskConstants.TASK_CLASS, className);
}

public String getInlongGroupId() {
return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
}
Expand Down Expand Up @@ -124,11 +162,9 @@ public String toJsonStr() {
return GSON.toJson(getConfigStorage());
}

public InstanceProfile createInstanceProfile(String instanceClass, String fileName, String cycleUnit,
String dataTime,
public InstanceProfile createInstanceProfile(String fileName, String cycleUnit, String dataTime,
long fileUpdateTime) {
InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr());
instanceProfile.setInstanceClass(instanceClass);
instanceProfile.setInstanceId(fileName);
instanceProfile.setSourceDataTime(dataTime);
Long sinkDataTime = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
*/
public class TaskConstants extends CommonConstants {

// job id
// public static final String JOB_ID = "job.id";
public static final String TASK_ID = "task.id";
public static final String INSTANCE_ID = "instance.id";
public static final String JOB_INSTANCE_ID = "job.instance.id";
public static final String INSTANCE_CREATE_TIME = "instance.createTime";
public static final String INSTANCE_MODIFY_TIME = "instance.modifyTime";
public static final String TASK_GROUP_ID = "task.groupId";
Expand All @@ -36,9 +33,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_SOURCE = "task.source";

public static final String TASK_CHANNEL = "task.channel";

public static final String TASK_CLASS = "task.taskClass";
public static final String INSTANCE_CLASS = "task.instance.class";
public static final String TASK_TYPE = "task.taskType";
public static final String TASK_FILE_TRIGGER = "task.fileTask.trigger";

// sink config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,11 @@
public class TaskProfileDto {

private static final Logger logger = LoggerFactory.getLogger(TaskProfileDto.class);

public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.file.LogFileTask";
public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_ORACLE_TASK = "org.apache.inlong.agent.plugin.task.OracleTask";
public static final String DEFAULT_REDIS_TASK = "org.apache.inlong.agent.plugin.task.RedisTask";
public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask";
public static final String DEFAULT_SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask";
public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
public static final String PULSAR_SINK = "org.apache.inlong.agent.plugin.sinks.PulsarSink";
public static final String KAFKA_SINK = "org.apache.inlong.agent.plugin.sinks.KafkaSink";
public static final String DEFAULT_COS_TASK = "org.apache.inlong.agent.plugin.task.cos.COSTask";
/**
* file source
*/
Expand Down Expand Up @@ -470,6 +459,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {

// common attribute
task.setId(String.valueOf(dataConfig.getTaskId()));
task.setTaskType(dataConfig.getTaskType());
task.setGroupId(dataConfig.getInlongGroupId());
task.setStreamId(dataConfig.getInlongStreamId());
task.setChannel(DEFAULT_CHANNEL);
Expand Down Expand Up @@ -517,7 +507,6 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
profileDto.setTask(task);
break;
case FILE:
task.setTaskClass(DEFAULT_FILE_TASK);
FileTask fileTask = getFileTask(dataConfig);
task.setCycleUnit(fileTask.getCycleUnit());
task.setFileTask(fileTask);
Expand All @@ -526,56 +515,48 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
profileDto.setTask(task);
break;
case KAFKA:
task.setTaskClass(DEFAULT_KAFKA_TASK);
KafkaTask kafkaTask = getKafkaTask(dataConfig);
task.setKafkaTask(kafkaTask);
task.setSource(KAFKA_SOURCE);
profileDto.setTask(task);
break;
case PULSAR:
task.setTaskClass(DEFAULT_PULSAR_TASK);
PulsarTask pulsarTask = getPulsarTask(dataConfig);
task.setPulsarTask(pulsarTask);
task.setSource(PULSAR_SOURCE);
profileDto.setTask(task);
break;
case POSTGRES:
task.setTaskClass(DEFAULT_POSTGRESQL_TASK);
PostgreSQLTask postgreSQLTask = getPostgresTask(dataConfig);
task.setPostgreSQLTask(postgreSQLTask);
task.setSource(POSTGRESQL_SOURCE);
profileDto.setTask(task);
break;
case ORACLE:
task.setTaskClass(DEFAULT_ORACLE_TASK);
OracleTask oracleTask = getOracleTask(dataConfig);
task.setOracleTask(oracleTask);
task.setSource(ORACLE_SOURCE);
profileDto.setTask(task);
break;
case SQLSERVER:
task.setTaskClass(DEFAULT_SQLSERVER_TASK);
SqlServerTask sqlserverTask = getSqlServerTask(dataConfig);
task.setSqlserverTask(sqlserverTask);
task.setSource(SQLSERVER_SOURCE);
profileDto.setTask(task);
break;
case MONGODB:
task.setTaskClass(DEFAULT_MONGODB_TASK);
MongoTask mongoTask = getMongoTask(dataConfig);
task.setMongoTask(mongoTask);
task.setSource(MONGO_SOURCE);
profileDto.setTask(task);
break;
case REDIS:
task.setTaskClass(DEFAULT_REDIS_TASK);
RedisTask redisTask = getRedisTask(dataConfig);
task.setRedisTask(redisTask);
task.setSource(REDIS_SOURCE);
profileDto.setTask(task);
break;
case MQTT:
task.setTaskClass(DEFAULT_MQTT_TASK);
MqttTask mqttTask = getMqttTask(dataConfig);
task.setMqttTask(mqttTask);
task.setSource(MQTT_SOURCE);
Expand All @@ -585,7 +566,6 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
profileDto.setTask(task);
break;
case COS:
task.setTaskClass(DEFAULT_COS_TASK);
COSTask cosTask = getCOSTask(dataConfig);
task.setCycleUnit(cosTask.getCycleUnit());
task.setCosTask(cosTask);
Expand Down Expand Up @@ -619,6 +599,7 @@ public static class Task {
private String mqClusters;
private String topicInfo;
private String taskClass;
private Integer taskType;
private String predefinedFields;
private Integer state;
private String cycleUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ private void addToMemory(InstanceProfile instanceProfile) {
instanceProfile.getSinkDataTime(), 1, 1, auditVersion);
}
} catch (Throwable t) {
LOGGER.error("add instance error {}", t.getMessage());
LOGGER.error("add instance error {}", t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.sources.extend.DefaultExtendedHandler;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.task.file.FileDataUtils;
import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;

import lombok.AllArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.apache.inlong.agent.plugin.task;

import org.apache.inlong.agent.plugin.task.file.LogFileTask;
import org.apache.inlong.agent.plugin.task.logcollection.local.FileTask;

/**
* Directory trigger with format date.
*/
public class FormatDateLogFileTask extends LogFileTask {
public class FormatDateLogFileTask extends FileTask {

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
public class KafkaTask extends AbstractTask {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTask.class);
public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance";
private boolean isAdded = false;
private String topic;
private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH");
Expand All @@ -58,8 +57,8 @@ protected List<InstanceProfile> getNewInstanceList() {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, topic,
CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
InstanceProfile instanceProfile = taskProfile.createInstanceProfile(topic, CycleUnitType.HOUR, dataTime,
AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile: {}", instanceProfile.toJsonStr());
list.add(instanceProfile);
this.isAdded = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
public class MongoDBTask extends AbstractTask {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBTask.class);
public static final String DEFAULT_MONGODB_INSTANCE = "org.apache.inlong.agent.plugin.instance.MongoDBInstance";
private boolean isAdded = false;
private String collection;
private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH");
Expand Down Expand Up @@ -66,8 +65,8 @@ protected List<InstanceProfile> getNewInstanceList() {
return list;
}
String dataTime = LocalDateTime.now().format(dateTimeFormatter);
InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_MONGODB_INSTANCE, collection,
CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
InstanceProfile instanceProfile = taskProfile.createInstanceProfile(collection, CycleUnitType.HOUR, dataTime,
AgentUtils.getCurrentTime());
LOGGER.info("taskProfile.createInstanceProfile: {}", instanceProfile.toJsonStr());
list.add(instanceProfile);
this.isAdded = true;
Expand Down
Loading

0 comments on commit d3d5354

Please sign in to comment.