diff --git a/benchmarks/client-comparisons/pom.xml b/benchmarks/client-comparisons/pom.xml
index bd5ff33611..1146d6f18a 100644
--- a/benchmarks/client-comparisons/pom.xml
+++ b/benchmarks/client-comparisons/pom.xml
@@ -44,6 +44,11 @@
google-cloud-spanner-pgadapter
0.27.2-SNAPSHOT
+
+ io.opentelemetry
+ opentelemetry-sdk-extension-autoconfigure-spi
+ 1.33.0
+
org.apache.commons
commons-lang3
diff --git a/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/AbstractBenchmarkRunner.java b/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/AbstractBenchmarkRunner.java
index 49c85e23d5..32134b23d8 100644
--- a/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/AbstractBenchmarkRunner.java
+++ b/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/AbstractBenchmarkRunner.java
@@ -7,13 +7,16 @@
import com.google.cloud.pgadapter.benchmark.config.BenchmarkConfiguration;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +49,9 @@ interface BenchmarkMethod {
this.benchmarkConfiguration = benchmarkConfiguration;
this.benchmarks.put("SelectOneValueAutoCommit", this::benchmarkSelectOneValueAutoCommit);
this.benchmarks.put("SelectOneRowAutoCommit", this::benchmarkSelectOneRowAutoCommit);
- this.benchmarks.put("Select100RowsAutoCommit", this::benchmarkSelect100RowsRowAutoCommit);
+ this.benchmarks.put("Select100RowsAutoCommit", this::benchmarkSelect100RowsAutoCommit);
this.benchmarks.put("SelectOneRowTransaction", this::benchmarkSelectOneRowTransaction);
- this.benchmarks.put("Select100RowsTransaction", this::benchmarkSelect100RowsRowTransaction);
+ this.benchmarks.put("Select10RowsTransaction", this::benchmarkSelect10RowsTransaction);
for (String benchmark : benchmarkConfiguration.getBenchmarks()) {
if (!this.benchmarks.containsKey(benchmark)) {
throw new IllegalArgumentException(
@@ -87,6 +90,18 @@ public void run() {
abstract List loadIdentifiers();
+ String getRandomId() {
+ return identifiers.get(ThreadLocalRandom.current().nextInt(identifiers.size()));
+ }
+
+ Object[] getRandomIds(int numIds) {
+ Set result = new HashSet<>(numIds);
+ while (result.size() < numIds) {
+ result.add(getRandomId());
+ }
+ return result.toArray();
+ }
+
abstract String getParameterName(int index);
void benchmarkSelectOneValueAutoCommit(String name, int parallelism) throws Exception {
@@ -95,7 +110,8 @@ void benchmarkSelectOneValueAutoCommit(String name, int parallelism) throws Exce
parallelism,
benchmarkConfiguration.getIterations(),
"select col_varchar from benchmark_all_types where id=" + getParameterName(1),
- true);
+ true,
+ 1);
}
void benchmarkSelectOneRowAutoCommit(String name, int parallelism) throws Exception {
@@ -104,16 +120,12 @@ void benchmarkSelectOneRowAutoCommit(String name, int parallelism) throws Except
parallelism,
benchmarkConfiguration.getIterations(),
"select * from benchmark_all_types where id=" + getParameterName(1),
- true);
+ true,
+ 1);
}
- void benchmarkSelect100RowsRowAutoCommit(String name, int parallelism) throws Exception {
- benchmarkSelect(
- name,
- parallelism,
- benchmarkConfiguration.getIterations() / 10,
- "select * from benchmark_all_types where id>=" + getParameterName(1) + " limit 100",
- true);
+ void benchmarkSelect100RowsAutoCommit(String name, int parallelism) throws Exception {
+ benchmarkSelectNRows(name, parallelism, benchmarkConfiguration.getIterations(), 100, true);
}
void benchmarkSelectOneRowTransaction(String name, int parallelism) throws Exception {
@@ -122,20 +134,35 @@ void benchmarkSelectOneRowTransaction(String name, int parallelism) throws Excep
parallelism,
benchmarkConfiguration.getIterations(),
"select * from benchmark_all_types where id=" + getParameterName(1),
- false);
+ false,
+ 1);
}
- void benchmarkSelect100RowsRowTransaction(String name, int parallelism) throws Exception {
+ void benchmarkSelect10RowsTransaction(String name, int parallelism) throws Exception {
+ benchmarkSelectNRows(name, parallelism, benchmarkConfiguration.getIterations(), 10, false);
+ }
+
+ void benchmarkSelectNRows(
+ String name, int parallelism, int iterations, int numRows, boolean autoCommit)
+ throws Exception {
benchmarkSelect(
name,
parallelism,
- benchmarkConfiguration.getIterations() / 10,
- "select * from benchmark_all_types where id>=" + getParameterName(1) + " limit 100",
- false);
+ iterations,
+ "select * from benchmark_all_types where id in (select * from unnest("
+ + getParameterName(1)
+ + "::text[]))",
+ autoCommit,
+ numRows);
}
void benchmarkSelect(
- String benchmarkName, int parallelism, int iterations, String sql, boolean autoCommit)
+ String benchmarkName,
+ int parallelism,
+ int iterations,
+ String sql,
+ boolean autoCommit,
+ int numRows)
throws Exception {
int totalOperations = parallelism * iterations;
statistics.reset(this.name, benchmarkName, parallelism, totalOperations);
@@ -148,7 +175,7 @@ void benchmarkSelect(
List> futures = new ArrayList<>(parallelism);
for (int task = 0; task < parallelism; task++) {
- futures.add(executor.submit(() -> runQuery(sql, autoCommit, iterations, durations)));
+ futures.add(executor.submit(() -> runQuery(sql, autoCommit, iterations, numRows, durations)));
}
executor.shutdown();
assertTrue(executor.awaitTermination(1L, TimeUnit.HOURS));
@@ -161,5 +188,9 @@ void benchmarkSelect(
}
abstract void runQuery(
- String sql, boolean autoCommit, int iterations, ConcurrentLinkedQueue durations);
+ String sql,
+ boolean autoCommit,
+ int iterations,
+ int numRows,
+ ConcurrentLinkedQueue durations);
}
diff --git a/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/GapicBenchmarkRunner.java b/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/GapicBenchmarkRunner.java
index 38f004d7b3..4243b00736 100644
--- a/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/GapicBenchmarkRunner.java
+++ b/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/GapicBenchmarkRunner.java
@@ -31,11 +31,13 @@
import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
class GapicBenchmarkRunner extends AbstractBenchmarkRunner {
@@ -118,7 +120,11 @@ String getParameterName(int index) {
@Override
void runQuery(
- String sql, boolean autoCommit, int iterations, ConcurrentLinkedQueue durations) {
+ String sql,
+ boolean autoCommit,
+ int iterations,
+ int numRows,
+ ConcurrentLinkedQueue durations) {
try {
Session session =
client.createSession(
@@ -133,16 +139,32 @@ void runQuery(
Thread.sleep(sleepDuration);
}
String id = identifiers.get(ThreadLocalRandom.current().nextInt(identifiers.size()));
+ Value paramValue;
+ if (numRows == 1) {
+ paramValue = Value.newBuilder().setStringValue(id).build();
+ } else {
+ paramValue =
+ Value.newBuilder()
+ .setListValue(
+ ListValue.newBuilder()
+ .addAllValues(
+ Arrays.stream(getRandomIds(numRows))
+ .map(
+ element ->
+ Value.newBuilder()
+ .setStringValue(element.toString())
+ .build())
+ .collect(Collectors.toList()))
+ .build())
+ .build();
+ }
Stopwatch watch = Stopwatch.createStarted();
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSession(session.getName())
.setSql(sql)
.setSeqno(1L)
- .setParams(
- Struct.newBuilder()
- .putFields("p1", Value.newBuilder().setStringValue(id).build())
- .build());
+ .setParams(Struct.newBuilder().putFields("p1", paramValue).build());
if (!autoCommit) {
builder.setTransaction(
TransactionSelector.newBuilder()
@@ -156,10 +178,10 @@ void runQuery(
if (spannerConfiguration.isUseStreamingSql()) {
ServerStream stream =
client.executeStreamingSqlCallable().call(builder.build());
- metadata = consumeStream(stream);
+ metadata = consumeStream(stream, numRows);
} else {
ResultSet resultSet = client.executeSql(builder.build());
- consumeResultSet(resultSet);
+ consumeResultSet(resultSet, numRows);
metadata = resultSet.getMetadata();
}
if (!autoCommit) {
@@ -180,7 +202,8 @@ void runQuery(
}
}
- private void consumeResultSet(ResultSet resultSet) {
+ private void consumeResultSet(ResultSet resultSet, int expectedNumRows) {
+ assertEquals(expectedNumRows, resultSet.getRowsList().size());
for (ListValue row : resultSet.getRowsList()) {
int col = 0;
for (Value value : row.getValuesList()) {
@@ -190,18 +213,24 @@ private void consumeResultSet(ResultSet resultSet) {
}
}
- private ResultSetMetadata consumeStream(ServerStream stream) {
+ private ResultSetMetadata consumeStream(
+ ServerStream stream, int expectedNumRows) {
ResultSetMetadata metadata = ResultSetMetadata.getDefaultInstance();
+ int expectedNumCells = -1;
+ int numCells = 0;
for (PartialResultSet partialResultSet : stream) {
if (partialResultSet.hasMetadata()) {
metadata = partialResultSet.getMetadata();
+ expectedNumCells = metadata.getRowType().getFieldsCount() * expectedNumRows;
}
int col = 0;
for (Value value : partialResultSet.getValuesList()) {
assertEquals(value, partialResultSet.getValues(col));
col++;
+ numCells++;
}
}
+ assertEquals(expectedNumCells, numCells);
return metadata;
}
diff --git a/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/JdbcBenchmarkRunner.java b/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/JdbcBenchmarkRunner.java
index b23b7d34e8..cb6267a1d9 100644
--- a/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/JdbcBenchmarkRunner.java
+++ b/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/JdbcBenchmarkRunner.java
@@ -57,7 +57,11 @@ String getParameterName(int index) {
@Override
void runQuery(
- String sql, boolean autoCommit, int iterations, ConcurrentLinkedQueue durations) {
+ String sql,
+ boolean autoCommit,
+ int iterations,
+ int numRows,
+ ConcurrentLinkedQueue durations) {
try (Connection connection = DriverManager.getConnection(connectionUrl)) {
connection.setAutoCommit(autoCommit);
for (int n = 0; n < iterations; n++) {
@@ -67,11 +71,15 @@ void runQuery(
.nextLong(benchmarkConfiguration.getMaxRandomWait().toMillis());
Thread.sleep(sleepDuration);
}
- String id = identifiers.get(ThreadLocalRandom.current().nextInt(identifiers.size()));
Stopwatch watch = Stopwatch.createStarted();
try (PreparedStatement statement = connection.prepareStatement(sql)) {
- statement.setString(1, id);
+ if (numRows == 1) {
+ statement.setString(1, getRandomId());
+ } else {
+ statement.setArray(1, connection.createArrayOf("text", getRandomIds(numRows)));
+ }
try (ResultSet resultSet = statement.executeQuery()) {
+ int rowCount = 0;
while (resultSet.next()) {
for (int col = 1; col <= resultSet.getMetaData().getColumnCount(); col++) {
if (connection.isWrapperFor(CloudSpannerJdbcConnection.class)
@@ -96,7 +104,9 @@ void runQuery(
resultSet.getString(resultSet.getMetaData().getColumnLabel(col)));
}
}
+ rowCount++;
}
+ assertEquals(numRows, rowCount);
}
}
if (!autoCommit) {
diff --git a/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/SpannerBenchmarkRunner.java b/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/SpannerBenchmarkRunner.java
index 64280fb8d4..dc53c1955a 100644
--- a/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/SpannerBenchmarkRunner.java
+++ b/benchmarks/client-comparisons/src/main/java/com/google/cloud/pgadapter/benchmark/SpannerBenchmarkRunner.java
@@ -21,9 +21,11 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +85,11 @@ String getParameterName(int index) {
@Override
void runQuery(
- String sql, boolean autoCommit, int iterations, ConcurrentLinkedQueue durations) {
+ String sql,
+ boolean autoCommit,
+ int iterations,
+ int numRows,
+ ConcurrentLinkedQueue durations) {
try {
for (int n = 0; n < iterations; n++) {
if (!benchmarkConfiguration.getMaxRandomWait().isZero()) {
@@ -92,12 +98,22 @@ void runQuery(
.nextLong(benchmarkConfiguration.getMaxRandomWait().toMillis());
Thread.sleep(sleepDuration);
}
- String id = identifiers.get(ThreadLocalRandom.current().nextInt(identifiers.size()));
Stopwatch watch = Stopwatch.createStarted();
- Statement statement = Statement.newBuilder(sql).bind("p1").to(id).build();
+ Statement.Builder builder = Statement.newBuilder(sql);
+ if (numRows == 1) {
+ builder.bind("p1").to(getRandomId()).build();
+ } else {
+ builder
+ .bind("p1")
+ .toStringArray(
+ Arrays.stream(getRandomIds(numRows))
+ .map(Object::toString)
+ .collect(Collectors.toList()));
+ }
+ Statement statement = builder.build();
if (autoCommit) {
try (ResultSet resultSet = databaseClient.singleUse().executeQuery(statement)) {
- consumeResultSet(resultSet);
+ consumeResultSet(resultSet, numRows);
}
} else {
databaseClient
@@ -105,7 +121,7 @@ void runQuery(
.run(
transaction -> {
try (ResultSet resultSet = transaction.executeQuery(statement)) {
- consumeResultSet(resultSet);
+ consumeResultSet(resultSet, numRows);
}
return 0L;
});
@@ -118,14 +134,17 @@ void runQuery(
}
}
- private void consumeResultSet(ResultSet resultSet) {
+ private void consumeResultSet(ResultSet resultSet, int expectedNumRows) {
+ int numRows = 0;
while (resultSet.next()) {
for (int col = 0; col < resultSet.getColumnCount(); col++) {
assertEquals(
resultSet.getValue(col),
resultSet.getValue(resultSet.getMetadata().getRowType().getFields(col).getName()));
}
+ numRows++;
}
+ assertEquals(expectedNumRows, numRows);
}
private Spanner createSpanner() throws IOException {
diff --git a/benchmarks/client-comparisons/src/main/resources/application.properties b/benchmarks/client-comparisons/src/main/resources/application.properties
index 9343788883..104e23cfcf 100644
--- a/benchmarks/client-comparisons/src/main/resources/application.properties
+++ b/benchmarks/client-comparisons/src/main/resources/application.properties
@@ -36,7 +36,7 @@ benchmark.parallelism=1,2,4,8,16,24,32,48,64,100,150,200
benchmark.virtual-threads-factor=1
# The benchmarks to run
benchmark.benchmarks=SelectOneValueAutoCommit
- #,SelectOneRowAutoCommit,Select100RowsAutoCommit,SelectOneRowTransaction,Select100RowsTransaction
+ #,SelectOneRowAutoCommit,Select100RowsAutoCommit,SelectOneRowTransaction,Select10RowsTransaction
# --- Possible optimizations for TPC-C --- #
benchmark.use-read-only-transactions=false
diff --git a/pom.xml b/pom.xml
index 74ae9f060d..6f27014f6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,34 @@
+
+ com.google.cloud
+ google-cloud-core-bom
+ 2.29.0
+ pom
+ import
+
+
+ com.google.api
+ gax-bom
+ 2.39.0
+ pom
+ import
+
+
+ io.grpc
+ grpc-bom
+ 1.60.0
+ pom
+ import
+
+
+ com.google.cloud
+ google-cloud-spanner-bom
+ 6.56.1-SNAPSHOT
+ pom
+ import
+
com.google.cloud
libraries-bom
@@ -91,6 +119,41 @@
+
+ com.google.api
+ api-common
+ 2.22.0
+
+
+ com.google.errorprone
+ error_prone_annotations
+ 2.23.0
+
+
+ com.google.auth
+ google-auth-library-oauth2-http
+ 1.21.0
+
+
+ com.google.api.grpc
+ proto-google-iam-v1
+ 1.25.0
+
+
+ com.google.api.grpc
+ proto-google-common-protos
+ 2.30.0
+
+
+ com.google.api.grpc
+ grpc-google-common-protos
+ 2.30.0
+
+
+ com.google.auth
+ google-auth-library-credentials
+ 1.21.0
+
com.google.auto.value
auto-value-annotations