Skip to content

Commit

Permalink
Use semaphore to guard submit
Browse files Browse the repository at this point in the history
  • Loading branch information
bgprudhomme committed Dec 3, 2024
1 parent 4fffdfc commit ac834fb
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand All @@ -32,6 +33,7 @@ public class BaseConcurrentBenchmark {

Repository repository;
private ExecutorService executorService;
private Semaphore semaphore;

static InputStream getResourceAsStream(String filename) {
return BaseConcurrentBenchmark.class.getClassLoader().getResourceAsStream(filename);
Expand All @@ -43,6 +45,7 @@ public void setup() throws Exception {
executorService.shutdownNow();
}
executorService = Executors.newVirtualThreadPerTaskExecutor();
semaphore = new Semaphore(8);
}

@TearDown(Level.Trial)
Expand All @@ -61,8 +64,13 @@ void threads(int threadCount, Runnable runnable) throws InterruptedException {
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
latch.await();
runnable.run();
semaphore.acquire();
try {
latch.await();
runnable.run();
} finally {
semaphore.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
Expand All @@ -77,7 +85,18 @@ void threads(int threadCount, Runnable runnable) throws InterruptedException {
}

Future<?> submit(Runnable runnable) {
return executorService.submit(runnable);
return executorService.submit(() -> {
try {
semaphore.acquire();
try {
runnable.run();
} finally {
semaphore.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

Runnable getRunnable(CountDownLatch startSignal, RepositoryConnection connection,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*******************************************************************************
* Copyright (c) 2022 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.memory.benchmark;

import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.eclipse.rdf4j.common.transaction.IsolationLevel;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

@State(Scope.Benchmark)
public class BaseConcurrentBenchmark {

Repository repository;
private ExecutorService executorService;

static InputStream getResourceAsStream(String filename) {
return BaseConcurrentBenchmark.class.getClassLoader().getResourceAsStream(filename);
}

@Setup(Level.Trial)
public void setup() throws Exception {
if (executorService != null) {
executorService.shutdownNow();
}
executorService = Executors.new
}

@TearDown(Level.Trial)
public void tearDown() throws Exception {
if (executorService != null) {
executorService.shutdownNow();
executorService = null;
}
}

void threads(int threadCount, Runnable runnable) throws InterruptedException {

CountDownLatch latch = new CountDownLatch(1);
CountDownLatch latchDone = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
latch.await();
runnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latchDone.countDown();
}
});
}

latch.countDown();
latchDone.await();

}

Future<?> submit(Runnable runnable) {
return executorService.submit(runnable);
}

Runnable getRunnable(CountDownLatch startSignal, RepositoryConnection connection,
IsolationLevel isolationLevel, Consumer<RepositoryConnection> workload) {

return () -> {
try {
startSignal.await();
} catch (InterruptedException e) {
throw new IllegalStateException();
}
RepositoryConnection localConnection = connection;
try {
if (localConnection == null) {
localConnection = repository.getConnection();
}

if (isolationLevel == null) {
localConnection.begin();
} else {
localConnection.begin(isolationLevel);
}
workload.accept(localConnection);
localConnection.commit();

} finally {
if (connection == null) {
assert localConnection != null;
localConnection.close();
}
}
};
}

<T> Runnable getRunnable(CountDownLatch startSignal, T inputData,
Consumer<T> workload) {

return () -> {
try {
startSignal.await();
} catch (InterruptedException e) {
throw new IllegalStateException();
}
workload.accept(inputData);
};
}

<T, S> Runnable getRunnable(CountDownLatch startSignal, T inputData1,
S inputData2, BiConsumer<T, S> workload) {

return () -> {
try {
startSignal.await();
} catch (InterruptedException e) {
throw new IllegalStateException();
}
workload.accept(inputData1, inputData2);
};
}

}

0 comments on commit ac834fb

Please sign in to comment.