Skip to content

Commit

Permalink
Merge branch 'virtual-threads' of github.com:GoogleCloudPlatform/pgad…
Browse files Browse the repository at this point in the history
…apter into virtual-threads
  • Loading branch information
olavloite committed Jan 21, 2024
2 parents 47562d7 + 2a04b52 commit b2e8dfa
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 38 deletions.
5 changes: 5 additions & 0 deletions benchmarks/client-comparisons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
<artifactId>google-cloud-spanner-pgadapter</artifactId>
<version>0.27.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
<version>1.33.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
import com.google.cloud.pgadapter.benchmark.config.BenchmarkConfiguration;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -46,9 +49,9 @@ interface BenchmarkMethod {
this.benchmarkConfiguration = benchmarkConfiguration;
this.benchmarks.put("SelectOneValueAutoCommit", this::benchmarkSelectOneValueAutoCommit);
this.benchmarks.put("SelectOneRowAutoCommit", this::benchmarkSelectOneRowAutoCommit);
this.benchmarks.put("Select100RowsAutoCommit", this::benchmarkSelect100RowsRowAutoCommit);
this.benchmarks.put("Select100RowsAutoCommit", this::benchmarkSelect100RowsAutoCommit);
this.benchmarks.put("SelectOneRowTransaction", this::benchmarkSelectOneRowTransaction);
this.benchmarks.put("Select100RowsTransaction", this::benchmarkSelect100RowsRowTransaction);
this.benchmarks.put("Select10RowsTransaction", this::benchmarkSelect10RowsTransaction);
for (String benchmark : benchmarkConfiguration.getBenchmarks()) {
if (!this.benchmarks.containsKey(benchmark)) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -87,6 +90,18 @@ public void run() {

abstract List<String> loadIdentifiers();

String getRandomId() {
return identifiers.get(ThreadLocalRandom.current().nextInt(identifiers.size()));
}

Object[] getRandomIds(int numIds) {
Set<String> result = new HashSet<>(numIds);
while (result.size() < numIds) {
result.add(getRandomId());
}
return result.toArray();
}

abstract String getParameterName(int index);

void benchmarkSelectOneValueAutoCommit(String name, int parallelism) throws Exception {
Expand All @@ -95,7 +110,8 @@ void benchmarkSelectOneValueAutoCommit(String name, int parallelism) throws Exce
parallelism,
benchmarkConfiguration.getIterations(),
"select col_varchar from benchmark_all_types where id=" + getParameterName(1),
true);
true,
1);
}

void benchmarkSelectOneRowAutoCommit(String name, int parallelism) throws Exception {
Expand All @@ -104,16 +120,12 @@ void benchmarkSelectOneRowAutoCommit(String name, int parallelism) throws Except
parallelism,
benchmarkConfiguration.getIterations(),
"select * from benchmark_all_types where id=" + getParameterName(1),
true);
true,
1);
}

void benchmarkSelect100RowsRowAutoCommit(String name, int parallelism) throws Exception {
benchmarkSelect(
name,
parallelism,
benchmarkConfiguration.getIterations() / 10,
"select * from benchmark_all_types where id>=" + getParameterName(1) + " limit 100",
true);
void benchmarkSelect100RowsAutoCommit(String name, int parallelism) throws Exception {
benchmarkSelectNRows(name, parallelism, benchmarkConfiguration.getIterations(), 100, true);
}

void benchmarkSelectOneRowTransaction(String name, int parallelism) throws Exception {
Expand All @@ -122,20 +134,35 @@ void benchmarkSelectOneRowTransaction(String name, int parallelism) throws Excep
parallelism,
benchmarkConfiguration.getIterations(),
"select * from benchmark_all_types where id=" + getParameterName(1),
false);
false,
1);
}

void benchmarkSelect100RowsRowTransaction(String name, int parallelism) throws Exception {
void benchmarkSelect10RowsTransaction(String name, int parallelism) throws Exception {
benchmarkSelectNRows(name, parallelism, benchmarkConfiguration.getIterations(), 10, false);
}

void benchmarkSelectNRows(
String name, int parallelism, int iterations, int numRows, boolean autoCommit)
throws Exception {
benchmarkSelect(
name,
parallelism,
benchmarkConfiguration.getIterations() / 10,
"select * from benchmark_all_types where id>=" + getParameterName(1) + " limit 100",
false);
iterations,
"select * from benchmark_all_types where id in (select * from unnest("
+ getParameterName(1)
+ "::text[]))",
autoCommit,
numRows);
}

