Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fiber refactorings #2

Open
wants to merge 75 commits into
base: java21
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
36db89c
Fiber refactorings
bgprudhomme Aug 1, 2024
bcbc338
Update ShaclSail.java
bgprudhomme Aug 1, 2024
4208e91
Update ShaclSail.java
bgprudhomme Aug 1, 2024
fb0c8b3
Update ShaclSail.java
bgprudhomme Aug 1, 2024
dc3b580
Update ShaclSail.java
bgprudhomme Aug 1, 2024
8d867f7
Update SharedHttpClientSessionManager.java
bgprudhomme Aug 1, 2024
f4c26db
Update LmdbSailStore.java
bgprudhomme Aug 1, 2024
8c12ad4
Format
bgprudhomme Aug 2, 2024
9b33fdd
Update SPARQLOperation.java
bgprudhomme Aug 2, 2024
2ac25ca
Update BaseLockManagerBenchmark.java
bgprudhomme Aug 2, 2024
81dbe68
Update RepositoryPerformance.java
bgprudhomme Aug 2, 2024
2994a2d
Update OverflowBenchmarkConcurrent.java
bgprudhomme Aug 2, 2024
44abd82
Update OverflowBenchmarkConcurrent.java
bgprudhomme Aug 2, 2024
026f1d7
Update RemoteRepositoryTest.java
bgprudhomme Aug 2, 2024
a963c77
Update BaseConcurrentBenchmark.java
bgprudhomme Aug 2, 2024
a0a1351
Update SparqlRepositoryTest.java
bgprudhomme Aug 2, 2024
05e7756
Update ProtocolIT.java
bgprudhomme Aug 2, 2024
63ceb4e
Update ControlledWorkerScheduler.java
bgprudhomme Aug 2, 2024
4e0a445
Update FederationManager.java
bgprudhomme Aug 2, 2024
f65eb74
Update NamingThreadFactory.java
bgprudhomme Aug 2, 2024
87fb81c
Update ControlledWorkerScheduler.java
bgprudhomme Aug 2, 2024
2f9ebe3
Update ProtocolIT.java
bgprudhomme Aug 2, 2024
d8797ca
Format
bgprudhomme Aug 2, 2024
de2fb25
Update pr-verify.yml
bgprudhomme Aug 2, 2024
d87de11
Revert "Update pr-verify.yml"
bgprudhomme Aug 2, 2024
7e6b33a
Merge branch 'java21' into fibers
bgprudhomme Aug 2, 2024
f95fb7a
Update ProtocolIT.java
bgprudhomme Aug 2, 2024
f131acf
Merge branch 'java21' into fibers
bgprudhomme Aug 2, 2024
32e6853
Merge branch 'java21' into fibers
bgprudhomme Aug 3, 2024
3187ee9
Merge branch 'java21' into fibers
bgprudhomme Aug 3, 2024
1868faf
Merge branch 'java21' into fibers
bgprudhomme Aug 4, 2024
2bd6e75
Update ControlledWorkerScheduler.java
bgprudhomme Aug 5, 2024
1ca38e2
Update NamingThreadFactory.java
bgprudhomme Aug 5, 2024
3b5dcd4
Update FederationManager.java
bgprudhomme Aug 5, 2024
b61be8b
Formatting
bgprudhomme Aug 5, 2024
0458da4
Refactorings for additional JMH tests
bgprudhomme Aug 25, 2024
fd7473d
Update MultithreadedTest.java
bgprudhomme Aug 25, 2024
44a3603
Update MultithreadedTest.java
bgprudhomme Aug 26, 2024
b2506db
Update ShaclSail.java
bgprudhomme Aug 26, 2024
a76b83e
Update MultithreadedTest.java
bgprudhomme Aug 26, 2024
57957ef
Revert "Refactorings for additional JMH tests"
bgprudhomme Aug 26, 2024
502354f
Update RepositoryFederatedServiceIntegrationTest.java
bgprudhomme Aug 26, 2024
73fe77e
Update ShaclSail.java
bgprudhomme Aug 26, 2024
fc7d36a
Update ShaclSail.java
bgprudhomme Aug 26, 2024
50cc7a2
Update MinimalContextNowTest.java
bgprudhomme Aug 26, 2024
e5c8414
Update AbstractSailTest.java
bgprudhomme Aug 26, 2024
d89a7ff
Update AbstractGenericLuceneTest.java
bgprudhomme Aug 26, 2024
cb3a57b
Update MemStatementListTestIT.java
bgprudhomme Aug 26, 2024
92bedef
Update MultithreadedTest.java
bgprudhomme Aug 26, 2024
54f938d
Revert "Update MultithreadedTest.java"
bgprudhomme Aug 26, 2024
f9986e6
Update MultithreadedTest.java
bgprudhomme Aug 26, 2024
505257d
Update MultithreadedTest.java
bgprudhomme Aug 26, 2024
bd70621
Update MultithreadedTest.java
bgprudhomme Aug 26, 2024
2319564
Update ShaclSail.java
bgprudhomme Aug 26, 2024
1c248e8
Update MultithreadedTest.java
bgprudhomme Aug 26, 2024
4fffdfc
Update ProtocolIT.java
bgprudhomme Aug 27, 2024
ac834fb
Use semaphore to guard submit
bgprudhomme Dec 3, 2024
ffe1cf3
Delete core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/b…
bgprudhomme Dec 3, 2024
c4e490e
Update ShaclSail.java
bgprudhomme Dec 4, 2024
3455317
Update MultithreadedTest.java
bgprudhomme Dec 4, 2024
811c768
Update MultithreadedTest.java
bgprudhomme Dec 4, 2024
cc63339
Update FederationManager.java
bgprudhomme Dec 4, 2024
cc12250
Update ControlledWorkerScheduler.java
bgprudhomme Dec 4, 2024
f39553a
Update NamingThreadFactory.java
bgprudhomme Dec 4, 2024
1487328
Formatting
Dec 4, 2024
620b83b
More formatting
Dec 4, 2024
ab36f17
Update BaseConcurrentBenchmark.java
bgprudhomme Dec 4, 2024
bae704f
Revert "Update BaseConcurrentBenchmark.java"
Dec 6, 2024
295b836
Fix positioning of semaphore acquisition/guarding
bgprudhomme Dec 6, 2024
707e905
Fix whitespace errors
bgprudhomme Dec 7, 2024
7fdeae2
Readd deleted blank line
bgprudhomme Dec 7, 2024
eee3e3c
Fix whitespace
bgprudhomme Dec 7, 2024
fbc1ccb
Merge branch 'java21' into fibers
khatchad Dec 11, 2024
ff66a47
Merge branch 'java21' into fibers
khatchad Dec 11, 2024
cbb5045
Simplify semaphore acquisition.
khatchad Dec 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public void test9_connectionHandling() throws Exception {
.map(value -> vf.createStatement(iri("s1"), RDFS.LABEL, value))
.collect(Collectors.toList()));

