Skip to content

Commit

Permalink
GEODE-10068: Make WanCopyRegionFunctionService thread pool configurab… (
Browse files Browse the repository at this point in the history
#7424)

* GEODE-10068: Make WanCopyRegionFunctionService thread pool configurable through property

* GEODE-10068: Change name of property and add test case

* GEODE-10068: Update after more review comments
  • Loading branch information
albertogpz authored Jun 8, 2022
1 parent 4d2c2e7 commit 425bdb8
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ public class SystemPropertyHelper {
*/
public static final String RE_AUTHENTICATE_WAIT_TIME = "reauthenticate.wait.time";

/**
* Maximum number of concurrent executions in a server of wan-copy region commands.
* Once the maximum number is reached, subsequent executions will be halted until
* a thread for any of the ongoing executions is released.
*/
public static final String WAN_COPY_REGION_MAX_CONCURRENT_THREADS =
"geode.wan.copy-region.max-threads";

/**
* As of Geode 1.4.0, a region set operation will be in a transaction even if it is the first
* operation in the transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@

import org.apache.geode.cache.Cache;
import org.apache.geode.internal.cache.CacheService;
import org.apache.geode.internal.lang.SystemProperty;
import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
import org.apache.geode.management.internal.functions.CliFunctionResult;

public class WanCopyRegionFunctionService implements CacheService {

private static final String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
"WAN Copy Region Function Execution Processor";

private volatile ExecutorService wanCopyRegionFunctionExecutionPool;

/**
Expand All @@ -41,11 +46,15 @@ public class WanCopyRegionFunctionService implements CacheService {

@Override
public boolean init(Cache cache) {
String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
"WAN Copy Region Function Execution Processor";
int WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS = 10;
int maxConcurrentThreads = SystemProperty
.getProductIntegerProperty(
SystemPropertyHelper.WAN_COPY_REGION_MAX_CONCURRENT_THREADS, 10);
return init(maxConcurrentThreads);
}

boolean init(int maxConcurrentThreads) {
wanCopyRegionFunctionExecutionPool = LoggingExecutors
.newFixedThreadPool(WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS,
.newFixedThreadPool(maxConcurrentThreads,
WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX, true);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.internal.functions.CliFunctionResult;
Expand All @@ -36,7 +37,7 @@ public class WanCopyRegionFunctionServiceTest {
private WanCopyRegionFunctionService service;
private final InternalCache cache = mock(InternalCache.class);

@Before
@BeforeEach
public void setUp() throws Exception {
service = new WanCopyRegionFunctionService();
service.init(cache);
Expand Down Expand Up @@ -158,7 +159,7 @@ public void severalExecuteWithDifferentRegionOrSenderAreAllowed() {
int executions = 5;
CountDownLatch latch = new CountDownLatch(executions);
for (int i = 0; i < executions; i++) {
Callable<CliFunctionResult> firstExecution = () -> {
Callable<CliFunctionResult> execution = () -> {
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
return null;
};
Expand All @@ -167,7 +168,7 @@ public void severalExecuteWithDifferentRegionOrSenderAreAllowed() {
CompletableFuture
.supplyAsync(() -> {
try {
return service.execute(firstExecution, regionName, "mySender1");
return service.execute(execution, regionName, "mySender1");
} catch (Exception e) {
return null;
}
Expand All @@ -183,4 +184,47 @@ public void severalExecuteWithDifferentRegionOrSenderAreAllowed() {
latch.countDown();
}
}

@Test
public void concurrentExecutionsDoesNotExceedMaxConcurrentExecutions() {
int maxConcurrentExecutions = 2;
service.init(maxConcurrentExecutions);

int executions = 4;
CountDownLatch latch = new CountDownLatch(executions);
AtomicInteger concurrentExecutions = new AtomicInteger(0);
for (int i = 0; i < executions; i++) {
Callable<CliFunctionResult> execution = () -> {
concurrentExecutions.incrementAndGet();
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
concurrentExecutions.decrementAndGet();
return null;
};

final String regionName = String.valueOf(i);
CompletableFuture
.supplyAsync(() -> {
try {
return service.execute(execution, regionName, "mySender1");
} catch (Exception e) {
return null;
}
});
}

// Wait for the functions to start execution
await().untilAsserted(
() -> assertThat(service.getNumberOfCurrentExecutions()).isEqualTo(executions));

// Make sure concurrent executions does not exceed the maximum
assertThat(concurrentExecutions.get()).isEqualTo(maxConcurrentExecutions);

// End executions
for (int i = 0; i < executions; i++) {
latch.countDown();
}

await().untilAsserted(() -> assertThat(concurrentExecutions.get()).isEqualTo(0));
}

}

0 comments on commit 425bdb8

Please sign in to comment.