diff --git a/java/yb-loadtester/pom.xml b/java/yb-loadtester/pom.xml
index d892d0134829..304a6271351b 100644
--- a/java/yb-loadtester/pom.xml
+++ b/java/yb-loadtester/pom.xml
@@ -87,6 +87,11 @@
com.google.code.gson
gson
+
+ org.postgresql
+ postgresql
+ 42.2.2
+
diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/Main.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/Main.java
index f165da987c24..4baa59b2f092 100644
--- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/Main.java
+++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/Main.java
@@ -141,7 +141,7 @@ public void terminate() {
app.terminate();
}
- public void run() {
+ public void run() throws Exception {
// Disable extended peer check, to ensure "SELECT * FROM system.peers" works without
// all columns.
System.setProperty("com.datastax.driver.EXTENDED_PEER_CHECK", "false");
diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java
index 7dce0a223ef7..1fb396e72446 100644
--- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java
+++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java
@@ -16,6 +16,9 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -98,6 +101,9 @@ public abstract class AppBase implements MetricsTracker.StatusMessageAppender {
// Keyspace name.
public static String keyspace = "ybdemo_keyspace";
+ // Postgres database name.
+ public static String postgres_database = "ybdemo_database";
+
//////////// Helper methods to return the client objects (Redis, Cassandra, etc). ////////////////
/**
@@ -114,6 +120,19 @@ protected Session getCassandraClient() {
return cassandra_session;
}
+ protected Connection getPostgresConnection() throws Exception {
+ Class.forName("org.postgresql.Driver");
+ ContactPoint contactPoint = getRandomContactPoint();
+ Connection connection = DriverManager.getConnection(
+ String.format("jdbc:postgresql://%s:%d/", contactPoint.getHost(),
+ contactPoint.getPort()));
+ connection.createStatement().executeUpdate(
+ String.format("CREATE DATABASE IF NOT EXISTS %s", postgres_database));
+ connection.createStatement().executeUpdate(
+ String.format("USE %s", postgres_database));
+ return connection;
+ }
+
protected static void createKeyspace(Session session, String ks) {
String create_keyspace_stmt = "CREATE KEYSPACE IF NOT EXISTS " + ks +
" WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor' : 1};";
@@ -371,7 +390,7 @@ public void initialize(CmdLineOpts configuration) {}
/**
* The apps extending this base should drop all the tables they create when this method is called.
*/
- public void dropTable() {}
+ public void dropTable() throws Exception {}
public void dropCassandraTable(String tableName) {
String drop_stmt = String.format("DROP TABLE IF EXISTS %s;", tableName);
@@ -382,7 +401,7 @@ public void dropCassandraTable(String tableName) {
/**
* The apps extending this base should create all the necessary tables in this method.
*/
- public void createTablesIfNeeded() {
+ public void createTablesIfNeeded() throws Exception {
for (String create_stmt : getCreateTableStatements()) {
Session session = getCassandraClient();
// consistency level of one to allow cross DC requests.
diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/PostgresqlSecondaryIndex.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/PostgresqlSecondaryIndex.java
new file mode 100644
index 000000000000..93a12b14f697
--- /dev/null
+++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/PostgresqlSecondaryIndex.java
@@ -0,0 +1,184 @@
+// Copyright (c) YugaByte, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+// in compliance with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations
+// under the License.
+//
+package com.yugabyte.sample.apps;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.List;
+
+import com.yugabyte.sample.common.CmdLineOpts;
+import org.apache.log4j.Logger;
+
+import com.yugabyte.sample.common.SimpleLoadGenerator.Key;
+
+/**
+ * This workload writes and reads some random string keys from a postgresql table with a secondary
+ * index on a non-primary-key column. When it reads a key, it queries the key by its associated
+ * value which is indexed.
+ */
+public class PostgresqlSecondaryIndex extends AppBase {
+ private static final Logger LOG = Logger.getLogger(PostgresqlSecondaryIndex.class);
+
+ // Static initialization of this workload's config. These are good defaults for getting a decent
+ // read dominated workload on a reasonably powered machine. Exact IOPS will of course vary
+ // depending on the machine and what resources it has to spare.
+ static {
+ // Disable the read-write percentage.
+ appConfig.readIOPSPercentage = -1;
+ // Set the read and write threads to 1 each.
+ appConfig.numReaderThreads = 24;
+ appConfig.numWriterThreads = 2;
+ // The number of keys to read.
+ appConfig.numKeysToRead = -1;
+ // The number of keys to write. This is the combined total number of inserts and updates.
+ appConfig.numKeysToWrite = -1;
+ // The number of unique keys to write. This determines the number of inserts (as opposed to
+ // updates).
+ appConfig.numUniqueKeysToWrite = NUM_UNIQUE_KEYS;
+ }
+
+ // The default table name to create and use for CRUD ops.
+ private static final String DEFAULT_TABLE_NAME = PostgresqlSecondaryIndex.class.getSimpleName();
+
+ // The shared prepared select statement for fetching the data.
+ private volatile PreparedStatement preparedSelect = null;
+
+ // The shared prepared insert statement for inserting the data.
+ private volatile PreparedStatement preparedInsert = null;
+
+ public PostgresqlSecondaryIndex() {
+ }
+
+ @Override
+ public void dropTable() throws Exception {
+ Connection connection = getPostgresConnection();
+ connection.createStatement().executeUpdate(
+ String.format("DROP TABLE IF EXISTS %s;", getTableName()));
+ LOG.info(String.format("Dropped table: %s", getTableName()));
+ }
+
+ @Override
+ public void createTablesIfNeeded() throws Exception {
+ Connection connection = getPostgresConnection();
+ connection.createStatement().executeUpdate(
+ String.format("CREATE TABLE IF NOT EXISTS %s (k varchar PRIMARY KEY, v varchar);",
+ getTableName()));
+ LOG.info(String.format("Created table: %s", getTableName()));
+ connection.createStatement().executeUpdate(
+ String.format("CREATE INDEX IF NOT EXISTS %s_index ON %s(v);",
+ getTableName(), getTableName()));
+ LOG.info(String.format("Created index on table: %s", getTableName()));
+ }
+
+ public String getTableName() {
+ return appConfig.tableName != null ? appConfig.tableName : DEFAULT_TABLE_NAME;
+ }
+
+ private PreparedStatement getPreparedSelect() throws Exception {
+ if (preparedSelect == null) {
+ preparedSelect = getPostgresConnection().prepareStatement(
+ String.format("SELECT k, v FROM %s WHERE v = ?;", getTableName()));
+ }
+ return preparedSelect;
+ }
+
+ @Override
+ public long doRead() {
+ Key key = getSimpleLoadGenerator().getKeyToRead();
+ if (key == null) {
+ // There are no keys to read yet.
+ return 0;
+ }
+
+ try {
+ PreparedStatement statement = getPreparedSelect();
+ statement.setString(1, key.getValueStr());
+ ResultSet rs = statement.executeQuery();
+ if (!rs.next()) {
+ LOG.fatal("Read key: " + key.getKeyWithHashPrefix() + " expected 1 row in result, got 0");
+ return 0;
+ }
+
+ if (!key.getKeyWithHashPrefix().equals(rs.getString("k"))) {
+ LOG.fatal("Read key: " + key.getKeyWithHashPrefix() + ", got " + rs.getString("k"));
+ }
+ LOG.debug("Read key: " + key.toString());
+
+ if (rs.next()) {
+ LOG.fatal("Read key: " + key.getKeyWithHashPrefix() +
+ " expected 1 row in result, got more than one");
+ return 0;
+ }
+ } catch (Exception e) {
+ LOG.fatal("Failed reading value: " + key.getValueStr(), e);
+ return 0;
+ }
+ return 1;
+ }
+
+ private PreparedStatement getPreparedInsert() throws Exception {
+ if (preparedInsert == null) {
+ Connection connection = getPostgresConnection();
+ preparedInsert = connection.prepareStatement(
+ String.format("INSERT INTO %s (k, v) VALUES (?, ?);", getTableName()));
+ }
+ return preparedInsert;
+ }
+
+ @Override
+ public long doWrite() {
+ Key key = getSimpleLoadGenerator().getKeyToWrite();
+ if (key == null) {
+ return 0;
+ }
+
+ int result = 0;
+ try {
+ PreparedStatement statement = getPreparedInsert();
+ // Prefix hashcode to ensure generated keys are random and not sequential.
+ statement.setString(1, key.getKeyWithHashPrefix());
+ statement.setString(2, key.getValueStr());
+ result = statement.executeUpdate();
+ LOG.debug("Wrote key: " + key.asString() + ", " + key.getValueStr() + ", return code: " +
+ result);
+ getSimpleLoadGenerator().recordWriteSuccess(key);
+ } catch (Exception e) {
+ getSimpleLoadGenerator().recordWriteFailure(key);
+ LOG.fatal("Failed writing key: " + key.asString(), e);
+ }
+ return result;
+ }
+
+ @Override
+ public List getWorkloadDescription() {
+ return Arrays.asList(
+ "Sample key-value app built on postgresql. The app writes out unique string keys",
+ "each with a string value to a postgres table with an index on the value column.",
+ "There are multiple readers and writers that update these keys and read them",
+ "indefinitely, with the readers query the keys by the associated values that are",
+ "indexed. Note that the number of reads and writes to perform can be specified as",
+ "a parameter.");
+ }
+
+ @Override
+ public List getExampleUsageOptions() {
+ return Arrays.asList(
+ "--num_unique_keys " + appConfig.numUniqueKeysToWrite,
+ "--num_reads " + appConfig.numKeysToRead,
+ "--num_writes " + appConfig.numKeysToWrite,
+ "--num_threads_read " + appConfig.numReaderThreads,
+ "--num_threads_write " + appConfig.numWriterThreads);
+ }
+}
diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java
index a7361eed4461..aed25f531572 100644
--- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java
+++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java
@@ -56,6 +56,7 @@ public static enum AppName {
CassandraSparkWordCount,
CassandraSparkKeyValueCopy,
CassandraSecondaryIndex,
+ PostgresqlSecondaryIndex,
RedisKeyValue,
RedisPipelinedKeyValue,
RedisHashPipelined,
diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/SimpleLoadGenerator.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/SimpleLoadGenerator.java
index 46b654724d24..21852a9b5101 100644
--- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/SimpleLoadGenerator.java
+++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/common/SimpleLoadGenerator.java
@@ -13,6 +13,7 @@
package com.yugabyte.sample.common;
+import java.security.MessageDigest;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
@@ -47,6 +48,13 @@ public long asNumber() {
public String asString() { return keyPrefix + ":" + key.toString();
}
+ public String getKeyWithHashPrefix() throws Exception {
+ String k = asString();
+ MessageDigest md = MessageDigest.getInstance("MD5");
+ md.update(k.getBytes());
+ return new String(md.digest()) + ":" + k;
+ }
+
public String getValueStr() {
return ("val:" + key.toString());
}