diff --git a/src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java b/src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java new file mode 100644 index 0000000..cb8b85f --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java @@ -0,0 +1,302 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * Money transfers across bank accounts is a common usecase for a OLTP + * database. Transfers are a commonly used example for discussing + * transactions in databases because of its strong requirements on + * consistency guarantees. + * + * Simulate money transfers. The most important constraint here + * is that the total amount of money across all accounts should remain + * invariant. However, aggregating money across all accounts involves + * a full table scan and this exposes the query to read restarts. + * + * This app helps understand whether the new clockbound clock + * helps improve the performance of this workload. + * + * Database Configuration: + * configure with wallclock and compare the metrics with + * a clockbound clock configuration. + * + * Setup: + * 1. Create a bank_accounts TABLE with columns (account_id INT, balance INT). + * 2. Insert 1000 accounts with account_id 0 to 999 initialized to 1000. + * + * Workload: + * There are two main operations in this workload: + * a. Transfer: Transfers a random amount money from one account to another. + * The amount must be <= the balance of the source account. + * b. Verify: Verifies that the total amount of money across all accounts + * is 1000 * 1000. + * + * Transfer Operation: + * 1. Pick a sender and a receiver pair at random (they must be different). + * 2. Start a repeatable read transaction. + * 3. Query the account balance of the sender. + * 4. If the balance is zero, abort the transaction. + * 5. Pick a random amount [1, balance]. + * 6. Decrement the balance of the sender by the amount. + * 7. Increment the balance of the receiver by the amount. + * 8. Commit the transaction. + * + * Verify Operation: + * 1. Sum the balances of all accounts. + * 2. Verify that the sum is 1000 * 1000. + */ +public class SqlBankTransfers extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlBankTransfers.class); + + // Static initialization of this app's config. + static { + // Use 1 Verify thread and 10 Transfer threads. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 1; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "bank_accounts"; + + // The number of accounts in the bank. + private static final int NUM_ACCOUNTS = 1000; + + // Initial balance of each account. + private static final int INIT_BALANCE = 1000; + + // Shared counter to store the number of inconsistent reads. + private static final AtomicLong numInconsistentReads = new AtomicLong(0); + + // Cache connection and statements. + private Connection connection = null; + private PreparedStatement preparedTotalBalance = null; + private PreparedStatement preparedFetchSenderBalance = null; + private PreparedStatement preparedDebitSender = null; + private PreparedStatement preparedCreditReceiver = null; + + public Connection getConnection() { + if (connection == null) { + try { + connection = getPostgresConnection(); + } catch (Exception e) { + LOG.fatal("Failed to create a connection ", e); + } + } + return connection; + } + + public PreparedStatement getPreparedTotalBalance() { + if (preparedTotalBalance == null) { + try { + preparedTotalBalance = getConnection().prepareStatement( + String.format("SELECT SUM(balance) FROM %s", getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare total balance statement ", e); + } + } + return preparedTotalBalance; + } + + public PreparedStatement getPreparedFetchSenderBalance() { + if (preparedFetchSenderBalance == null) { + try { + preparedFetchSenderBalance = getConnection().prepareStatement( + String.format("SELECT balance FROM %s WHERE account_id = ?", + getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare fetch sender balance statement ", e); + } + } + return preparedFetchSenderBalance; + } + + public PreparedStatement getPreparedDebitSender() { + if (preparedDebitSender == null) { + try { + preparedDebitSender = getConnection().prepareStatement( + String.format("UPDATE %s SET balance = balance - ? WHERE account_id = ?", + getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare debit sender statement ", e); + } + } + return preparedDebitSender; + } + + public PreparedStatement getPreparedCreditReceiver() { + if (preparedCreditReceiver == null) { + try { + preparedCreditReceiver = getConnection().prepareStatement( + String.format("UPDATE %s SET balance = balance + ? WHERE account_id = ?", + getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare credit receiver statement ", e); + } + } + return preparedCreditReceiver; + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (account_id INT, balance INT)", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + int numRows = connection.createStatement().executeUpdate(String.format( + "INSERT INTO %s SELECT GENERATE_SERIES(0, %d-1), %d", + getTableName(), NUM_ACCOUNTS, INIT_BALANCE)); + LOG.info(String.format( + "Inserted %d rows into %s", numRows, getTableName())); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + // Executes the Verify operation. + @Override + public long doRead() { + try { + PreparedStatement preparedTotalBalance = getPreparedTotalBalance(); + ResultSet resultSet = preparedTotalBalance.executeQuery(); + if (!resultSet.next()) { + throw new SQLException("No rows returned from sum query"); + } + int totalBalance = resultSet.getInt(1); + + // verify total balance. + if (totalBalance != NUM_ACCOUNTS * INIT_BALANCE) { + LOG.error(String.format("Total balance is %d", totalBalance)); + numInconsistentReads.incrementAndGet(); + } + } catch (Exception e) { + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) { + LOG.error("Error verifying balances ", e); + } + return 0; + } + return 1; + } + + // Executes the Transfer operation. + @Override + public long doWrite(int threadIdx) { + // Pick two random distinct accounts. + int sender = ThreadLocalRandom.current().nextInt(NUM_ACCOUNTS); + int receiver; + do { + receiver = ThreadLocalRandom.current().nextInt(NUM_ACCOUNTS); + } while (receiver == sender); + + Connection connection = getConnection(); + try { + // Start a repeatable read transaction. + connection.setAutoCommit(false); + connection.setTransactionIsolation( + Connection.TRANSACTION_REPEATABLE_READ); + + // Retrieve the balance of the sender. + PreparedStatement preparedFetchSenderBalance = + getPreparedFetchSenderBalance(); + preparedFetchSenderBalance.setInt(1, sender); + ResultSet rs = preparedFetchSenderBalance.executeQuery(); + if (!rs.next()) { + throw new SQLException("No row found for account " + sender); + } + int senderBalance = rs.getInt("balance"); + + // If the sender has no money, abort the transaction. + if (senderBalance <= 0) { + if (senderBalance < 0) { + LOG.error(String.format( + "Sender %d has negative balance %d", sender, senderBalance)); + numInconsistentReads.incrementAndGet(); + } + throw new SQLException("Sender has no money"); + } + + // Pick a random amount to transfer [1, sendBalance]. + int amount = ThreadLocalRandom.current().nextInt(1, senderBalance + 1); + + // Decrement the sender's balance. + PreparedStatement preparedDebitSender = getPreparedDebitSender(); + preparedDebitSender.setInt(1, amount); + preparedDebitSender.setInt(2, sender); + preparedDebitSender.executeUpdate(); + + // Increment the receiver's balance. + PreparedStatement preparedCreditReceiver = getPreparedCreditReceiver(); + preparedCreditReceiver.setInt(1, amount); + preparedCreditReceiver.setInt(2, receiver); + preparedCreditReceiver.executeUpdate(); + + // Commit the transaction. + connection.commit(); + + // Transfer successful. + return 1; + } catch (Exception e) { + try { + connection.rollback(); + } catch (Exception e1) { + LOG.fatal("Error rolling back transaction ", e1); + } + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) { + LOG.error("Error transferring money ", e); + } + return 0; + } + } + + /* + * Appends the number of inconsistent reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Inconsistent reads: ").append( + numInconsistentReads.get()).append(" total reads | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/SqlConsistentHashing.java b/src/main/java/com/yugabyte/sample/apps/SqlConsistentHashing.java new file mode 100644 index 0000000..2753904 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlConsistentHashing.java @@ -0,0 +1,213 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.Statement; + +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * Consistent hashing is useful when you have a dynamic set of nodes and + * you need to send a key-value request to one of the nodes. Consistent + * hashing is great at load balancing without moving too many keys when + * nodes are added or removed. + * + * This app maintains a list of hashes one for each "virtual" node and + * supports two operations: + * a. Config change: Add or remove a node. + * b. Get node: Get the node for a given key. + * + * Config Change Operation: + * 1. At coin flip, choose whether to add or remove a node. + * 2. If adding a node, add a node with a random hash. + * 3. If removing a node, remove a random node. + * + * Get Node Operation: + * 1. Pick a random key. + * 2. Find the node with the smallest hash greater than the key. + * If no such node exists, return the smallest hash node. + */ +public class SqlConsistentHashing extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlConsistentHashing.class); + + // Static initialization of this app's config. + static { + // Use 10 Get Node threads and 10 Config Change threads. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 10; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "consistent_hashing"; + + // Initial number of nodes. + private static final int INITIAL_NODES = 100000; + + // Cache connection and statements. + private Connection connection = null; + private PreparedStatement preparedAddNode = null; + private PreparedStatement preparedRemoveNode = null; + private PreparedStatement preparedGetNode = null; + + public Connection getConnection() { + if (connection == null) { + try { + connection = getPostgresConnection(); + } catch (Exception e) { + LOG.fatal("Failed to create a connection ", e); + } + } + return connection; + } + + public PreparedStatement prepareAddNode() { + if (preparedAddNode == null) { + try { + preparedAddNode = getConnection().prepareStatement( + String.format("INSERT INTO %s (node_hash) VALUES (?)", getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare add node statement ", e); + } + } + return preparedAddNode; + } + + public PreparedStatement prepareRemoveNode() { + if (preparedRemoveNode == null) { + try { + preparedRemoveNode = getConnection().prepareStatement( + String.format("DELETE FROM %s WHERE node_hash =" + + " (SELECT node_hash FROM %s ORDER BY RANDOM() LIMIT 1)", + getTableName(), getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare remove node statement ", e); + } + } + return preparedRemoveNode; + } + + public PreparedStatement prepareGetNode() { + if (preparedGetNode == null) { + try { + preparedGetNode = getConnection().prepareStatement( + String.format("SELECT COALESCE(" + + " (SELECT MIN(node_hash) FROM %s WHERE node_hash > ?)," + + " (SELECT MIN(node_hash) FROM %s)" + + ")", + getTableName(), getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare get node statement ", e); + } + } + return preparedGetNode; + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (node_hash INT) SPLIT INTO 24 TABLETS", + getTableName())); + LOG.info("Created table " + getTableName()); + connection.createStatement().execute(String.format( + "INSERT INTO %s" + + " SELECT (RANDOM() * 1000000000)::INT" + + " FROM generate_series(1, %d)", + getTableName(), INITIAL_NODES)); + LOG.info("Inserted " + INITIAL_NODES + " nodes into " + getTableName()); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + @Override + public long doRead() { + int key = ThreadLocalRandom.current().nextInt(); + try { + PreparedStatement preparedGetNode = prepareGetNode(); + preparedGetNode.setInt(1, key); + preparedGetNode.executeQuery(); + return 1; + } catch (Exception e) { + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) { + LOG.error("Error retrieving node uuid", e); + } + return 0; + } + } + + @Override + public long doWrite(int threadIdx) { + int coinFlip = ThreadLocalRandom.current().nextInt(2); + if (coinFlip == 0) { + return addNode(); + } else { + return removeNode(); + } + } + + public long addNode() { + try { + int nodeHash = ThreadLocalRandom.current().nextInt(); + PreparedStatement preparedAddNode = prepareAddNode(); + preparedAddNode.setInt(1, nodeHash); + preparedAddNode.executeUpdate(); + return 1; + } catch (Exception e) { + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) { + LOG.error("Error adding a node " + e); + } + return 0; + } + } + + public long removeNode() { + try { + PreparedStatement preparedRemoveNode = prepareRemoveNode(); + preparedRemoveNode.executeUpdate(); + return 1; + } catch (Exception e) { + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) { + LOG.error("Error removing a node " + e); + } + return 0; + } + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/SqlEventCounter.java b/src/main/java/com/yugabyte/sample/apps/SqlEventCounter.java new file mode 100644 index 0000000..a04a196 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlEventCounter.java @@ -0,0 +1,241 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * Sometimes, applications want to track the number of times a particular event + * has occurred. Examples include user actions like clicks, purchases or + * page views. + * + * This app helps understand whether the new clockbound clock + * helps improve the performance of this workload. + * + * Database Configuration: + * configure with wallclock and compare the metrics with + * a clockbound clock configuration. + * Not much variance is expected in the metrics. + * + * Setup: + * 1. Create a counters TABLE with columns (event INT, counter INT). + * 2. Insert 1000 counters with event 0 to 999 initialized to zero. + * + * Worklaod: + * We only run write threads, no read threads. + * Each write thread, + * 1. Starts a repeatable read transaction. + * 2. Reads the counter of a random event. + * 3. Verifies that the counter is not stale. + * 4. Increments the counter for the picked event. + * 5. Commits the transaction. + * 6. Updates the latest counter value in the cache. + */ +public class SqlEventCounter extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlEventCounter.class); + + // Static initialization of this app's config. + static { + // Only use 10 writer threads to avoid overloading the system. + // In real life, there are many more threads but there are other + // things to do too. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 0; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "event_counters"; + + // The number of unique events to track. + private static final int NUM_EVENTS = 1000; + + // Contains the latest updated counter indexed by event. + private static final AtomicIntegerArray counters = new AtomicIntegerArray(NUM_EVENTS); + + // Shared counter to store the number of stale reads. + private static final AtomicLong numStaleReads = new AtomicLong(0); + + // Cache connection and statements. + private Connection connection = null; + private PreparedStatement preparedCounterFetch = null; + private PreparedStatement preparedCounterIncrement = null; + + public Connection getConnection() { + if (connection == null) { + try { + connection = getPostgresConnection(); + } catch (Exception e) { + LOG.fatal("Failed to create a connection ", e); + } + } + return connection; + } + + public PreparedStatement getPreparedCounterFetch() { + if (preparedCounterFetch == null) { + try { + preparedCounterFetch = getConnection().prepareStatement( + String.format("SELECT counter FROM %s WHERE event = ?", + getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare statement: SELECT counter FROM " + getTableName(), e); + } + } + return preparedCounterFetch; + } + + public PreparedStatement getPreparedCounterIncrement() { + if (preparedCounterIncrement == null) { + try { + preparedCounterIncrement = getConnection().prepareStatement( + String.format("UPDATE %s SET counter = ? WHERE event = ?", + getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare statement: UPDATE " + getTableName(), e); + } + } + return preparedCounterIncrement; + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (event INT, counter INT)", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + int numRows = connection.createStatement().executeUpdate(String.format( + "INSERT INTO %s SELECT GENERATE_SERIES(0, %d-1), 0", + getTableName(), NUM_EVENTS)); + LOG.info(String.format( + "Inserted %d rows into %s", numRows, getTableName())); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + @Override + public long doWrite(int threadIdx) { + // Choose a random event to increment. + int event = ThreadLocalRandom.current().nextInt(NUM_EVENTS); + Connection connection = getConnection(); + + try { + // Start a repeatable read transaction. + connection.setAutoCommit(false); + connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + + // Retrieve the latest counter from the cache. + int cachedCounter = counters.get(event); + + // Fetch the current counter value for the event. + PreparedStatement preparedCounterFetch = getPreparedCounterFetch(); + preparedCounterFetch.setInt(1, event); + ResultSet rs = preparedCounterFetch.executeQuery(); + if (!rs.next()) { + throw new SQLException("No row found for event " + event); + } + int counter = rs.getInt("counter"); + + // Increment the counter for the event. + counter += 1; + PreparedStatement preparedCounterIncrement = + getPreparedCounterIncrement(); + preparedCounterIncrement.setInt(1, counter); + preparedCounterIncrement.setInt(2, event); + preparedCounterIncrement.executeUpdate(); + + // Commit the transaction. + connection.commit(); + + // Detect a stale read. + // Fetched counter after increment must be greater + // than the cached counter. Otherwise, the read is stale. + if (!(counter > cachedCounter)) { + numStaleReads.incrementAndGet(); + } + + // Update the counter cache as well. + // + // counters tracks the maximum observed counter for each event. + // This helps detect stale reads. + // The new counter may be the new maximum. + // In this case, update the cache. + // + // If the cached counter is higher than or equal to the + // new counter, the new counter is no longer the maximum. Skip. + // + // If the cached counter is lower than the new counter, + // we update the cache to the new counter. Do this + // only if the cache is still at the old value. Otherwise, + // fetch the new cached value and try again. + // This avoids overwriting a higher cached value with counter. + while (cachedCounter < counter && !counters.compareAndSet( + event, cachedCounter, counter)) { + cachedCounter = counters.get(event); + } + + // Counter incremented successfully. + return 1; + } catch (Exception e) { + try { + connection.rollback(); + } catch (SQLException e1) { + LOG.fatal("Failed to rollback transaction", e1); + } + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) { + LOG.error("Failed to increment the counter for event " + event, e); + } + return 0; + } + } + + /* + * Appends the number of stale reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Stale reads: ").append(numStaleReads.get()).append(" total reads | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/SqlMessageQueue.java b/src/main/java/com/yugabyte/sample/apps/SqlMessageQueue.java new file mode 100644 index 0000000..0a432c7 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlMessageQueue.java @@ -0,0 +1,254 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.log4j.Logger; + +/* + * Message queue is a common usecase in distributed systems. One such + * application disseminates market data mesages to subscribers. + * + * In this app, we simulate a message queue where each message has ticker + * and stock level information. Subscribers subscribe to a ticker and + * accumulate the total stock. Subscribers read messages starting from + * when they last read a message. + * + * This app verifies that the total stock level accumulated by a subscriber + * is at least the stock level published by the publisher. Such a read + * pattern is prone to read restarts because of the constant upates from + * the publisher. + * + * This app helps understand whether the new clockbound clock helps + * with this workload. + * + * Setup: + * 1. Create a message_queue TABLE with columns + * (ticker INT, sequence_number INT, stock_level INT). + * + * Workload: + * There are two operations in this workload. + * a. Publisher: Publishes messages with a random ticker and stock level. + * b. Subscriber: Subscribes to a ticker and accumulates the total stock + * of a random ticker starting from where it left off previously. + * + * Publisher Operation: + * 1. Pick a random ticker. + * 2. Pick a random stock level between 1 and 10. + * 3. Insert the message into the message_queue TABLE with the ticker, + * sequence number and stock level. Sequence number is the max sequence + * number for the ticker + 1 or else 1. + * 4. Increment an atomic structure with the total stock level published. + * + * Subscriber Operation: + * 1. Pick a random ticker. + * 2. Fetch the previous sequence number for the ticker. + * 3. Fetch the stock level view of the publisher. + * 4. Accumulate stock levels for the ticker starting from the previous + * sequence number into an atomic array. + * 5. Verify that the total stock level accumulated is at least the + * stock level published by the publisher. + */ +public class SqlMessageQueue extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlEventCounter.class); + + // Static initialization of this app's config. + static { + // Use 1 Subscriber thread and 10 Publisher threads. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 1; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "message_queue"; + + // The number of tickers registered with the stock exchange. + private static final int NUM_TICKERS = 1000; + + // The maximum stock level for a ticker published in a single message. + private static final int MAX_STOCK_LEVEL = 10; + + // Updated by publishers whenever they publish new stock. + private static final AtomicLongArray totalStockPublished = + new AtomicLongArray(NUM_TICKERS); + + // The last sequence number for each ticker as seen by subscribers. + private static final AtomicLongArray lastSequenceNumber = + new AtomicLongArray(NUM_TICKERS); + + // Accumulated stock levels for each ticker as seen by subscribers. + private static final AtomicLongArray totalStockAccumulated = + new AtomicLongArray(NUM_TICKERS); + + // Number of stale reads. + private static final AtomicLong numStaleReads = new AtomicLong(0); + + // Cache connection and statements. + private Connection connection = null; + private PreparedStatement preparedSubscriber = null; + private PreparedStatement preparedPublisher = null; + + public Connection getConnection() { + if (connection == null) { + try { + connection = getPostgresConnection(); + } catch (Exception e) { + LOG.fatal("Failed to create a connection ", e); + } + } + return connection; + } + + public PreparedStatement getPreparedSubscriber() { + if (preparedSubscriber == null) { + try { + preparedSubscriber = getConnection().prepareStatement( + String.format("SELECT MAX(sequence_number), SUM(stock_level)" + + " FROM %s WHERE ticker = ? AND sequence_number > ?", + getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare statement: SELECT MAX(sequence_number), SUM(stock_level)" + + " FROM " + getTableName(), e); + } + } + return preparedSubscriber; + } + + public PreparedStatement getPreparedPublisher() { + if (preparedPublisher == null) { + try { + preparedPublisher = getConnection().prepareStatement( + String.format("INSERT INTO %s (ticker, sequence_number, stock_level)" + + " SELECT ?, COALESCE(MAX(sequence_number), 0) + 1, ?" + + " FROM %s WHERE ticker = ?", + getTableName(), getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare statement: INSERT INTO " + getTableName(), e); + } + } + return preparedPublisher; + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (ticker INT, sequence_number INT, stock_level INT)" + + " SPLIT INTO 24 TABLETS", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + @Override + public long doRead() { + int ticker = ThreadLocalRandom.current().nextInt(NUM_TICKERS); + + try { + long prevSequenceNumber = lastSequenceNumber.get(ticker); + long stockLevelPublished = totalStockPublished.get(ticker); + + PreparedStatement preparedSubcriber = getPreparedSubscriber(); + preparedSubcriber.setInt(1, ticker); + preparedSubcriber.setLong(2, prevSequenceNumber); + ResultSet resultSet = preparedSubcriber.executeQuery(); + if (!resultSet.next()) { + LOG.info("No new entries for ticker " + ticker); + return 0; + } + long maxSequenceNumber = resultSet.getLong(1); + long sumStockLevel = resultSet.getLong(2); + + if (lastSequenceNumber.compareAndSet( + ticker, prevSequenceNumber, maxSequenceNumber)) { + // Accumulate the stock level. + long accumulatedStockLevel = + totalStockAccumulated.addAndGet(ticker, sumStockLevel); + // For some reason, this hasn't caught up with the publisher. + if (accumulatedStockLevel < stockLevelPublished) { + LOG.error("Stale stock level for ticker " + ticker); + numStaleReads.incrementAndGet(); + } + return 1; + } + + // Someone else did the work, do not add to the total. + return 0; + } catch (Exception e) { + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) + LOG.error("Error executing read query", e); + return 0; + } + } + + @Override + public long doWrite(int threadIdx) { + int ticker = ThreadLocalRandom.current().nextInt(NUM_TICKERS); + int stockLevel = ThreadLocalRandom.current().nextInt(MAX_STOCK_LEVEL) + 1; + + try { + PreparedStatement preparedPublisher = getPreparedPublisher(); + preparedPublisher.setInt(1, ticker); + preparedPublisher.setInt(2, stockLevel); + preparedPublisher.setInt(3, ticker); + preparedPublisher.executeUpdate(); + totalStockPublished.addAndGet(ticker, stockLevel); + return 1; + } catch (Exception e) { + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) { + LOG.error("Error publishing message ", e); + } + return 0; + } + } + + /* + * Appends the number of stale reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Stale reads: ").append(numStaleReads.get()).append(" total reads | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/SqlWarehouseStock.java b/src/main/java/com/yugabyte/sample/apps/SqlWarehouseStock.java new file mode 100644 index 0000000..23f3888 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlWarehouseStock.java @@ -0,0 +1,228 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * E-commerce is another important usecase for OLTP databases. TPC-C is an + * example benchmark that simulates a simple e-commerce workload. + * + * Simulate restocking items in a warehouse. Customers + * continously place orders that deplete the stock of items. The warehouse + * restocks items whose stock has fallen below 10 items by adding a + * 100 items. The warehouse has 100 items of each type initially. + * There are 1000 types of items. + * + * The restocking operation does a full table scan to find the items that + * have a low stock. This leads to read restart errors. This app helps + * understand whether the new clockbound clock helps improve the + * performance of this workload. + * + * Database Configuration: + * configure with wallclock and compare the metrics with + * a clockbound clock configuration. + * + * Setup: + * 1. Create a warehouse_stock TABLE with columns (item_id INT, stock INT). + * 2. Insert 100 items with item_id 0 to 999 initialized to 100 stock. + * + * Workload: + * There are two operations in this workload. + * a. NewOrder: Decrement the stock of a random item. + * b. Restock: Restocks items whose stock has fallen below 10 items by + * adding 100 items. + * + * NewOrder Operation: + * 1. Pick a random item_id. + * 2. Decrement the stock of the item only when there is enough stock. + * + * Restock Operation: + * 1. Scan the table, restock by 100 when below 10. + */ +public class SqlWarehouseStock extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlWarehouseStock.class); + + // Static initialization of this app's config. + static { + // Use 1 Restock thread and 100 NewOrder threads. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 1; + appConfig.numWriterThreads = 100; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "warehouse_stock"; + + // The number of items in the warehouse. + private static final int NUM_ITEMS = 1000; + + // The stock level below which restocking is needed. + private static final int RESTOCK_THRESHOLD = 10; + + // The amount to restock by. + private static final int RESTOCK_AMOUNT = 100; + + // Initial stock. + private static final int INITIAL_STOCK = 100; + + // Shared counter to store the number of restocks required. + private static final AtomicLong numRestocksRequired = new AtomicLong(0); + + // Shared counter to store the number of stale reads. + private static final AtomicLong numStaleReads = new AtomicLong(0); + + // Cache connection and statements. + private Connection connection = null; + private PreparedStatement preparedRestock = null; + private PreparedStatement preparedNewOrder = null; + + public Connection getConnection() { + if (connection == null) { + try { + connection = getPostgresConnection(); + } catch (Exception e) { + LOG.fatal("Failed to create a connection ", e); + } + } + return connection; + } + + public PreparedStatement getPreparedRestock() { + if (preparedRestock == null) { + try { + preparedRestock = getConnection().prepareStatement( + String.format("UPDATE %s SET stock = stock + %d WHERE stock < %d", + getTableName(), RESTOCK_AMOUNT, RESTOCK_THRESHOLD)); + } catch (Exception e) { + LOG.fatal("Failed to prepare statement: UPDATE " + getTableName(), e); + } + } + return preparedRestock; + } + + public PreparedStatement getPreparedNewOrder() { + if (preparedNewOrder == null) { + try { + preparedNewOrder = getConnection().prepareStatement( + String.format("UPDATE %s SET stock = stock - 1" + + " WHERE item_id = ? AND stock > 0" + + " RETURNING stock", + getTableName())); + } catch (Exception e) { + LOG.fatal("Failed to prepare statement: UPDATE " + getTableName(), e); + } + } + return preparedNewOrder; + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (item_id INT PRIMARY KEY, stock INT)" + + " SPLIT INTO 24 TABLETS", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + int numRows = connection.createStatement().executeUpdate(String.format( + "INSERT INTO %s SELECT GENERATE_SERIES(0, %d-1), %d", + getTableName(), NUM_ITEMS, INITIAL_STOCK)); + LOG.info(String.format( + "Inserted %d rows into %s", numRows, getTableName())); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + // Executes the Restock operation. + @Override + public long doRead() { + try { + long restocksRequired = numRestocksRequired.get(); + PreparedStatement preparedRestock = getPreparedRestock(); + int numRestocked = preparedRestock.executeUpdate(); + if (numRestocked < restocksRequired) { + numStaleReads.incrementAndGet(); + } + numRestocksRequired.addAndGet(-numRestocked); + return 1; + } catch (Exception e) { + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) { + LOG.error("Error restocking ", e); + } + return 0; + } + } + + // Executes a NewOrder operation. + @Override + public long doWrite(int threadIdx) { + try { + int itemId = ThreadLocalRandom.current().nextInt(NUM_ITEMS); + PreparedStatement preparedNewOrder = getPreparedNewOrder(); + preparedNewOrder.setInt(1, itemId); + ResultSet rs = preparedNewOrder.executeQuery(); + if (!rs.next()) { + // No rows updated, return 0. + return 0; + } + int stock = rs.getInt(1); + if (stock < RESTOCK_THRESHOLD) { + numRestocksRequired.incrementAndGet(); + } + return 1; + } catch (Exception e) { + // Suppress this error for readability. + if (!e.getMessage().contains("Restart read required")) { + LOG.error("Error creating a new order ", e); + } + return 0; + } + } + + /* + * Appends the number of stale reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Stale reads: ").append(numStaleReads.get()).append(" total reads | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java b/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java index 9960a83..c90d033 100644 --- a/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java +++ b/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java @@ -34,7 +34,7 @@ public PromMetrics(List nodes) throws IOException { promContactPoints = new ArrayList<>(); for (InetSocketAddress node : nodes) { promContactPoints.add(String.format( - "https://%s:9000/prometheus-metrics", node.getHostString())); + "http://%s:9000/prometheus-metrics", node.getHostString())); } disableSSLVerification(); }