diff --git a/README.md b/README.md index 24ea988..540c783 100644 --- a/README.md +++ b/README.md @@ -10,19 +10,19 @@ The workloads here have drivers compatible with the above and emulate a number o ## Running the generator Download the [latest yb-sample-apps](https://github.com/yugabyte/yb-sample-apps/releases/latest) JAR. The command below downloads version 1.4.1. -``` +```console $ wget https://github.com/yugabyte/yb-sample-apps/releases/download/v1.4.1/yb-sample-apps.jar ``` For help, simply run the following: -``` +```console $ java -jar yb-sample-apps.jar --help ``` You should see the set of workloads available in the app. To get details on running any app, just pass the app name as a parameter to the `--help` flag: -``` +```console $ java -jar yb-sample-apps.jar --help CassandraKeyValue 1 [main] INFO com.yugabyte.sample.Main - Starting sample app... Usage and options for workload CassandraKeyValue in YugabyteDB Sample Apps. @@ -57,18 +57,18 @@ You need the following to build: * Maven version 3.3.9 or above To build, simply run the following: -``` +```console $ mvn -DskipTests -DskipDockerBuild package ``` You can find the executable one-jar at the following location: -``` +```console $ ls target/yb-sample-apps.jar target/yb-sample-apps.jar ``` To docker image with the package, simply run the following: -``` +```console $ mvn package ``` diff --git a/src/main/java/com/yugabyte/sample/Main.java b/src/main/java/com/yugabyte/sample/Main.java index e9c17bd..22dbb98 100644 --- a/src/main/java/com/yugabyte/sample/Main.java +++ b/src/main/java/com/yugabyte/sample/Main.java @@ -39,7 +39,7 @@ * |__ Main.java * |__ apps/ * |__ AppBase.java : Base class for all the apps. Has helper methods for creating - * Cassandra and Redis clients. + * YSQL, Cassandra and Redis clients. * |__ AppConfig.java : Configuration for all the apps. * |__ CassandraHelloWorld.java : The simplest app that writes one employee record. Good * starting point to understand how to write a Cassandra app. @@ -51,6 +51,7 @@ * |__ RedisPipelinedKeyValue.java : Similar to RedisKeyValue but uses pipelined mode. * |__ RedisHashPipelined : Similar to RedisPipelinedKeyValue. Uses HMSET/HMGET instead. * |__ RedisYBClientKeyValue.java : Similar to RedisKeyValue but uses YBJedis client. + * |__ SqlStaleReadDetector.java : Simple SQL app that detects any stale reads. * * * Usage @@ -176,14 +177,21 @@ public void run() throws Exception { } // Create the reader and writer threads. + if (AppBase.appConfig.concurrencyDisabled) { + LOG.info("Concurrency is disabled. Executing threads in lock step."); + } int idx = 0; for (; idx < cmdLineOpts.getNumWriterThreads(); idx++) { iopsThreads.add(new IOPSThread(idx, cmdLineOpts.createAppInstance(), - IOType.Write, app.appConfig.printAllExceptions)); + IOType.Write, app.appConfig.printAllExceptions, + app.appConfig.concurrencyDisabled, + app.appConfig.maxWriteThreadThroughput)); } for (; idx < cmdLineOpts.getNumWriterThreads() + cmdLineOpts.getNumReaderThreads(); idx++) { iopsThreads.add(new IOPSThread(idx, cmdLineOpts.createAppInstance(), - IOType.Read, app.appConfig.printAllExceptions)); + IOType.Read, app.appConfig.printAllExceptions, + app.appConfig.concurrencyDisabled, + app.appConfig.maxReadThreadThroughput)); } app.recordExistingRowCount(); @@ -222,7 +230,9 @@ private void setupForPureReads() { LOG.info("Using " + num_writers + " writer threads for pure read setup."); for (int idx = 0; idx < num_writers; idx++) { writeThreads.add(new IOPSThread(idx, cmdLineOpts.createAppInstance(false), - IOType.Write, app.appConfig.printAllExceptions)); + IOType.Write, app.appConfig.printAllExceptions, + app.appConfig.concurrencyDisabled, + app.appConfig.maxWriteThreadThroughput)); } // Start the reader and writer threads. for (IOPSThread writeThread : writeThreads) { diff --git a/src/main/java/com/yugabyte/sample/apps/AppBase.java b/src/main/java/com/yugabyte/sample/apps/AppBase.java index 02b4e9f..014c8ea 100644 --- a/src/main/java/com/yugabyte/sample/apps/AppBase.java +++ b/src/main/java/com/yugabyte/sample/apps/AppBase.java @@ -14,6 +14,7 @@ package com.yugabyte.sample.apps; import java.io.FileInputStream; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -39,6 +40,8 @@ import javax.net.ssl.TrustManagerFactory; import com.yugabyte.sample.common.metrics.Observation; +import com.yugabyte.sample.common.metrics.PromMetrics; + import org.apache.log4j.Logger; import com.datastax.driver.core.Cluster; @@ -75,7 +78,7 @@ /** * Abstract base class for all apps. This class does the following: - * - Provides various helper methods including methods for creating Redis and Cassandra clients. + * - Provides various helper methods including methods for creating YSQL, YCQL and Redis clients. * - Has a metrics tracker object, and internally tracks reads and writes. * - Has the abstract methods that are implemented by the various apps. */ @@ -127,6 +130,9 @@ public abstract class AppBase implements MetricsTracker.StatusMessageAppender { // YCQL keyspace name. public static String keyspace = "ybdemo_keyspace"; + // Prometheus metrics. + private static PromMetrics promMetrics; + public enum TableOp { NoOp, DropTable, @@ -643,6 +649,12 @@ public void run() {} */ @Override public void appendMessage(StringBuilder sb) { + if (promMetrics != null) { + sb.append( + "Num restart read requests: " + + promMetrics.getCounter("restart_read_requests", getTableName()) + + " | "); + } sb.append("Uptime: " + (System.currentTimeMillis() - workloadStartTime) + " ms | "); } @@ -707,6 +719,16 @@ private synchronized void initMetricsTracker() { metricsTracker.createMetric(MetricName.Write); metricsTracker.registerStatusMessageAppender(this); metricsTracker.start(); + + if (appConfig.restartReadsReported) { + LOG.info("Reporting restart read requests."); + try { + promMetrics = new PromMetrics(getNodesAsInet()); + } catch (IOException e) { + LOG.error("Failed to create prometheus metrics tracker with exception: ", e); + promMetrics = null; + } + } } } } diff --git a/src/main/java/com/yugabyte/sample/apps/AppConfig.java b/src/main/java/com/yugabyte/sample/apps/AppConfig.java index 04fc430..c70eb58 100644 --- a/src/main/java/com/yugabyte/sample/apps/AppConfig.java +++ b/src/main/java/com/yugabyte/sample/apps/AppConfig.java @@ -214,4 +214,16 @@ public static enum Type { // Replication factor to be used for the SqlGeoPartitionedTable workload. public int replicationFactor; + + // Throttle number of reads per second per thread. + public double maxReadThreadThroughput = -1; + + // Throttle number of writes per second per thread. + public double maxWriteThreadThroughput = -1; + + // Disable concurrent execution, i.e. exec only a single operation at a time. + public boolean concurrencyDisabled = false; + + // Flag to report restart read requests. + public boolean restartReadsReported = false; } diff --git a/src/main/java/com/yugabyte/sample/apps/SqlStaleReadDetector.java b/src/main/java/com/yugabyte/sample/apps/SqlStaleReadDetector.java new file mode 100644 index 0000000..d998496 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlStaleReadDetector.java @@ -0,0 +1,252 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.log4j.Logger; + +/** + * This workload writes and reads some random string keys from a postgresql table. + */ +public class SqlStaleReadDetector extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlStaleReadDetector.class); + + // Static initialization of this workload's config. These are good defaults for getting a decent + // read dominated workload on a reasonably powered machine. Exact IOPS will of course vary + // depending on the machine and what resources it has to spare. + static { + // Drop the table and create a new one. + appConfig.tableOp = TableOp.DropTable; + // Disable the read-write percentage. + appConfig.readIOPSPercentage = -1; + // Set the read and write threads to 1 each. + appConfig.numReaderThreads = 1; + appConfig.numWriterThreads = 1; + // Disable num keys to read. + appConfig.numKeysToRead = -1; + // Disable num keys to write. + appConfig.numKeysToWrite = -1; + // Disable num unique keys to write. + appConfig.numUniqueKeysToWrite = -1; + // INSERT maxWrittenKey rows into the table before running the app. + appConfig.maxWrittenKey = NUM_UNIQUE_KEYS; + // Run the sum every 1 second. + appConfig.maxReadThreadThroughput = 1; + // Increment 100 counters each second. + appConfig.maxWriteThreadThroughput = 100; + // Run the app for 5 minutes. + appConfig.runTimeSeconds = 300; + // Do not use lock step by default. + appConfig.concurrencyDisabled = false; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Disable YB load balancing to enforce round robin. + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for CRUD ops. + private static final String DEFAULT_TABLE_NAME = "PostgresqlKeyValue"; + + // The shared prepared select statement for fetching the data. + private volatile PreparedStatement preparedSelect = null; + + // The shared prepared insert statement for inserting the data. + private volatile PreparedStatement preparedUpdate = null; + + // Store the number of times the counters were incremented to + // detect stale reads. + // This is a shared counter between the reader and writer threads. + private static AtomicLong numIncrements = new AtomicLong(0); + + // Shared counter to store the number of stale reads. + private static AtomicLong numStaleReads = new AtomicLong(0); + + public SqlStaleReadDetector() {} + + /** + * Drop the table created by this app. + */ + @Override + public void dropTable() throws Exception { + try (Connection connection = getPostgresConnection()) { + connection.createStatement().execute("DROP TABLE IF EXISTS " + getTableName()); + LOG.info(String.format("Dropped table: %s", getTableName())); + } + } + + /* + * Create a key, value table, both integer data type. + * Insert (a million by default) rows initialized zero. + */ + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + if (appConfig.tableOp != TableOp.DropTable && appConfig.tableOp != TableOp.TruncateTable) { + throw new IllegalArgumentException("tableOp should be set to DropTable or TruncateTable."); + } + if (appConfig.numUniqueKeysToWrite != -1) { + throw new IllegalArgumentException("numUniqueKeysToWrite should not be set."); + } + try (Connection connection = getPostgresConnection()) { + // (Re)Create the table (every run should start cleanly with an empty table). + if (tableOp.equals(TableOp.DropTable)) { + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + } + connection.createStatement().execute(String.format( + "CREATE TABLE IF NOT EXISTS %s (k INT PRIMARY KEY, v INT) SPLIT INTO 24 TABLETS", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + if (tableOp.equals(TableOp.TruncateTable)) { + connection.createStatement().execute( + String.format("TRUNCATE TABLE %s", getTableName())); + LOG.info(String.format("Truncated table: %s", getTableName())); + } + // INSERT 1 to maxWrittenKey, zero initialized. + int numRows = connection.createStatement().executeUpdate(String.format( + "INSERT INTO %s SELECT GENERATE_SERIES(1, %d), 0", + getTableName(), appConfig.maxWrittenKey)); + LOG.info(String.format( + "Inserted %d rows into %s", numRows, getTableName())); + } + } + + public String getTableName() { + String tableName = appConfig.tableName != null ? appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + private PreparedStatement getPreparedSelect() throws Exception { + if (preparedSelect == null) { + preparedSelect = getPostgresConnection().prepareStatement( + String.format("SELECT SUM(v) FROM %s;", getTableName())); + } + return preparedSelect; + } + + /* + * Reads sum all values in the table. + * Checks for stale reads by + * 1. Reading the number of increments so far. + * 2. Summing across all the values in the table. + * The sum must be at least the number of increments. + * Otherwise, the database definitely suffers from stale reads. + * + * The stale read behavior is prominent when + * 1. We use lock step. + * appConfig.concurrencyDisabled=true + * 2. There is real clock skew. + * 3. We disable clock skew related checks. + * T-Server GFlags + * a. max_clock_skew_usec=0 + * b. fail_on_out_of_range_clock_skew=false + * c. clock_skew_force_crash_bound_usec=0 + * + * Records stale reads as a metric counter. + */ + @Override + public long doRead() { + try { + long currentCounter = numIncrements.get(); + PreparedStatement statement = getPreparedSelect(); + try (ResultSet rs = statement.executeQuery()) { + if (!rs.next()) { + LOG.error("Failed to read!", new IllegalStateException("No rows returned!")); + return 0; + } + + Long sum = rs.getLong(1); + if (sum < currentCounter) { + LOG.error(String.format( + "Stale read detected! Expected current counter atmost sum = %d, got %d", + currentCounter, sum)); + numStaleReads.incrementAndGet(); + } + } + } catch (Exception e) { + LOG.error("Failed read!", e); + preparedSelect = null; + return 0; + } + return 1; + } + + private PreparedStatement getPreparedUpdate() throws Exception { + if (preparedUpdate == null) { + preparedUpdate = getPostgresConnection().prepareStatement( + String.format("UPDATE %s SET v=v+1 WHERE k=?;", getTableName())); + } + return preparedUpdate; + } + + /* + * Picks a random key and then increment it. + * Additionally, increments the shared counter. + */ + @Override + public long doWrite(int threadIdx) { + long key = getSimpleLoadGenerator().getKeyToRead().asNumber() + 1; + + int result = 0; + try { + PreparedStatement statement = getPreparedUpdate(); + statement.setLong(1, key); + result = statement.executeUpdate(); + numIncrements.addAndGet(result); + } catch (Exception e) { + LOG.error(String.format("Failed incrementing key: %d", key), e); + preparedUpdate = null; + } + return result; + } + + /* + * Appends the number of stale reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Stale reads: ").append(numStaleReads.get()).append(" total ops | "); + super.appendMessage(sb); + } + + @Override + public List getWorkloadDescription() { + return Arrays.asList( + "Sample key-value app built on PostgreSQL with one reader and one writer.", + " Initially `numUniqueKeys` rows are inserts with value initialized to zero.", + " The writer thread then increments the value of a random key.", + " The reader thread sums up the values across all the keys.", + " The reader thread also verifies whether the sum is equal to the number of increments.", + " This check will help us detect any stale reads.", + " The reader and writer threads are executed sequentially for the check.", + " Optionally use the maxWrittenKey config to set the number of rows to insert", + " before running the app. Default: " + String.valueOf(NUM_UNIQUE_KEYS) + "." + ); + } + + @Override + public List getWorkloadOptionalArguments() { + return Arrays.asList( + "--read_rate " + appConfig.maxReadThreadThroughput, + "--write_rate " + appConfig.maxWriteThreadThroughput, + "--lock_step", + "--maxWrittenKey " + appConfig.maxWrittenKey + ); + } +} diff --git a/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java b/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java index 509b227..58c9b79 100644 --- a/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java +++ b/src/main/java/com/yugabyte/sample/common/CmdLineOpts.java @@ -73,7 +73,8 @@ public class CmdLineOpts { SqlInserts.class, SqlSecondaryIndex.class, SqlSnapshotTxns.class, - SqlUpdates.class + SqlUpdates.class, + SqlStaleReadDetector.class ); // The class type of the app needed to spawn new objects. @@ -148,6 +149,24 @@ public void initialize(CommandLine commandLine) throws ClassNotFoundException { } } + if (commandLine.hasOption("read_rate")) { + AppBase.appConfig.maxReadThreadThroughput = + Double.parseDouble(commandLine.getOptionValue("read_rate")); + } + + if (commandLine.hasOption("write_rate")) { + AppBase.appConfig.maxWriteThreadThroughput = + Double.parseDouble(commandLine.getOptionValue("write_rate")); + } + + if (commandLine.hasOption("lock_step")) { + AppBase.appConfig.concurrencyDisabled = true; + } + + if (commandLine.hasOption("report_read_restarts")) { + AppBase.appConfig.restartReadsReported = true; + } + // Set the number of threads. initializeThreadCount(commandLine); // Initialize num keys. @@ -919,6 +938,18 @@ public static CmdLineOpts createFromArgs(String[] args) throws Exception { "tablespace for each partition. This option should not be used along " + "with --tablespaces"); + options.addOption("read_rate", true, + "Throttle read throughput."); + + options.addOption("write_rate", true, + "Throttle write throughput."); + + options.addOption("lock_step", false, + "Execute read and write threads in lock step."); + + options.addOption("report_read_restarts", false, + "Report restart read requests."); + // First check if a "--help" argument is passed with a simple parser. Note that if we add // required args, then the help string would not work. See: // https://stackoverflow.com/questions/36720946/apache-cli-required-options-contradicts-with-help-option diff --git a/src/main/java/com/yugabyte/sample/common/IOPSThread.java b/src/main/java/com/yugabyte/sample/common/IOPSThread.java index 00f45f8..91f40c4 100644 --- a/src/main/java/com/yugabyte/sample/common/IOPSThread.java +++ b/src/main/java/com/yugabyte/sample/common/IOPSThread.java @@ -13,6 +13,8 @@ package com.yugabyte.sample.common; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.log4j.Logger; import com.yugabyte.sample.apps.AppBase; @@ -47,11 +49,29 @@ public static enum IOType { private final boolean printAllExceptions; - public IOPSThread(int threadIdx, AppBase app, IOType ioType, boolean printAllExceptions) { + // Flag to disable concurrency, i.e. execute ops in lock step. + private final boolean concurrencyDisabled; + + // Lock to disable concurrency. + private static final ReentrantLock lock = new ReentrantLock(); + + // Throttle reads or writes in this thread. + private final Throttler throttler; + + public IOPSThread(int threadIdx, AppBase app, IOType ioType, boolean printAllExceptions, + boolean concurrencyDisabled, double maxThroughput) { this.threadIdx = threadIdx; this.app = app; this.ioType = ioType; this.printAllExceptions = printAllExceptions; + this.concurrencyDisabled = concurrencyDisabled; + if (maxThroughput > 0) { + LOG.info("Throttling " + (ioType == IOType.Read ? "read" : "write") + + " ops to " + maxThroughput + " ops/sec."); + this.throttler = new Throttler(maxThroughput); + } else { + this.throttler = null; + } } public int getNumExceptions() { @@ -82,7 +102,14 @@ public void run() { LOG.debug("Starting " + ioType.toString() + " IOPS thread #" + threadIdx); int numConsecutiveExceptions = 0; while (!app.hasFinished()) { + if (AppBase.appConfig.concurrencyDisabled) { + // Wait for the previous thread to execute its step. + lock.lock(); + } try { + if (throttler != null) { + throttler.traceOp(); + } switch (ioType) { case Write: app.performWrite(threadIdx); break; case Read: app.performRead(); break; @@ -113,6 +140,15 @@ public void run() { ioThreadFailed = true; return; } + } finally { + if (AppBase.appConfig.concurrencyDisabled) { + // Signal the next thread in the wait queue to resume. + lock.unlock(); + } + if (throttler != null) { + // Sleep only after releasing the lock above. + throttler.throttleOp(); + } } } } finally { diff --git a/src/main/java/com/yugabyte/sample/common/Throttler.java b/src/main/java/com/yugabyte/sample/common/Throttler.java new file mode 100644 index 0000000..126419d --- /dev/null +++ b/src/main/java/com/yugabyte/sample/common/Throttler.java @@ -0,0 +1,35 @@ +package com.yugabyte.sample.common; + +import org.apache.commons.math3.distribution.PoissonDistribution; + +// Throttles the IO operations to a certain throughput. +// The wait time is sampled from a Poisson distribution to introduce +// randomness and prevent every thread from resuming at the same time. +public class Throttler { + private final PoissonDistribution poissonDistribution; + private long startTime; + + public Throttler(double maxThroughput) { + double throttleDelay = 1000.0 / maxThroughput; + this.poissonDistribution = new PoissonDistribution(throttleDelay); + } + + // Begin throttling an operation. + public void traceOp() { + startTime = System.currentTimeMillis(); + } + + // Operation done. Wait until the next operation can start. + public void throttleOp() { + long opDelay = poissonDistribution.sample(); + long endTime = System.currentTimeMillis(); + long waitTime = opDelay - (endTime - startTime); + if (waitTime > 0) { + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java b/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java new file mode 100644 index 0000000..40e9ca4 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java @@ -0,0 +1,138 @@ +package com.yugabyte.sample.common.metrics; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.security.cert.X509Certificate; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import org.apache.log4j.Logger; + +public class PromMetrics { + private final List promContactPoints; + + private static final Logger LOG = Logger.getLogger(PromMetrics.class); + + /* + * Initializes with T-Server nodes to be contacted for the metrics. + */ + public PromMetrics(List nodes) throws IOException { + promContactPoints = new ArrayList<>(); + for (InetSocketAddress node : nodes) { + promContactPoints.add(String.format( + "https://%s:9000/prometheus-metrics", node.getHostString())); + } + disableSSLVerification(); + } + + /* + * Disable SSL since prometheus-metrics are not exposed with a valid + * certificate. + * + * TODO: Figure out how to do this in a more secure way. + */ + public static void disableSSLVerification() throws IOException { + try { + TrustManager[] trustAllCerts = new TrustManager[]{ + new X509TrustManager() { + public X509Certificate[] getAcceptedIssuers() { + return null; + } + public void checkClientTrusted(X509Certificate[] certs, String authType) { + } + public void checkServerTrusted(X509Certificate[] certs, String authType) { + } + } + }; + + SSLContext sc = SSLContext.getInstance("TLS"); + sc.init(null, trustAllCerts, new java.security.SecureRandom()); + HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()); + + HostnameVerifier allHostsValid = (hostname, session) -> true; + HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid); + } catch (Exception e) { + throw new IOException("Failed to disable SSL verification", e); + } + } + + /* + * Fetches the counter for the given metric and table name + * and accumulates across all the T-Servers. + */ + public long getCounter(String metricName, String tableName) { + long counter = 0; + for (String promContactPoint : promContactPoints) { + long fetchedCounter = fetchPromCounter(metricName, tableName, promContactPoint); + if (fetchedCounter > 0) { + counter += fetchedCounter; + } + } + + return counter; + } + + /* + * Fetches the metric counter for one T-Server. + */ + private long fetchPromCounter(String metricName, String tableName, String promContactPoint) { + HttpURLConnection connection = null; + + try { + URL url = new URL(promContactPoint); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + + int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + BufferedReader reader = new BufferedReader( + new InputStreamReader(connection.getInputStream())); + StringBuilder response = new StringBuilder(); + String line; + + while ((line = reader.readLine()) != null) { + response.append(line).append("\n"); + } + reader.close(); + + Pattern pattern = Pattern.compile( + "^" + metricName + "\\{[^}]*table_name=\"" + tableName + + "\"[^}]*\\}\\s+(\\d+)\\s+(\\d+)", Pattern.MULTILINE); + Matcher matcher = pattern.matcher(response.toString()); + + if (matcher.find()) { + long counter = Long.parseLong(matcher.group(1)); + return counter; + } + + LOG.error("Failed to find metric " + metricName + " for table " + tableName); + } else if (responseCode == HttpURLConnection.HTTP_MOVED_TEMP || responseCode == HttpURLConnection.HTTP_MOVED_PERM) { + String newUrl = connection.getHeaderField("Location"); + LOG.info("Redirecting to " + newUrl); + return fetchPromCounter(metricName, tableName, newUrl); + } else { + LOG.error("Failed to fetch metrics: HTTP response code " + responseCode); + } + } catch(IOException e) { + LOG.error("Failed to fetch metrics", e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + + return -1; + } +}