Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3062: Fix KafkaBinderMetrics for resource leaks #3064

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2024 the original author or authors.
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,16 +20,15 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToDoubleFunction;

@@ -50,10 +49,12 @@
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.context.Lifecycle;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;

/**
* Metrics for Kafka binder.
@@ -72,7 +73,7 @@
* @author Omer Celik
*/
public class KafkaBinderMetrics
implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable {
implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable, Lifecycle {

private static final int DEFAULT_TIMEOUT = 5;

@@ -101,6 +102,8 @@ public class KafkaBinderMetrics

private final ReentrantLock consumerFactoryLock = new ReentrantLock();

private final AtomicBoolean running = new AtomicBoolean();

public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties binderConfigurationProperties,
ConsumerFactory<?, ?> defaultConsumerFactory,
@@ -125,14 +128,14 @@ public void setTimeout(int timeout) {

@Override
public void bindTo(MeterRegistry registry) {
/**
/*
* We can't just replace one scheduler with another.
* Before and even after the old one is gathered by GC, its threads still exist, consuming memory and CPU resources to switch contexts.
* Theoretically, as a result of processing n topics, there will be about (1+n)*n/2 threads alive at the same time.
*/
if (this.scheduler != null) {
LOG.info("Try to shutdown the old scheduler with " + ((ScheduledThreadPoolExecutor) scheduler).getPoolSize() + " threads");
this.scheduler.shutdown();
this.scheduler.shutdownNow();
}

this.scheduler = Executors.newScheduledThreadPool(this.binder.getTopicsInUse().size());
@@ -278,10 +281,50 @@ public void onApplicationEvent(BindingCreatedEvent event) {
}

@Override
public void close() throws Exception {
if (this.meterRegistry != null) {
this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove);
public void close() {
if (this.scheduler != null) {
this.consumerFactoryLock.lock();
try {
if (this.meterRegistry != null) {
this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove);
}
this.scheduler.shutdownNow();
try {
this.scheduler.awaitTermination(
binderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval().toSeconds(),
TimeUnit.SECONDS);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
ReflectionUtils.rethrowRuntimeException(ex);
}
}
finally {
this.scheduler = null;
this.metadataConsumers.values().forEach(Consumer::close);
this.metadataConsumers.clear();
this.consumerFactoryLock.unlock();
}
}
Optional.ofNullable(scheduler).ifPresent(ExecutorService::shutdown);
}

/**
 * Mark this metrics component as running.
 * <p>
 * No other work is performed here: the 'bindTo()' is called from the
 * 'onApplicationEvent()', which, in turn, is emitted from the
 * 'AbstractBindingLifecycle.start()' logic.
 */
@Override
public void start() {
	this.running.set(true);
}

/**
 * Stop this metrics component.
 * <p>
 * The running flag is flipped from {@code true} to {@code false} atomically;
 * only the caller that wins that transition goes on to invoke {@link #close()}.
 * Calling this method while not running is a no-op.
 */
@Override
public void stop() {
	boolean wasRunning = this.running.compareAndSet(true, false);
	if (!wasRunning) {
		// Not running: nothing to release.
		return;
	}
	close();
}

/**
 * Report the current value of the running flag.
 * @return {@code true} if the running flag is currently set
 */
@Override
public boolean isRunning() {
	final boolean active = this.running.get();
	return active;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2024 the original author or authors.
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -57,6 +57,7 @@
* @author Tomek Szmytka
* @author Nico Heller
* @author Kurt Hong
* @author Artem Bilan
*/
class KafkaBinderMetricsTest {

@@ -346,10 +347,11 @@ public void usesBeginningOffsetIfNoCommittedOffsetFound() {
}

@Test
public void shouldShutdownSchedulerOnClose() throws Exception {
public void shouldShutdownSchedulerOnClose() {
metrics.bindTo(meterRegistry);
assertThat(metrics.scheduler).isNotNull();
metrics.close();
assertThat(metrics.scheduler.isShutdown()).isTrue();
assertThat(metrics.scheduler).isNull();
}

@Test
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2023 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

@@ -47,6 +48,7 @@
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.convert.converter.Converter;
@@ -78,7 +80,7 @@
* @author Byungjun You
* @author Omer Celik
*/
public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware {
public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware, SmartLifecycle {

protected final Log logger = LogFactory.getLog(getClass());

@@ -94,6 +96,8 @@ public class DefaultBinderFactory implements BinderFactory, DisposableBean, Appl

private final BinderCustomizer binderCustomizer;

private final AtomicBoolean running = new AtomicBoolean();

private volatile ConfigurableApplicationContext context;

private Collection<Listener> listeners;
@@ -144,6 +148,27 @@ public void destroy() {
this.defaultBinderForBindingTargetType.clear();
}

/**
 * Start every binder application context currently held in the binder instance cache.
 * <p>
 * This is essentially used when a CRaC checkpoint is restored. The running flag is
 * switched from {@code false} to {@code true} atomically, so the cached contexts are
 * started only by the call that performs that transition.
 */
@Override
public void start() {
	if (!this.running.compareAndSet(false, true)) {
		// Already running: the cached contexts have been started before.
		return;
	}
	this.binderInstanceCache.values().forEach(cached -> cached.getValue().start());
}

/**
 * Stop every binder application context currently held in the binder instance cache.
 * <p>
 * This makes sense for a CRaC checkpoint. The running flag is switched from
 * {@code true} to {@code false} atomically, so the cached contexts are stopped
 * only by the call that performs that transition.
 */
@Override
public void stop() {
	if (!this.running.compareAndSet(true, false)) {
		// Not running: nothing to stop.
		return;
	}
	this.binderInstanceCache.values().forEach(cached -> cached.getValue().stop());
}

/**
 * Report the current value of the running flag.
 * @return {@code true} if the running flag is currently set
 */
@Override
public boolean isRunning() {
	final boolean active = this.running.get();
	return active;
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public <T> Binder<T, ?, ?> getBinder(String name, Class<? extends T> bindingTargetType) {