From ac834fbe476e5936bb5e12ff399d0d27e5423846 Mon Sep 17 00:00:00 2001 From: Benjamin Prud'homme Date: Tue, 3 Dec 2024 16:13:58 -0500 Subject: [PATCH] Use semaphore to guard submit --- .../benchmark/BaseConcurrentBenchmark.java | 25 +++- .../BaseConcurrentBenchmark.java.save | 141 ++++++++++++++++++ 2 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java.save diff --git a/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java b/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java index dfca939628..28a1284f9e 100644 --- a/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java +++ b/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java @@ -15,6 +15,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -32,6 +33,7 @@ public class BaseConcurrentBenchmark { Repository repository; private ExecutorService executorService; + private Semaphore semaphore; static InputStream getResourceAsStream(String filename) { return BaseConcurrentBenchmark.class.getClassLoader().getResourceAsStream(filename); @@ -43,6 +45,7 @@ public void setup() throws Exception { executorService.shutdownNow(); } executorService = Executors.newVirtualThreadPerTaskExecutor(); + semaphore = new Semaphore(8); } @TearDown(Level.Trial) @@ -61,8 +64,13 @@ void threads(int threadCount, Runnable runnable) throws InterruptedException { for (int i = 0; i < threadCount; i++) { executorService.submit(() -> { try { - latch.await(); - runnable.run(); + semaphore.acquire(); + try { + latch.await(); + runnable.run(); + } finally { + semaphore.release(); + } } catch (InterruptedException e) { e.printStackTrace(); } finally { @@ -77,7 +85,18 @@ void threads(int threadCount, Runnable runnable) throws InterruptedException { } Future submit(Runnable runnable) { - return executorService.submit(runnable); + return executorService.submit(() -> { + try { + semaphore.acquire(); + try { + runnable.run(); + } finally { + semaphore.release(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); } Runnable getRunnable(CountDownLatch startSignal, RepositoryConnection connection, diff --git a/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java.save b/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java.save new file mode 100644 index 0000000000..fad59516d3 --- /dev/null +++ b/core/sail/memory/src/test/java/org/eclipse/rdf4j/sail/memory/benchmark/BaseConcurrentBenchmark.java.save @@ -0,0 +1,141 @@ +/******************************************************************************* + * Copyright (c) 2022 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.memory.benchmark; + +import java.io.InputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.eclipse.rdf4j.common.transaction.IsolationLevel; +import org.eclipse.rdf4j.repository.Repository; +import org.eclipse.rdf4j.repository.RepositoryConnection; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; + +@State(Scope.Benchmark) +public class BaseConcurrentBenchmark { + + Repository repository; + private ExecutorService executorService; + + static InputStream getResourceAsStream(String filename) { + return BaseConcurrentBenchmark.class.getClassLoader().getResourceAsStream(filename); + } + + @Setup(Level.Trial) + public void setup() throws Exception { + if (executorService != null) { + executorService.shutdownNow(); + } + executorService = Executors.new + } + + @TearDown(Level.Trial) + public void tearDown() throws Exception { + if (executorService != null) { + executorService.shutdownNow(); + executorService = null; + } + } + + void threads(int threadCount, Runnable runnable) throws InterruptedException { + + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latchDone = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + executorService.submit(() -> { + try { + latch.await(); + runnable.run(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + latchDone.countDown(); + } + }); + } + + latch.countDown(); + latchDone.await(); + + } + + Future submit(Runnable runnable) { + return executorService.submit(runnable); + } + + Runnable getRunnable(CountDownLatch startSignal, RepositoryConnection connection, + IsolationLevel isolationLevel, Consumer workload) { + + return () -> { + try { + startSignal.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(); + } + RepositoryConnection localConnection = connection; + try { + if (localConnection == null) { + localConnection = repository.getConnection(); + } + + if (isolationLevel == null) { + localConnection.begin(); + } else { + localConnection.begin(isolationLevel); + } + workload.accept(localConnection); + localConnection.commit(); + + } finally { + if (connection == null) { + assert localConnection != null; + localConnection.close(); + } + } + }; + } + + Runnable getRunnable(CountDownLatch startSignal, T inputData, + Consumer workload) { + + return () -> { + try { + startSignal.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(); + } + workload.accept(inputData); + }; + } + + Runnable getRunnable(CountDownLatch startSignal, T inputData1, + S inputData2, BiConsumer workload) { + + return () -> { + try { + startSignal.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(); + } + workload.accept(inputData1, inputData2); + }; + } + +}