ExecutorService executor = Executors.newFixedThreadPool(5);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved
try {
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,9 @@ public long getRetryInterval() {
*--------------*/

public SharedHttpClientSessionManager() {
final ThreadFactory backingThreadFactory = Executors.defaultThreadFactory();

ExecutorService threadPoolExecutor = Executors.newCachedThreadPool((Runnable runnable) -> {
Thread thread = backingThreadFactory.newThread(runnable);
thread.setName(
String.format("rdf4j-SharedHttpClientSessionManager-%d", threadCount.getAndIncrement()));
thread.setDaemon(true);
return thread;
});

Integer corePoolSize = Integer.getInteger(CORE_POOL_SIZE_PROPERTY, 1);
((ThreadPoolExecutor) threadPoolExecutor).setCorePoolSize(corePoolSize);
ExecutorService threadPoolExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.name("rdf4j-SharedHttpClientSessionManager-", threadCount.getAndIncrement())
.factory());
khatchad marked this conversation as resolved.
Show resolved Hide resolved
this.executor = threadPoolExecutor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testConcurrentAccessToNow() throws ExecutionException, InterruptedEx
int numberOfIterations = 100;
int numberOfThreads = 10;

ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved
try {

for (int i = 0; i < numberOfIterations; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
@Deprecated
public abstract class SPARQLOperation implements Operation {

private static final Executor executor = Executors.newCachedThreadPool();
private static final Executor executor = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved

protected HttpClient client;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class BaseLockManagerBenchmark {
public void setUp() {
Logger root = (Logger) LoggerFactory.getLogger(ReadPrefReadWriteLockManager.class.getName());
root.setLevel(ch.qos.logback.classic.Level.ERROR);
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
executorService = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved
}

@TearDown(Level.Trial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testConcurrentAutoInit() throws Exception {
CountDownLatch latch = new CountDownLatch(count);

for (int i = 0; i < count; i++) {
new Thread(new SailGetConnectionTask(subject, latch)).start();
Thread.ofVirtual().start(new SailGetConnectionTask(subject, latch));
khatchad marked this conversation as resolved.
Show resolved Hide resolved
}

if (!latch.await(30, TimeUnit.SECONDS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class LmdbSailStore implements SailStore {

private final ValueStore valueStore;

private final ExecutorService tripleStoreExecutor = Executors.newCachedThreadPool();
private final ExecutorService tripleStoreExecutor = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved
private final CircularBuffer<Operation> opQueue = new CircularBuffer<>(1024);
private volatile Throwable tripleStoreException;
private final AtomicBoolean running = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void manyConcurrentTransactions() throws IOException {
SailRepository sailRepository = new SailRepository(new NotifySailWrapper(new NotifySailWrapper(
new NotifySailWrapper(
new NotifySailWrapper(new NotifySailWrapper(new LmdbStore(temporaryFolder)))))));
ExecutorService executorService = Executors.newFixedThreadPool(10);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved

try {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ public void testMultithreadedAdd() throws InterruptedException {
final CountDownLatch endLatch = new CountDownLatch(numThreads);
final Set<Throwable> exceptions = ConcurrentHashMap.newKeySet();
for (int i = 0; i < numThreads; i++) {
new Thread(new Runnable() {
Thread.ofVirtual().start(new Runnable() {

private final long iterationCount = 10 + Math.round(random.nextDouble() * 100);

Expand All @@ -791,7 +791,7 @@ public void run() {
endLatch.countDown();
}
}
}).start();
});
khatchad marked this conversation as resolved.
Show resolved Hide resolved
}
startLatch.countDown();
endLatch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
package org.eclipse.rdf4j.sail.memory.benchmark;

import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand All @@ -32,6 +34,7 @@ public class BaseConcurrentBenchmark {

Repository repository;
private ExecutorService executorService;
private Semaphore semaphore;

static InputStream getResourceAsStream(String filename) {
return BaseConcurrentBenchmark.class.getClassLoader().getResourceAsStream(filename);
Expand All @@ -42,7 +45,8 @@ public void setup() throws Exception {
if (executorService != null) {
executorService.shutdownNow();
}
executorService = Executors.newFixedThreadPool(8);
executorService = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved
semaphore = new Semaphore(8);
}

@TearDown(Level.Trial)
Expand All @@ -59,16 +63,21 @@ void threads(int threadCount, Runnable runnable) throws InterruptedException {
CountDownLatch latchDone = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
latch.await();
runnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latchDone.countDown();
}
});
semaphore.acquireUninterruptibly();
try {
executorService.submit(() -> {
try {
latch.await();
runnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latchDone.countDown();
}
});
} finally {
semaphore.release();
}
}

latch.countDown();
Expand All @@ -77,7 +86,12 @@ void threads(int threadCount, Runnable runnable) throws InterruptedException {
}

Future<?> submit(Runnable runnable) {
return executorService.submit(runnable);
semaphore.acquireUninterruptibly();
try {
return executorService.submit(runnable);
} finally {
semaphore.release();
}
}

Runnable getRunnable(CountDownLatch startSignal, RepositoryConnection connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void addMultipleThreads() throws ExecutionException, InterruptedException

MemStatementList memStatementList = new MemStatementList();

ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved
try {

List<? extends Future<?>> collect = partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void manyConcurrentTransactions() throws IOException {
SailRepository sailRepository = new SailRepository(new NotifySailWrapper(new NotifySailWrapper(
new NotifySailWrapper(
new NotifySailWrapper(new NotifySailWrapper(new NativeStore(temporaryFolder)))))));
ExecutorService executorService = Executors.newFixedThreadPool(10);
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved

try {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ public void setBaseSail(Sail baseSail) {
@Experimental
protected RevivableExecutorService getExecutorService() {
return new RevivableExecutorService(
// Refactoring the below to Executors.newThreadPerTaskExecutor(r -> {
// Thread t = Thread.ofVirtual().factory().newThread(r);
// causes tests to hang forever
// (https://github.com/ponder-lab/rdf4j/actions/runs/10561618648/job/29257740868)
() -> Executors.newFixedThreadPool(AVAILABLE_PROCESSORS,
r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ private void parallelTest(List<List<Transaction>> list, IsolationLevels isolatio

Random r = new Random(52465534);

// Refactoring this to Executors.newVirtualThreadPerTaskExecutor() causes tests to hang forever
// (https://github.com/ponder-lab/rdf4j/actions/runs/10567565368/job/29276778845)
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

try {
Expand Down Expand Up @@ -467,6 +469,8 @@ private void runValidationFailuresTest(Sail sail, IsolationLevels isolationLevel
deadlockDetectionThread.setDaemon(true);
deadlockDetectionThread.start();

// Refactoring this to Executors.newVirtualThreadPerTaskExecutor() causes tests to hang forever
// (https://github.com/ponder-lab/rdf4j/actions/runs/10568156440/job/29278593171)
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

Utils.loadShapeData(repository, "complexBenchmark/shacl.trig");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public FederationManager() {
public void init(FedX federation, FederationContext federationContext) {
this.federation = federation;
this.federationContext = federationContext;
// Refactoring this to Executors.newThreadPerTaskExecutor(new NamingThreadFactory("FedX Executor"));
// and refactoring NamingThreadFactory to use virtual threads causes ServicesTest to hang forever
// (https://github.com/ponder-lab/rdf4j/actions/runs/10239404923/job/28324948852?pr=2)
this.executor = Executors.newCachedThreadPool(new NamingThreadFactory("FedX Executor"));

updateFederationType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public int getNumberOfTasks() {

private ExecutorService createExecutorService() {

bgprudhomme marked this conversation as resolved.
Show resolved Hide resolved
// Refactoring this to ExecutorService executor = Executors.newThreadPerTaskExecutor(new
// NamingThreadFactory(name))
// and refactoring NamingThreadFactory to use virtual threads causes ServicesTest to hang forever
// (https://github.com/ponder-lab/rdf4j/actions/runs/10239404923/job/28324948852?pr=2)
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue,
new NamingThreadFactory(name));
executor.allowCoreThreadTimeOut(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public NamingThreadFactory(String baseName) {

@Override
public Thread newThread(Runnable r) {
// Refactoring this to Thread t = Thread.ofVirtual().name(baseName + "-",
// nextThreadId.incrementAndGet()).unstarted(r);
// causes ServicesTest to hang forever
// (https://github.com/ponder-lab/rdf4j/actions/runs/10239404923/job/28324948852?pr=2)
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName(baseName + "-" + nextThreadId.incrementAndGet());
return t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class RemoteRepositoryTest {
*/
public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newFixedThreadPool(30);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hit by own non-JMH performance test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clarify.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test does not use JMH; the test happens in the main method.


Repository repo = new HTTPRepository("http://10.212.10.29:8081/openrdf-sesame", "drugbank");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private static abstract class PerformanceBase {

private static final int MAX_INSTANCES = Integer.MAX_VALUE;
private static final int N_QUERIES = 100;
private final ExecutorService executor = Executors.newFixedThreadPool(30);
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hit by own non-JMH performance test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clarify.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto the above


private final IRI type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class SparqlRepositoryTest {

public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newFixedThreadPool(20);
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hit by own non-JMH performance test


SPARQLRepository repo = new SPARQLRepository("http://dbpedia.org/sparql");
repo.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

public class SummaryServlet extends TransformationServlet {

private final ExecutorService executorService = Executors.newCachedThreadPool();
private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
khatchad marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger LOGGER = LoggerFactory.getLogger(SummaryServlet.class);

Expand Down
Loading