void benchmarkSelect(
String benchmarkName, int parallelism, int iterations, String sql, boolean autoCommit)
String benchmarkName,
int parallelism,
int iterations,
String sql,
boolean autoCommit,
int numRows)
throws Exception {
int totalOperations = parallelism * iterations;
statistics.reset(this.name, benchmarkName, parallelism, totalOperations);
Expand All @@ -148,7 +175,7 @@ void benchmarkSelect(

List<Future<?>> futures = new ArrayList<>(parallelism);
for (int task = 0; task < parallelism; task++) {
futures.add(executor.submit(() -> runQuery(sql, autoCommit, iterations, durations)));
futures.add(executor.submit(() -> runQuery(sql, autoCommit, iterations, numRows, durations)));
}
executor.shutdown();
assertTrue(executor.awaitTermination(1L, TimeUnit.HOURS));
Expand All @@ -161,5 +188,9 @@ void benchmarkSelect(
}

abstract void runQuery(
String sql, boolean autoCommit, int iterations, ConcurrentLinkedQueue<Duration> durations);
String sql,
boolean autoCommit,
int iterations,
int numRows,
ConcurrentLinkedQueue<Duration> durations);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

class GapicBenchmarkRunner extends AbstractBenchmarkRunner {

Expand Down Expand Up @@ -118,7 +120,11 @@ String getParameterName(int index) {

@Override
void runQuery(
String sql, boolean autoCommit, int iterations, ConcurrentLinkedQueue<Duration> durations) {
String sql,
boolean autoCommit,
int iterations,
int numRows,
ConcurrentLinkedQueue<Duration> durations) {
try {
Session session =
client.createSession(
Expand All @@ -133,16 +139,32 @@ void runQuery(
Thread.sleep(sleepDuration);
}
String id = identifiers.get(ThreadLocalRandom.current().nextInt(identifiers.size()));
Value paramValue;
if (numRows == 1) {
paramValue = Value.newBuilder().setStringValue(id).build();
} else {
paramValue =
Value.newBuilder()
.setListValue(
ListValue.newBuilder()
.addAllValues(
Arrays.stream(getRandomIds(numRows))
.map(
element ->
Value.newBuilder()
.setStringValue(element.toString())
.build())
.collect(Collectors.toList()))
.build())
.build();
}
Stopwatch watch = Stopwatch.createStarted();
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSession(session.getName())
.setSql(sql)
.setSeqno(1L)
.setParams(
Struct.newBuilder()
.putFields("p1", Value.newBuilder().setStringValue(id).build())
.build());
.setParams(Struct.newBuilder().putFields("p1", paramValue).build());
if (!autoCommit) {
builder.setTransaction(
TransactionSelector.newBuilder()
Expand All @@ -156,10 +178,10 @@ void runQuery(
if (spannerConfiguration.isUseStreamingSql()) {
ServerStream<PartialResultSet> stream =
client.executeStreamingSqlCallable().call(builder.build());
metadata = consumeStream(stream);
metadata = consumeStream(stream, numRows);
} else {
ResultSet resultSet = client.executeSql(builder.build());
consumeResultSet(resultSet);
consumeResultSet(resultSet, numRows);
metadata = resultSet.getMetadata();
}
if (!autoCommit) {
Expand All @@ -180,7 +202,8 @@ void runQuery(
}
}

private void consumeResultSet(ResultSet resultSet) {
private void consumeResultSet(ResultSet resultSet, int expectedNumRows) {
assertEquals(expectedNumRows, resultSet.getRowsList().size());
for (ListValue row : resultSet.getRowsList()) {
int col = 0;
for (Value value : row.getValuesList()) {
Expand All @@ -190,18 +213,24 @@ private void consumeResultSet(ResultSet resultSet) {
}
}

private ResultSetMetadata consumeStream(ServerStream<PartialResultSet> stream) {
private ResultSetMetadata consumeStream(
ServerStream<PartialResultSet> stream, int expectedNumRows) {
ResultSetMetadata metadata = ResultSetMetadata.getDefaultInstance();
int expectedNumCells = -1;
int numCells = 0;
for (PartialResultSet partialResultSet : stream) {
if (partialResultSet.hasMetadata()) {
metadata = partialResultSet.getMetadata();
expectedNumCells = metadata.getRowType().getFieldsCount() * expectedNumRows;
}
int col = 0;
for (Value value : partialResultSet.getValuesList()) {
assertEquals(value, partialResultSet.getValues(col));
col++;
numCells++;
}
}
assertEquals(expectedNumCells, numCells);
return metadata;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ String getParameterName(int index) {

@Override
void runQuery(
String sql, boolean autoCommit, int iterations, ConcurrentLinkedQueue<Duration> durations) {
String sql,
boolean autoCommit,
int iterations,
int numRows,
ConcurrentLinkedQueue<Duration> durations) {
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
connection.setAutoCommit(autoCommit);
for (int n = 0; n < iterations; n++) {
Expand All @@ -67,11 +71,15 @@ void runQuery(
.nextLong(benchmarkConfiguration.getMaxRandomWait().toMillis());
Thread.sleep(sleepDuration);
}
String id = identifiers.get(ThreadLocalRandom.current().nextInt(identifiers.size()));
Stopwatch watch = Stopwatch.createStarted();
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setString(1, id);
if (numRows == 1) {
statement.setString(1, getRandomId());
} else {
statement.setArray(1, connection.createArrayOf("text", getRandomIds(numRows)));
}
try (ResultSet resultSet = statement.executeQuery()) {
int rowCount = 0;
while (resultSet.next()) {
for (int col = 1; col <= resultSet.getMetaData().getColumnCount(); col++) {
if (connection.isWrapperFor(CloudSpannerJdbcConnection.class)
Expand All @@ -96,7 +104,9 @@ void runQuery(
resultSet.getString(resultSet.getMetaData().getColumnLabel(col)));
}
}
rowCount++;
}
assertEquals(numRows, rowCount);
}
}
if (!autoCommit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -83,7 +85,11 @@ String getParameterName(int index) {

@Override
void runQuery(
String sql, boolean autoCommit, int iterations, ConcurrentLinkedQueue<Duration> durations) {
String sql,
boolean autoCommit,
int iterations,
int numRows,
ConcurrentLinkedQueue<Duration> durations) {
try {
for (int n = 0; n < iterations; n++) {
if (!benchmarkConfiguration.getMaxRandomWait().isZero()) {
Expand All @@ -92,20 +98,30 @@ void runQuery(
.nextLong(benchmarkConfiguration.getMaxRandomWait().toMillis());
Thread.sleep(sleepDuration);
}
String id = identifiers.get(ThreadLocalRandom.current().nextInt(identifiers.size()));
Stopwatch watch = Stopwatch.createStarted();
Statement statement = Statement.newBuilder(sql).bind("p1").to(id).build();
Statement.Builder builder = Statement.newBuilder(sql);
if (numRows == 1) {
builder.bind("p1").to(getRandomId()).build();
} else {
builder
.bind("p1")
.toStringArray(
Arrays.stream(getRandomIds(numRows))
.map(Object::toString)
.collect(Collectors.toList()));
}
Statement statement = builder.build();
if (autoCommit) {
try (ResultSet resultSet = databaseClient.singleUse().executeQuery(statement)) {
consumeResultSet(resultSet);
consumeResultSet(resultSet, numRows);
}
} else {
databaseClient
.readWriteTransaction()
.run(
transaction -> {
try (ResultSet resultSet = transaction.executeQuery(statement)) {
consumeResultSet(resultSet);
consumeResultSet(resultSet, numRows);
}
return 0L;
});
Expand All @@ -118,14 +134,17 @@ void runQuery(
}
}

private void consumeResultSet(ResultSet resultSet) {
private void consumeResultSet(ResultSet resultSet, int expectedNumRows) {
int numRows = 0;
while (resultSet.next()) {
for (int col = 0; col < resultSet.getColumnCount(); col++) {
assertEquals(
resultSet.getValue(col),
resultSet.getValue(resultSet.getMetadata().getRowType().getFields(col).getName()));
}
numRows++;
}
assertEquals(expectedNumRows, numRows);
}

private Spanner createSpanner() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ benchmark.parallelism=1,2,4,8,16,24,32,48,64,100,150,200
benchmark.virtual-threads-factor=1
# The benchmarks to run
benchmark.benchmarks=SelectOneValueAutoCommit
#,SelectOneRowAutoCommit,Select100RowsAutoCommit,SelectOneRowTransaction,Select100RowsTransaction
#,SelectOneRowAutoCommit,Select100RowsAutoCommit,SelectOneRowTransaction,Select10RowsTransaction

# --- Possible optimizations for TPC-C --- #
benchmark.use-read-only-transactions=false
Expand Down
Loading

0 comments on commit b2e8dfa

Please sign in to comment.