diff --git a/src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java b/src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java new file mode 100644 index 0000000..abf6ac7 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java @@ -0,0 +1,245 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * Money transfers across bank accounts is a common usecase for a OLTP + * database. Transfers are a commonly used example for discussing + * transactions in databases because of its strong requirements on + * consistency guarantees. + * + * Simulate money transfers. The most important constraint here + * is that the total amount of money across all accounts should remain + * invariant. However, aggregating money across all accounts involves + * a full table scan and this exposes the query to read restarts. + * + * This app helps understand whether the new clockbound clock + * helps improve the performance of this workload. + * + * Database Configuration: + * configure with wallclock and compare the metrics with + * a clockbound clock configuration. + * + * Setup: + * 1. Create a bank_accounts TABLE with columns (account_id INT, balance INT). + * 2. Insert 1000 accounts with account_id 0 to 999 initialized to 1000. + * + * Workload: + * There are two main operations in this workload: + * a. Transfer: Transfers a random amount money from one account to another. + * The amount must be <= the balance of the source account. + * b. Verify: Verifies that the total amount of money across all accounts + * is 1000 * 1000. + * + * Transfer Operation: + * 1. Pick a sender and a receiver pair at random (they must be different). + * 2. Start a repeatable read transaction. + * 3. Query the account balance of the sender. + * 4. If the balance is zero, abort the transaction. + * 5. Pick a random amount [1, balance]. + * 6. Decrement the balance of the sender by the amount. + * 7. Increment the balance of the receiver by the amount. + * 8. Commit the transaction. + * + * Verify Operation: + * 1. Sum the balances of all accounts. + * 2. Verify that the sum is 1000 * 1000. + */ +public class SqlBankTransfers extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlBankTransfers.class); + + // Static initialization of this app's config. + static { + // Use 1 Verify thread and 10 Transfer threads. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 1; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "bank_accounts"; + + // The number of accounts in the bank. + private static final int NUM_ACCOUNTS = 1000; + + // Initial balance of each account. + private static final int INIT_BALANCE = 1000; + + // Shared counter to store the number of inconsistent reads. + private static final AtomicLong numInconsistentReads = new AtomicLong(0); + + // Connection cache, one per thread. + private static final HashMap connections = new HashMap<>(); + + public Connection getConnection() { + Long id = Thread.currentThread().getId(); + if (!connections.containsKey(id)) { + try { + connections.put(id, getPostgresConnection()); + } catch (Exception e) { + LOG.fatal("Failed to create a postgres connection ", e); + } + } + + return connections.get(id); + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (account_id INT, balance INT)", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + int numRows = connection.createStatement().executeUpdate(String.format( + "INSERT INTO %s SELECT GENERATE_SERIES(0, %d-1), %d", + getTableName(), NUM_ACCOUNTS, INIT_BALANCE)); + LOG.info(String.format( + "Inserted %d rows into %s", numRows, getTableName())); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + // Executes the Verify operation. + @Override + public long doRead() { + Connection connection = getConnection(); + try (Statement statement = connection.createStatement()) { + try { + ResultSet resultSet = statement.executeQuery(String.format( + "SELECT SUM(balance) FROM %s", getTableName())); + if (!resultSet.next()) { + throw new SQLException("No rows returned from sum query"); + } + int totalBalance = resultSet.getInt(1); + + // verify total balance. + if (totalBalance != NUM_ACCOUNTS * INIT_BALANCE) { + LOG.error(String.format("Total balance is %d", totalBalance)); + numInconsistentReads.incrementAndGet(); + } + } catch (Exception e) { + LOG.error("Error verifying balances ", e); + } + } catch (Exception e) { + LOG.error("Error creating a connection ", e); + } + return 1; + } + + // Executes the Transfer operation. + @Override + public long doWrite(int threadIdx) { + // Pick two random distinct accounts. + int sender = ThreadLocalRandom.current().nextInt(NUM_ACCOUNTS); + int receiver; + do { + receiver = ThreadLocalRandom.current().nextInt(NUM_ACCOUNTS); + } while (receiver == sender); + + Connection connection = getConnection(); + try (Statement statement = connection.createStatement()) { + // Start a repeatable read transaction. + connection.setAutoCommit(false); + connection.setTransactionIsolation( + Connection.TRANSACTION_REPEATABLE_READ); + try { + // Retrieve the balance of the sender. + ResultSet rs = statement.executeQuery(String.format( + "SELECT balance FROM %s WHERE account_id = %d", + getTableName(), sender)); + if (!rs.next()) { + throw new SQLException("No row found for account " + sender); + } + int senderBalance = rs.getInt("balance"); + + // If the sender has no money, abort the transaction. + if (senderBalance <= 0) { + if (senderBalance < 0) { + LOG.error(String.format( + "Sender %d has negative balance %d", sender, senderBalance)); + numInconsistentReads.incrementAndGet(); + } + throw new SQLException("Sender has no money"); + } + + // Pick a random amount to transfer [1, sendBalance]. + int amount = ThreadLocalRandom.current().nextInt(1, senderBalance + 1); + + // Decrement the sender's balance. + statement.executeUpdate(String.format( + "UPDATE %s SET balance = balance - %d WHERE account_id = %d", + getTableName(), amount, sender)); + + // Increment the receiver's balance. + statement.executeUpdate(String.format( + "UPDATE %s SET balance = balance + %d WHERE account_id = %d", + getTableName(), amount, receiver)); + + // Commit the transaction. + connection.commit(); + + // Transfer successful. + return 1; + } catch (Exception e) { + LOG.error("Error transferring money ", e); + connection.rollback(); + return 0; + } + } catch (Exception e) { + LOG.error("Error creating a connection ", e); + return 0; + } + } + + /* + * Appends the number of inconsistent reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Inconsistent reads: ").append( + numInconsistentReads.get()).append(" total reads | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/SqlConsistentHashing.java b/src/main/java/com/yugabyte/sample/apps/SqlConsistentHashing.java new file mode 100644 index 0000000..ddab4a2 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlConsistentHashing.java @@ -0,0 +1,178 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.Statement; + +import java.util.HashMap; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * Consistent hashing is useful when you have a dynamic set of nodes and + * you need to send a key-value request to one of the nodes. Consistent + * hashing is great at load balancing without moving too many keys when + * nodes are added or removed. + * + * This app maintains a list of hashes one for each "virtual" node and + * supports two operations: + * a. Config change: Add or remove a node. + * b. Get node: Get the node for a given key. + * + * Config Change Operation: + * 1. At coin flip, choose whether to add or remove a node. + * 2. If adding a node, add a node with a random hash. + * 3. If removing a node, remove a random node. + * + * Get Node Operation: + * 1. Pick a random key. + * 2. Find the node with the smallest hash greater than the key. + * If no such node exists, return the smallest hash node. + */ +public class SqlConsistentHashing extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlConsistentHashing.class); + + // Static initialization of this app's config. + static { + // Use 10 Get Node threads and 10 Config Change threads. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 10; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "consistent_hashing"; + + // Initial number of nodes. + private static final int INITIAL_NODES = 100000; + + // Connection cache, one per thread. + private static final HashMap connections = new HashMap<>(); + + public Connection getConnection() { + Long id = Thread.currentThread().getId(); + if (!connections.containsKey(id)) { + try { + connections.put(id, getPostgresConnection()); + } catch (Exception e) { + LOG.fatal("Failed to create a postgres connection ", e); + } + } + + return connections.get(id); + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (node_hash INT) SPLIT INTO 24 TABLETS", + getTableName())); + LOG.info("Created table " + getTableName()); + connection.createStatement().execute(String.format( + "INSERT INTO %s" + + " SELECT (RANDOM() * 1000000000)::INT" + + " FROM generate_series(1, %d)", + getTableName(), INITIAL_NODES)); + LOG.info("Inserted " + INITIAL_NODES + " nodes into " + getTableName()); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + @Override + public long doRead() { + Connection connection = getConnection(); + try (Statement statement = connection.createStatement()) { + int key = ThreadLocalRandom.current().nextInt(); + try { + statement.executeQuery(String.format( + "SELECT COALESCE(" + + " (SELECT MIN(node_hash) FROM %s WHERE node_hash > %d)," + + " (SELECT MIN(node_hash) FROM %s)" + + ")", + getTableName(), key, getTableName())); + return 1; + } catch (Exception e) { + LOG.error("Error retrieving node uuid", e); + return 0; + } + } catch (Exception e) { + LOG.error("Error creating a connection ", e); + return 0; + } + } + + @Override + public long doWrite(int threadIdx) { + Connection connection = getConnection(); + try (Statement statement = connection.createStatement()) { + int coinFlip = ThreadLocalRandom.current().nextInt(2); + if (coinFlip == 0) { + return addNode(statement); + } else { + return removeNode(statement); + } + } catch (Exception e) { + LOG.error("Error creating a connection ", e); + return 0; + } + } + + public long addNode(Statement statement) { + try { + int nodeHash = ThreadLocalRandom.current().nextInt(); + statement.executeUpdate(String.format( + "INSERT INTO %s (node_hash) VALUES (%d)", + getTableName(), nodeHash)); + return 1; + } catch (Exception e) { + LOG.error("Error adding a node " + e); + return 0; + } + } + + public long removeNode(Statement statement) { + try { + statement.executeUpdate(String.format( + "DELETE FROM %s WHERE node_hash =" + + " (SELECT node_hash FROM %s ORDER BY RANDOM() LIMIT 1)", + getTableName(), getTableName())); + return 1; + } catch (Exception e) { + LOG.error("Error removing a node " + e); + return 0; + } + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/SqlEventCounter.java b/src/main/java/com/yugabyte/sample/apps/SqlEventCounter.java new file mode 100644 index 0000000..7e035fb --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlEventCounter.java @@ -0,0 +1,212 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * Sometimes, applications want to track the number of times a particular event + * has occurred. Examples include user actions like clicks, purchases or + * page views. + * + * This app helps understand whether the new clockbound clock + * helps improve the performance of this workload. + * + * Database Configuration: + * configure with wallclock and compare the metrics with + * a clockbound clock configuration. + * Not much variance is expected in the metrics. + * + * Setup: + * 1. Create a counters TABLE with columns (event INT, counter INT). + * 2. Insert 1000 counters with event 0 to 999 initialized to zero. + * + * Worklaod: + * We only run write threads, no read threads. + * Each write thread, + * 1. Starts a repeatable read transaction. + * 2. Reads the counter of a random event. + * 3. Verifies that the counter is not stale. + * 4. Increments the counter for the picked event. + * 5. Commits the transaction. + * 6. Updates the latest counter value in the cache. + */ +public class SqlEventCounter extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlEventCounter.class); + + // Static initialization of this app's config. + static { + // Only use 10 writer threads to avoid overloading the system. + // In real life, there are many more threads but there are other + // things to do too. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 0; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "event_counters"; + + // The number of unique events to track. + private static final int NUM_EVENTS = 1000; + + // Contains the latest updated counter indexed by event. + private static final AtomicIntegerArray counters = new AtomicIntegerArray(NUM_EVENTS); + + // Shared counter to store the number of stale reads. + private static final AtomicLong numStaleReads = new AtomicLong(0); + + // Connection cache, one per thread. + private static final HashMap connections = new HashMap<>(); + + public Connection getConnection() { + Long id = Thread.currentThread().getId(); + if (!connections.containsKey(id)) { + try { + connections.put(id, getPostgresConnection()); + } catch (Exception e) { + LOG.fatal("Failed to create a postgres connection ", e); + } + } + + return connections.get(id); + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (event INT, counter INT)", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + int numRows = connection.createStatement().executeUpdate(String.format( + "INSERT INTO %s SELECT GENERATE_SERIES(0, %d-1), 0", + getTableName(), NUM_EVENTS)); + LOG.info(String.format( + "Inserted %d rows into %s", numRows, getTableName())); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + @Override + public long doWrite(int threadIdx) { + // Choose a random event to increment. + int event = ThreadLocalRandom.current().nextInt(NUM_EVENTS); + Connection connection = getConnection(); + + try (Statement statement = connection.createStatement()) { + try { + // Start a repeatable read transaction. + connection.setAutoCommit(false); + connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + + // Retrieve the latest counter from the cache. + int cachedCounter = counters.get(event); + + // Fetch the current counter value for the event. + ResultSet rs = statement.executeQuery(String.format( + "SELECT counter FROM %s WHERE event = %d", + getTableName(), event)); + if (!rs.next()) { + throw new SQLException("No row found for event " + event); + } + int counter = rs.getInt("counter"); + + // Increment the counter for the event. + counter += 1; + statement.executeUpdate(String.format( + "UPDATE %s SET counter = %d WHERE event = %d", + getTableName(), counter, event)); + + // Commit the transaction. + connection.commit(); + + // Detect a stale read. + // Fetched counter after increment must be greater + // than the cached counter. Otherwise, the read is stale. + if (!(counter > cachedCounter)) { + numStaleReads.incrementAndGet(); + } + + // Update the counter cache as well. + // + // counters tracks the maximum observed counter for each event. + // This helps detect stale reads. + // The new counter may be the new maximum. + // In this case, update the cache. + // + // If the cached counter is higher than or equal to the + // new counter, the new counter is no longer the maximum. Skip. + // + // If the cached counter is lower than the new counter, + // we update the cache to the new counter. Do this + // only if the cache is still at the old value. Otherwise, + // fetch the new cached value and try again. + // This avoids overwriting a higher cached value with counter. + while (cachedCounter < counter && !counters.compareAndSet( + event, cachedCounter, counter)) { + cachedCounter = counters.get(event); + } + + // Counter incremented successfully. + return 1; + } catch (Exception e) { + LOG.error("Failed to increment the counter for event " + event, e); + connection.rollback(); + return 0; + } + } catch (Exception e) { + LOG.error("Failed to create a connection ", e); + return 0; + } + } + + /* + * Appends the number of stale reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Stale reads: ").append(numStaleReads.get()).append(" total reads | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/SqlMessageQueue.java b/src/main/java/com/yugabyte/sample/apps/SqlMessageQueue.java new file mode 100644 index 0000000..78d63ac --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlMessageQueue.java @@ -0,0 +1,232 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +import java.util.HashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.log4j.Logger; + +/* + * Message queue is a common usecase in distributed systems. One such + * application disseminates market data mesages to subscribers. + * + * In this app, we simulate a message queue where each message has ticker + * and stock level information. Subscribers subscribe to a ticker and + * accumulate the total stock. Subscribers read messages starting from + * when they last read a message. + * + * This app verifies that the total stock level accumulated by a subscriber + * is at least the stock level published by the publisher. Such a read + * pattern is prone to read restarts because of the constant upates from + * the publisher. + * + * This app helps understand whether the new clockbound clock helps + * with this workload. + * + * Setup: + * 1. Create a message_queue TABLE with columns + * (ticker INT, sequence_number INT, stock_level INT). + * + * Workload: + * There are two operations in this workload. + * a. Publisher: Publishes messages with a random ticker and stock level. + * b. Subscriber: Subscribes to a ticker and accumulates the total stock + * of a random ticker starting from where it left off previously. + * + * Publisher Operation: + * 1. Pick a random ticker. + * 2. Pick a random stock level between 1 and 10. + * 3. Insert the message into the message_queue TABLE with the ticker, + * sequence number and stock level. Sequence number is the max sequence + * number for the ticker + 1 or else 1. + * 4. Increment an atomic structure with the total stock level published. + * + * Subscriber Operation: + * 1. Pick a random ticker. + * 2. Fetch the previous sequence number for the ticker. + * 3. Fetch the stock level view of the publisher. + * 4. Accumulate stock levels for the ticker starting from the previous + * sequence number into an atomic array. + * 5. Verify that the total stock level accumulated is at least the + * stock level published by the publisher. + */ +public class SqlMessageQueue extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlEventCounter.class); + + // Static initialization of this app's config. + static { + // Use 1 Subscriber thread and 10 Publisher threads. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 1; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "message_queue"; + + // The number of tickers registered with the stock exchange. + private static final int NUM_TICKERS = 1000; + + // The maximum stock level for a ticker published in a single message. + private static final int MAX_STOCK_LEVEL = 10; + + // Updated by publishers whenever they publish new stock. + private static final AtomicLongArray totalStockPublished = + new AtomicLongArray(NUM_TICKERS); + + // The last sequence number for each ticker as seen by subscribers. + private static final AtomicLongArray lastSequenceNumber = + new AtomicLongArray(NUM_TICKERS); + + // Accumulated stock levels for each ticker as seen by subscribers. + private static final AtomicLongArray totalStockAccumulated = + new AtomicLongArray(NUM_TICKERS); + + // Number of stale reads. + private static final AtomicLong numStaleReads = new AtomicLong(0); + + // Connection cache, one per thread. + private static final HashMap connections = new HashMap<>(); + + public Connection getConnection() { + Long id = Thread.currentThread().getId(); + if (!connections.containsKey(id)) { + try { + connections.put(id, getPostgresConnection()); + } catch (Exception e) { + LOG.fatal("Failed to create a postgres connection ", e); + } + } + + return connections.get(id); + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (ticker INT, sequence_number INT, stock_level INT)" + + " SPLIT INTO 24 TABLETS", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + @Override + public long doRead() { + int ticker = ThreadLocalRandom.current().nextInt(NUM_TICKERS); + Connection connection = getConnection(); + + try (Statement statement = connection.createStatement()) { + try { + long prevSequenceNumber = lastSequenceNumber.get(ticker); + long stockLevelPublished = totalStockPublished.get(ticker); + + ResultSet resultSet = statement.executeQuery(String.format( + "SELECT MAX(sequence_number), SUM(stock_level)" + + " FROM %s WHERE ticker = %d AND sequence_number > %d", + getTableName(), ticker, prevSequenceNumber)); + if (!resultSet.next()) { + LOG.info("No new entries for ticker " + ticker); + return 0; + } + long maxSequenceNumber = resultSet.getLong(1); + long sumStockLevel = resultSet.getLong(2); + + if (lastSequenceNumber.compareAndSet( + ticker, prevSequenceNumber, maxSequenceNumber)) { + // Accumulate the stock level. + long accumulatedStockLevel = + totalStockAccumulated.addAndGet(ticker, sumStockLevel); + // For some reason, this hasn't caught up with the publisher. + if (accumulatedStockLevel < stockLevelPublished) { + LOG.error("Stale stock level for ticker " + ticker); + numStaleReads.incrementAndGet(); + } + return 1; + } + + // Someone else did the work, do not add to the total. + return 0; + } catch (Exception e) { + LOG.error("Error executing read query", e); + return 0; + } + } catch (Exception e) { + LOG.error("Error creating connection ", e); + return 0; + } + } + + @Override + public long doWrite(int threadIdx) { + int ticker = ThreadLocalRandom.current().nextInt(NUM_TICKERS); + int stockLevel = ThreadLocalRandom.current().nextInt(MAX_STOCK_LEVEL) + 1; + Connection connection = getConnection(); + + try (Statement statement = connection.createStatement()) { + try { + statement.executeUpdate(String.format( + "INSERT INTO %s (ticker, sequence_number, stock_level)" + + " SELECT %d, COALESCE(MAX(sequence_number), 0) + 1, %d" + + " FROM %s WHERE ticker = %d", + getTableName(), ticker, stockLevel, getTableName(), ticker)); + totalStockPublished.addAndGet(ticker, stockLevel); + return 1; + } catch (Exception e) { + LOG.error("Error executing write query", e); + return 0; + } + } catch (Exception e) { + LOG.error("Error creating connection ", e); + return 0; + } + } + + /* + * Appends the number of stale reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Stale reads: ").append(numStaleReads.get()).append(" total reads | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/SqlWarehouseStock.java b/src/main/java/com/yugabyte/sample/apps/SqlWarehouseStock.java new file mode 100644 index 0000000..44a300c --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlWarehouseStock.java @@ -0,0 +1,200 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * E-commerce is another important usecase for OLTP databases. TPC-C is an + * example benchmark that simulates a simple e-commerce workload. + * + * Simulate restocking items in a warehouse. Customers + * continously place orders that deplete the stock of items. The warehouse + * restocks items whose stock has fallen below 10 items by adding a + * 100 items. The warehouse has 100 items of each type initially. + * There are 1000 types of items. + * + * The restocking operation does a full table scan to find the items that + * have a low stock. This leads to read restart errors. This app helps + * understand whether the new clockbound clock helps improve the + * performance of this workload. + * + * Database Configuration: + * configure with wallclock and compare the metrics with + * a clockbound clock configuration. + * + * Setup: + * 1. Create a warehouse_stock TABLE with columns (item_id INT, stock INT). + * 2. Insert 100 items with item_id 0 to 999 initialized to 100 stock. + * + * Workload: + * There are two operations in this workload. + * a. NewOrder: Decrement the stock of a random item. + * b. Restock: Restocks items whose stock has fallen below 10 items by + * adding 100 items. + * + * NewOrder Operation: + * 1. Pick a random item_id. + * 2. Decrement the stock of the item only when there is enough stock. + * + * Restock Operation: + * 1. Scan the table, restock by 100 when below 10. + */ +public class SqlWarehouseStock extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlWarehouseStock.class); + + // Static initialization of this app's config. + static { + // Use 1 Restock thread and 100 NewOrder threads. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 1; + appConfig.numWriterThreads = 100; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "warehouse_stock"; + + // The number of items in the warehouse. + private static final int NUM_ITEMS = 1000; + + // The stock level below which restocking is needed. + private static final int RESTOCK_THRESHOLD = 10; + + // The amount to restock by. + private static final int RESTOCK_AMOUNT = 100; + + // Initial stock. + private static final int INITIAL_STOCK = 100; + + // Shared counter to store the number of restocks required. + private static final AtomicLong numRestocksRequired = new AtomicLong(0); + + // Shared counter to store the number of stale reads. + private static final AtomicLong numStaleReads = new AtomicLong(0); + + // Connection cache, one per thread. + private static final HashMap connections = new HashMap<>(); + + public Connection getConnection() { + Long id = Thread.currentThread().getId(); + if (!connections.containsKey(id)) { + try { + connections.put(id, getPostgresConnection()); + } catch (Exception e) { + LOG.fatal("Failed to create a postgres connection ", e); + } + } + + return connections.get(id); + } + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + Connection connection = getConnection(); + // Every run should start cleanly. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (item_id INT PRIMARY KEY, stock INT)" + + " SPLIT INTO 24 TABLETS", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + int numRows = connection.createStatement().executeUpdate(String.format( + "INSERT INTO %s SELECT GENERATE_SERIES(0, %d-1), %d", + getTableName(), NUM_ITEMS, INITIAL_STOCK)); + LOG.info(String.format( + "Inserted %d rows into %s", numRows, getTableName())); + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + // Executes the Restock operation. + @Override + public long doRead() { + Connection connection = getConnection(); + try (Statement statement = connection.createStatement()) { + long restocksRequired = numRestocksRequired.get(); + int numRestocked = statement.executeUpdate(String.format( + "UPDATE %s SET stock = stock + %d WHERE stock < %d", + getTableName(), RESTOCK_AMOUNT, RESTOCK_THRESHOLD)); + if (numRestocked < restocksRequired) { + numStaleReads.incrementAndGet(); + } + numRestocksRequired.addAndGet(-numRestocked); + return 1; + } catch (Exception e) { + LOG.error("Error creating connection ", e); + return 0; + } + } + + // Executes a NewOrder operation. + @Override + public long doWrite(int threadIdx) { + Connection connection = getConnection(); + try (Statement statement = connection.createStatement()) { + int itemId = ThreadLocalRandom.current().nextInt(NUM_ITEMS); + ResultSet rs = statement.executeQuery(String.format( + "UPDATE %s SET stock = stock - 1" + + " WHERE item_id = %d AND stock > 0" + + " RETURNING stock", + getTableName(), itemId)); + if (!rs.next()) { + // No rows updated, return 0. + return 0; + } + int stock = rs.getInt(1); + if (stock < RESTOCK_THRESHOLD) { + numRestocksRequired.incrementAndGet(); + } + return 1; + } catch (Exception e) { + LOG.error("Error creating connection ", e); + return 0; + } + } + + /* + * Appends the number of stale reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Stale reads: ").append(numStaleReads.get()).append(" total reads | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java b/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java index 9960a83..c90d033 100644 --- a/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java +++ b/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java @@ -34,7 +34,7 @@ public PromMetrics(List nodes) throws IOException { promContactPoints = new ArrayList<>(); for (InetSocketAddress node : nodes) { promContactPoints.add(String.format( - "https://%s:9000/prometheus-metrics", node.getHostString())); + "http://%s:9000/prometheus-metrics", node.getHostString())); } disableSSLVerification(); }