Skip to content

Commit

Permalink
fix:MetricsCollectTask has cycle dependence
Browse files Browse the repository at this point in the history
  • Loading branch information
humkum committed Nov 18, 2022
1 parent 5fb1de2 commit ff36050
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
<version>2.6.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.apache</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@
*/
package org.apache.rocketmq.exporter.config;


import org.apache.rocketmq.exporter.task.ClientMetricCollectorFixedThreadPoolExecutor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@Configuration
@ConfigurationProperties(prefix = "threadpool.collect-client-metric-executor")
Expand All @@ -27,36 +35,27 @@ public class CollectClientMetricExecutorConfig {
private int maximumPoolSize = 20;
private long keepAliveTime = 4000L;
private int queueSize = 1000;

public int getCorePoolSize() {
return corePoolSize;
}

public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}

public int getMaximumPoolSize() {
return maximumPoolSize;
}

public void setMaximumPoolSize(int maximumPoolSize) {
this.maximumPoolSize = maximumPoolSize;
}

public long getKeepAliveTime() {
return keepAliveTime;
}

public void setKeepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}

public int getQueueSize() {
return queueSize;
}

public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
private BlockingQueue<Runnable> collectClientTaskBlockQueue;

@Bean(name = "collectClientMetricExecutor")
public ExecutorService collectClientMetricExecutor() {
collectClientTaskBlockQueue = new LinkedBlockingDeque<Runnable>(queueSize);
ExecutorService executorService = new ClientMetricCollectorFixedThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
this.collectClientTaskBlockQueue,
new ThreadFactory() {
private final AtomicLong threadIndex = new AtomicLong(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "collectClientMetricThread_" + this.threadIndex.incrementAndGet());
}
},
new ThreadPoolExecutor.DiscardOldestPolicy()
);
return executorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.exporter.config.CollectClientMetricExecutorConfig;
import org.apache.rocketmq.exporter.config.RMQConfigure;
import org.apache.rocketmq.exporter.model.BrokerRuntimeStats;
import org.apache.rocketmq.exporter.model.common.TwoTuple;
Expand All @@ -57,7 +56,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

Expand All @@ -69,13 +67,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@Component
public class MetricsCollectTask {
Expand All @@ -92,31 +84,6 @@ public class MetricsCollectTask {
private static String clusterName = null;
private final static Logger log = LoggerFactory.getLogger(MetricsCollectTask.class);

private BlockingQueue<Runnable> collectClientTaskBlockQueue;

@Bean(name = "collectClientMetricExecutor")
private ExecutorService collectClientMetricExecutor(
CollectClientMetricExecutorConfig collectClientMetricExecutorConfig) {
collectClientTaskBlockQueue = new LinkedBlockingDeque<Runnable>(collectClientMetricExecutorConfig.getQueueSize());
ExecutorService executorService = new ClientMetricCollectorFixedThreadPoolExecutor(
collectClientMetricExecutorConfig.getCorePoolSize(),
collectClientMetricExecutorConfig.getMaximumPoolSize(),
collectClientMetricExecutorConfig.getKeepAliveTime(),
TimeUnit.MILLISECONDS,
this.collectClientTaskBlockQueue,
new ThreadFactory() {
private final AtomicLong threadIndex = new AtomicLong(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "collectClientMetricThread_" + this.threadIndex.incrementAndGet());
}
},
new ThreadPoolExecutor.DiscardOldestPolicy()
);
return executorService;
}

@PostConstruct
public void init() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
log.info("MetricsCollectTask init starting....");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.rocketmq.exporter.util;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.jupiter.api.Test;

public class UtilsTest {

Expand Down

0 comments on commit ff36050

Please sign in to comment.