Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fiber refactorings #2

Open
wants to merge 21 commits into
base: java21
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fiber refactorings
  • Loading branch information
bgprudhomme committed Oct 17, 2024
commit a20f771a24775a72a612e91326f33ca966869e60
Original file line number Diff line number Diff line change
@@ -380,7 +380,7 @@ public void when_repeatedBidirectionalStreaming_withFaultyService() throws IOExc

private static Server createServer(BindableService service) throws IOException {
Server server = ServerBuilder.forPort(0)
.executor(Executors.newFixedThreadPool(4))
.executor(Executors.newVirtualThreadPerTaskExecutor())
.addService(service)
.build();
server.start();
Original file line number Diff line number Diff line change
@@ -630,7 +630,7 @@ public void testConcurrentPutAndOrderbyQueries() {
map.addIndex(indexConfig);

int threadsCount = RuntimeAvailableProcessors.get() - 2;
ExecutorService executor = Executors.newFixedThreadPool(threadsCount);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

int keysPerThread = 5000;
CountDownLatch latch = new CountDownLatch(threadsCount);
@@ -683,7 +683,7 @@ public void testConcurrentUpdateAndOrderbyQueries() {
map.addIndex(indexConfig);

int threadsCount = RuntimeAvailableProcessors.get() - 2;
ExecutorService executor = Executors.newFixedThreadPool(threadsCount);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

int keysPerThread = 2500;
CountDownLatch latch = new CountDownLatch(threadsCount);
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ public void setUp() throws Exception {
public void selectAllFromTableWhereIdColumnParallel() throws Exception {
int repeatCount = 10000;
Future<?>[] futures = new Future[THREAD_COUNT];
var pool = Executors.newFixedThreadPool(THREAD_COUNT);
var pool = Executors.newVirtualThreadPerTaskExecutor();
try {
for (int i = 0; i < THREAD_COUNT; ++i) {
futures[i] = pool.submit(() -> {
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ public void test_indexScan() throws InterruptedException {

for (int i = 0, threadsLength = threads.length; i < threadsLength; i++) {
HazelcastInstance inst = createHazelcastClient();
threads[i] = new Thread(() -> {
threads[i] = Thread.ofVirtual().unstarted(() -> {
int numQueriesLocal = 0;
while (!done.get()) {
try {
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ public void when_addingElementsToCacheMultiThreaded_then_minProperSizeAndElement
};

List<Thread> threadList = IntStream.range(0, threadCount)
.mapToObj(value -> new Thread(runnable))
.mapToObj(value -> Thread.ofVirtual().unstarted(runnable))
.collect(Collectors.toList());
threadList.forEach(Thread::start);
threadList.forEach((ConsumerEx<Thread>) Thread::join);
Original file line number Diff line number Diff line change
@@ -281,7 +281,7 @@ protected void handleCommand(String commandInputted) {
println("ops/s = " + repeat * ONE_THOUSAND / (Clock.currentTimeMillis() - t0));
} else if (first.startsWith("&") && first.length() > 1) {
final int fork = Integer.parseInt(first.substring(1));
ExecutorService pool = Executors.newFixedThreadPool(fork);
ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();
final String threadCommand = command.substring(first.length());
for (int i = 0; i < fork; i++) {
final int threadID = i;
Original file line number Diff line number Diff line change
@@ -221,7 +221,7 @@ public void handleCommand(String inputCommand) {
} else if (first.startsWith("&") && first.length() > 1) {
final int fork = Integer.parseInt(first.substring(1));
final String threadCommand = command.substring(first.length());
ExecutorService pool = Executors.newFixedThreadPool(fork);
ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < fork; i++) {
final int threadID = i;
pool.submit(() -> {
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@
final class DnsEndpointResolver
extends HazelcastKubernetesDiscoveryStrategy.EndpointResolver {
// executor service for dns lookup calls
private static final ExecutorService DNS_LOOKUP_SERVICE = Executors.newCachedThreadPool();
private static final ExecutorService DNS_LOOKUP_SERVICE = Executors.newVirtualThreadPerTaskExecutor();

private final String serviceDns;
private final int port;
Original file line number Diff line number Diff line change
@@ -105,7 +105,7 @@ public void concurrentCacheCreation() throws InterruptedException {
errorCounter.incrementAndGet();
}
};
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < threadCount; i++) {
executorService.submit(getCache);
}
@@ -119,7 +119,7 @@ public void concurrentCacheCreation() throws InterruptedException {

@Test
public void createOrGetConcurrentlySingleCache_fromMultiProviders() {
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
@@ -136,7 +136,7 @@ public void createOrGetConcurrentlySingleCache_fromMultiProviders() {

@Test
public void createConcurrentlyMultipleCaches_fromMultipleProviders() {
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ public void testSyncListener() throws Exception {
final CountDownLatch latch = new CountDownLatch(threadCount);
final AtomicBoolean shutdown = new AtomicBoolean(false);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
Thread.ofVirtual().start(() -> {
Random rand = new Random();
for (int i1 = 0; i1 < putCount && !shutdown.get(); i1++) {
String key = String.valueOf(rand.nextInt(putCount));
@@ -98,7 +98,7 @@ public void testSyncListener() throws Exception {
actualPutCount.incrementAndGet();
}
latch.countDown();
}).start();
});
}

if (!latch.await(ASSERT_TRUE_EVENTUALLY_TIMEOUT, TimeUnit.SECONDS)) {
@@ -291,7 +291,7 @@ private void testSyncListener_shouldNotHang_AfterAction(String cacheName, Cachin
int threads = 4;
final CountDownLatch latch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
new Thread(() -> {
Thread.ofVirtual().start(() -> {
Random rand = new Random();
while (true) {
try {
@@ -301,7 +301,7 @@ private void testSyncListener_shouldNotHang_AfterAction(String cacheName, Cachin
}
}
latch.countDown();
}).start();
});
}

// wait a little for putting threads to start
Original file line number Diff line number Diff line change
@@ -101,7 +101,7 @@ public void testPutCacheConfigConcurrently()
throws ExecutionException, InterruptedException {
CacheService cacheService = new TestCacheService(mockNodeEngine, false);

executorService = Executors.newFixedThreadPool(CONCURRENCY);
executorService = Executors.newVirtualThreadPerTaskExecutor();
List<Future<CacheConfig>> futures = new ArrayList<>();
for (int i = 0; i < CONCURRENCY; i++) {
futures.add(
@@ -130,7 +130,7 @@ public void testPutCacheConfigConcurrently()
public void testPutCacheConfigConcurrently_whenExceptionThrownFromAdditionalSetup() {
CacheService cacheService = new TestCacheService(mockNodeEngine, true);

executorService = Executors.newFixedThreadPool(CONCURRENCY);
executorService = Executors.newVirtualThreadPerTaskExecutor();
List<Future<CacheConfig>> futures = new ArrayList<>();
for (int i = 0; i < CONCURRENCY; i++) {
futures.add(
Original file line number Diff line number Diff line change
@@ -94,7 +94,7 @@ public void testCardinalityEstimatorSpawnNodeInParallel() {
final String name = "testSpawnNodeInParallel";
CardinalityEstimator estimator = instance.getCardinalityEstimator(name);
estimator.add(1L);
final ExecutorService ex = Executors.newFixedThreadPool(parallel);
final ExecutorService ex = Executors.newVirtualThreadPerTaskExecutor();
try {
for (int i = 0; i < total / parallel; i++) {
final HazelcastInstance[] instances = new HazelcastInstance[parallel];
Original file line number Diff line number Diff line change
@@ -751,7 +751,7 @@ public void testClusterShutdown_thenCheckOperationsNotHanging() throws Exception
CountDownLatch testFinishedLatch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(() -> {
Thread thread = Thread.ofVirtual().unstarted(() -> {
try {
for (int i1 = 0; i1 < mapSize; i1++) {
if (i1 == mapSize / 4) {
Original file line number Diff line number Diff line change
@@ -93,7 +93,7 @@ public void testClientPortConnection() {
@Test
public void testClientConnectionBeforeServerReady() {
String clusterName = randomString();
ExecutorService executorService = Executors.newFixedThreadPool(2);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
executorService.submit(() -> {
Config config = new Config();
config.setClusterName(clusterName);
Original file line number Diff line number Diff line change
@@ -292,7 +292,7 @@ public void testClientListenerDisconnected() {
clients.add(client);
}

ExecutorService ex = Executors.newFixedThreadPool(4);
ExecutorService ex = Executors.newVirtualThreadPerTaskExecutor();
try {
for (final HazelcastInstance client : clients) {
ex.execute(client::shutdown);
Original file line number Diff line number Diff line change
@@ -78,7 +78,7 @@ public static void main(String[] args) {
System.out.println(" Get Percentage: " + getPercentage);
System.out.println(" Put Percentage: " + putPercentage);
System.out.println(" Remove Percentage: " + (100 - (putPercentage + getPercentage)));
ExecutorService es = Executors.newFixedThreadPool(threadCount);
ExecutorService es = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < threadCount; i++) {
es.submit((Runnable) () -> {
IMap<String, Object> map = client.getMap("default");
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ private static void runTest(ICache<Integer, Integer> icacheOnClient,
}
};

ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

int numOfGetters = 2 * RuntimeAvailableProcessors.get();
for (int i = 0; i < numOfGetters; i++) {
Original file line number Diff line number Diff line change
@@ -126,7 +126,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
ArrayList<Thread> threads = new ArrayList<>();

// continuously adds and removes member
Thread shadowMember = new Thread(() -> {
Thread shadowMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
HazelcastInstance member = hazelcastFactory.newHazelcastInstance(config);
sleepSeconds(5);
@@ -138,7 +138,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {

for (int i = 0; i < NEAR_CACHE_POPULATE_THREAD_COUNT; i++) {
// populates client Near Cache
Thread populateClientNearCache = new Thread(() -> {
Thread populateClientNearCache = Thread.ofVirtual().unstarted(() -> {
int key = 0;
while (!stopTest.get()) {
clientCache.get(key++);
@@ -151,7 +151,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
}

// updates data from member
Thread putFromMember = new Thread(() -> {
Thread putFromMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
int key = getInt(KEY_COUNT);
int value = getInt(Integer.MAX_VALUE);
@@ -162,7 +162,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
});
threads.add(putFromMember);

Thread clearFromMember = new Thread(() -> {
Thread clearFromMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
memberCache.clear();
sleepSeconds(3);
Original file line number Diff line number Diff line change
@@ -164,7 +164,7 @@ public void testClusterShutdownDuringMapPutAll() {
final CountDownLatch threadsFinished = new CountDownLatch(numThreads);
final CountDownLatch threadsStarted = new CountDownLatch(numThreads);

ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < numThreads; i++) {
executor.execute(new Runnable() {
@Override
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static com.hazelcast.internal.nearcache.impl.NearCacheTestUtils.getBaseConfig;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;
import static junit.framework.TestCase.assertNull;

@RunWith(HazelcastParallelClassRunner.class)
@@ -63,7 +63,7 @@ public void testDestroyAndCreateProxyWithNearCache() {

int createPutGetThreadCount = 2;
int destroyThreadCount = 2;
ExecutorService pool = newFixedThreadPool(createPutGetThreadCount + destroyThreadCount);
ExecutorService pool = newVirtualThreadPerTaskExecutor();

final AtomicBoolean isRunning = new AtomicBoolean(true);
final AtomicReference<Exception> exception = new AtomicReference<>();
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ public void tearDown() throws Exception {

@Test
public void stress_stats_by_doing_put_and_remove() throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(4);
ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();
pool.execute(new Put());
pool.execute(new Put());
pool.execute(new Remove());
Original file line number Diff line number Diff line change
@@ -85,7 +85,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
ArrayList<Thread> threads = new ArrayList<>();

// continuously adds and removes member
Thread shadowMember = new Thread(() -> {
Thread shadowMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
HazelcastInstance member1 = factory.newHazelcastInstance(config);
sleepSeconds(5);
@@ -97,7 +97,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {

// populates client Near Cache
for (int i = 0; i < NEAR_CACHE_POPULATE_THREAD_COUNT; i++) {
Thread populateClientNearCache = new Thread(() -> {
Thread populateClientNearCache = Thread.ofVirtual().unstarted(() -> {
int key = 0;
while (!stopTest.get()) {
clientMap.get(key++);
@@ -111,7 +111,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
}

// updates map data from member
Thread putFromMember = new Thread(() -> {
Thread putFromMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
int key = getInt(KEY_COUNT);
int value = getInt(Integer.MAX_VALUE);
@@ -122,7 +122,7 @@ public void ensure_nearCachedClient_and_member_data_sync_eventually() {
});
threads.add(putFromMember);

Thread clearFromMember = new Thread(() -> {
Thread clearFromMember = Thread.ofVirtual().unstarted(() -> {
while (!stopTest.get()) {
memberMap.clear();
sleepSeconds(5);
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ public void stress_user_listener_removal_upon_query_cache_destroy() throws Inter
final AtomicBoolean stop = new AtomicBoolean(false);
ArrayList<Thread> threads = new ArrayList<>();
for (int i = 0; i < STRESS_TEST_THREAD_COUNT; i++) {
Thread thread = new Thread(() -> {
Thread thread = Thread.ofVirtual().unstarted(() -> {
while (!stop.get()) {
String name = mapNames[getInt(0, 4)];
final IMap<Integer, Integer> map = client.getMap(name);
@@ -177,7 +177,7 @@ public void event_service_is_empty_after_queryCache_concurrent_destroy() throws
final AtomicBoolean stop = new AtomicBoolean(false);
ArrayList<Thread> threads = new ArrayList<>();
for (int i = 0; i < STRESS_TEST_THREAD_COUNT; i++) {
Thread thread = new Thread(() -> {
Thread thread = Thread.ofVirtual().unstarted(() -> {
while (!stop.get()) {
QueryCache queryCache = map.getQueryCache("a", Predicates.alwaysTrue(), true);

@@ -238,7 +238,7 @@ public void no_query_cache_left_after_creating_and_destroying_same_map_concurren
final HazelcastInstance client = factory.newHazelcastClient();
final String mapName = "test";

ExecutorService pool = Executors.newFixedThreadPool(STRESS_TEST_THREAD_COUNT);
ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();
final AtomicBoolean stop = new AtomicBoolean(false);

for (int i = 0; i < 1000; i++) {
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ public void testConcurrentTxnPut() throws Exception {
final MultiMap multiMap = client.getMultiMap(mapName);

final int threads = 10;
final ExecutorService ex = Executors.newFixedThreadPool(threads);
final ExecutorService ex = Executors.newVirtualThreadPerTaskExecutor();
final CountDownLatch latch = new CountDownLatch(threads);
final AtomicReference<Throwable> error = new AtomicReference<>(null);

Original file line number Diff line number Diff line change
@@ -78,8 +78,8 @@ public void tearDown() {
public void testCommitConcurrently() {
int count = 10000;
String name = randomString();
ExecutorService executorService = Executors.newFixedThreadPool(5);
ExecutorService executorServiceForCommit = Executors.newFixedThreadPool(5);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
ExecutorService executorServiceForCommit = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < count; i++) {
XATransactionRunnable runnable = new XATransactionRunnable(xaResource, name, executorServiceForCommit, i);
executorService.execute(runnable);
Original file line number Diff line number Diff line change
@@ -208,7 +208,7 @@ public void testParallel() throws Exception {
// this is needed due to a racy bug in atomikos
txn(client);
int size = 100;
ExecutorService executorService = Executors.newFixedThreadPool(5);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
final CountDownLatch latch = new CountDownLatch(size);
for (int i = 0; i < size; i++) {
executorService.execute(() -> {
Original file line number Diff line number Diff line change
@@ -139,7 +139,7 @@ public void testClientReconnectModeAsyncConnected() throws InterruptedException
@Test(timeout = MINUTE * 10)
public void testClientReconnectModeAsyncConnectedMultipleThreads() {
int numThreads = 10;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

ClientConfig clientConfig = new ClientConfig();
final ClientStateListener listener = new ClientStateListener(clientConfig);
Loading