diff --git a/pom.xml b/pom.xml index 45393f3..f50fcf7 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.boot spring-boot-starter-parent - 2.1.2.RELEASE + 2.6.6 org.apache diff --git a/src/main/java/org/apache/rocketmq/exporter/config/CollectClientMetricExecutorConfig.java b/src/main/java/org/apache/rocketmq/exporter/config/CollectClientMetricExecutorConfig.java index f62f9d6..daa22b5 100644 --- a/src/main/java/org/apache/rocketmq/exporter/config/CollectClientMetricExecutorConfig.java +++ b/src/main/java/org/apache/rocketmq/exporter/config/CollectClientMetricExecutorConfig.java @@ -16,9 +16,17 @@ */ package org.apache.rocketmq.exporter.config; - +import org.apache.rocketmq.exporter.task.ClientMetricCollectorFixedThreadPoolExecutor; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; @Configuration @ConfigurationProperties(prefix = "threadpool.collect-client-metric-executor") @@ -27,36 +35,27 @@ public class CollectClientMetricExecutorConfig { private int maximumPoolSize = 20; private long keepAliveTime = 4000L; private int queueSize = 1000; - - public int getCorePoolSize() { - return corePoolSize; - } - - public void setCorePoolSize(int corePoolSize) { - this.corePoolSize = corePoolSize; - } - - public int getMaximumPoolSize() { - return maximumPoolSize; - } - - public void setMaximumPoolSize(int maximumPoolSize) { - this.maximumPoolSize = maximumPoolSize; - } - - public long getKeepAliveTime() { - return keepAliveTime; - } - - public void setKeepAliveTime(long keepAliveTime) { - this.keepAliveTime = keepAliveTime; - } - - public int getQueueSize() { - return queueSize; - } - - public void setQueueSize(int queueSize) { - this.queueSize = queueSize; + private BlockingQueue collectClientTaskBlockQueue; + + @Bean(name = "collectClientMetricExecutor") + public ExecutorService collectClientMetricExecutor() { + collectClientTaskBlockQueue = new LinkedBlockingDeque(queueSize); + ExecutorService executorService = new ClientMetricCollectorFixedThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTime, + TimeUnit.MILLISECONDS, + this.collectClientTaskBlockQueue, + new ThreadFactory() { + private final AtomicLong threadIndex = new AtomicLong(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "collectClientMetricThread_" + this.threadIndex.incrementAndGet()); + } + }, + new ThreadPoolExecutor.DiscardOldestPolicy() + ); + return executorService; } } diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java index 7f4b940..78c7c34 100644 --- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java +++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java @@ -41,7 +41,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.exporter.config.CollectClientMetricExecutorConfig; import org.apache.rocketmq.exporter.config.RMQConfigure; import org.apache.rocketmq.exporter.model.BrokerRuntimeStats; import org.apache.rocketmq.exporter.model.common.TwoTuple; @@ -57,7 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -69,13 +67,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; @Component public class MetricsCollectTask { @@ -92,31 +84,6 @@ public class MetricsCollectTask { private static String clusterName = null; private final static Logger log = LoggerFactory.getLogger(MetricsCollectTask.class); - private BlockingQueue collectClientTaskBlockQueue; - - @Bean(name = "collectClientMetricExecutor") - private ExecutorService collectClientMetricExecutor( - CollectClientMetricExecutorConfig collectClientMetricExecutorConfig) { - collectClientTaskBlockQueue = new LinkedBlockingDeque(collectClientMetricExecutorConfig.getQueueSize()); - ExecutorService executorService = new ClientMetricCollectorFixedThreadPoolExecutor( - collectClientMetricExecutorConfig.getCorePoolSize(), - collectClientMetricExecutorConfig.getMaximumPoolSize(), - collectClientMetricExecutorConfig.getKeepAliveTime(), - TimeUnit.MILLISECONDS, - this.collectClientTaskBlockQueue, - new ThreadFactory() { - private final AtomicLong threadIndex = new AtomicLong(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "collectClientMetricThread_" + this.threadIndex.incrementAndGet()); - } - }, - new ThreadPoolExecutor.DiscardOldestPolicy() - ); - return executorService; - } - @PostConstruct public void init() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException { log.info("MetricsCollectTask init starting...."); diff --git a/src/test/java/org/apache/rocketmq/exporter/util/UtilsTest.java b/src/test/java/org/apache/rocketmq/exporter/util/UtilsTest.java index 6ea67c5..21a842a 100644 --- a/src/test/java/org/apache/rocketmq/exporter/util/UtilsTest.java +++ b/src/test/java/org/apache/rocketmq/exporter/util/UtilsTest.java @@ -17,7 +17,7 @@ package org.apache.rocketmq.exporter.util; import org.assertj.core.api.Assertions; -import org.junit.Test; +import org.junit.jupiter.api.Test; public class UtilsTest {