clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName());
+ _numBrokers = Math.max(1, (int) clusterInstances.stream()
+ .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE))
+ .count());
+ _numServers = Math.max(1, (int) clusterInstances.stream()
+ .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))
+ .count());
+
+ if (_maxConcurrentQueries > 0) {
+ _semaphore = new AdjustableSemaphore(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers), true);
+ }
+ }
+
+ /**
+ * Returns true if the query can be executed (waiting until it can be executed if necessary), false otherwise.
+ *
+ * {@link #release()} should be called after the query is done executing. It is the responsibility of the caller to
+ * ensure that {@link #release()} is called exactly once for each call to this method.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public boolean tryAcquire(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (_maxConcurrentQueries <= 0) {
+ return true;
+ }
+ return _semaphore.tryAcquire(timeout, unit);
+ }
+
+ /**
+ * Should be called after the query is done executing. It is the responsibility of the caller to ensure that this
+ * method is called exactly once for each call to {@link #tryAcquire(long, TimeUnit)}.
+ */
+ public void release() {
+ if (_maxConcurrentQueries > 0) {
+ _semaphore.release();
+ }
+ }
+
+ @Override
+ public void processClusterChange(HelixConstants.ChangeType changeType) {
+ Preconditions.checkArgument(
+ changeType == HelixConstants.ChangeType.EXTERNAL_VIEW || changeType == HelixConstants.ChangeType.CLUSTER_CONFIG,
+ "MultiStageQuerySemaphore can only handle EXTERNAL_VIEW and CLUSTER_CONFIG changes");
+
+ if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
+ List clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName());
+ int numBrokers = Math.max(1, (int) clusterInstances.stream()
+ .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE))
+ .count());
+ int numServers = Math.max(1, (int) clusterInstances.stream()
+ .filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))
+ .count());
+
+ if (numBrokers != _numBrokers || numServers != _numServers) {
+ _numBrokers = numBrokers;
+ _numServers = numServers;
+ if (_maxConcurrentQueries > 0) {
+ _semaphore.setPermits(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers));
+ }
+ }
+ } else {
+ int maxConcurrentQueries = Integer.parseInt(
+ _helixAdmin.getConfig(_helixConfigScope,
+ Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
+ .getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
+ CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
+
+ if (_maxConcurrentQueries == maxConcurrentQueries) {
+ return;
+ }
+
+ if (_maxConcurrentQueries <= 0 && maxConcurrentQueries > 0
+ || _maxConcurrentQueries > 0 && maxConcurrentQueries <= 0) {
+ // This operation isn't safe to do while queries are running so we require a restart of the broker for this
+ // change to take effect.
+ LOGGER.warn("Enabling or disabling limitation of the maximum number of multi-stage queries running "
+ + "concurrently requires a restart of the broker to take effect");
+ return;
+ }
+
+ if (maxConcurrentQueries > 0) {
+ _semaphore.setPermits(Math.max(1, maxConcurrentQueries * _numServers / _numBrokers));
+ }
+ _maxConcurrentQueries = maxConcurrentQueries;
+ }
+ }
+
+ @VisibleForTesting
+ int availablePermits() {
+ return _semaphore.availablePermits();
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 52cf63f562e0..d14f2860138a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -53,6 +53,7 @@
import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
import org.apache.pinot.tsdb.spi.RangeTimeSeriesRequest;
import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +71,7 @@ public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId, Brok
_queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, tableCache);
_queryEnvironment.init(config);
_queryDispatcher = queryDispatcher;
+ TimeSeriesBuilderFactoryProvider.init(config);
}
@Override
@@ -117,6 +119,10 @@ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String
if (timeSeriesResponse == null
|| timeSeriesResponse.getStatus().equals(PinotBrokerTimeSeriesResponse.ERROR_STATUS)) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED, 1);
+ final String errorMessage = timeSeriesResponse == null ? "null time-series response"
+ : timeSeriesResponse.getError();
+ // TODO(timeseries): Remove logging for failed queries.
+ LOGGER.warn("time-series query failed with error: {}", errorMessage);
}
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
index 59aa65406da8..b4ed517192a4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
@@ -93,10 +93,11 @@ private Interval extractIntervalFromSegmentZKMetaZNRecord(String segment, @Nulla
return DEFAULT_INTERVAL;
}
+ // Validate time interval
long startTime = znRecord.getLongField(CommonConstants.Segment.START_TIME, -1);
long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
if (startTime < 0 || endTime < 0 || startTime > endTime) {
- LOGGER.warn("Failed to find valid time interval for segment: {}, table: {}", segment, _tableNameWithType);
+ // Consuming and committing segments don't have time interval
return DEFAULT_INTERVAL;
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java
index 2cab3985d5ea..89cd94f68506 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.java
@@ -26,6 +26,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -135,20 +136,16 @@ public void testGet()
}
@Test(expectedExceptions = ServiceUnavailableException.class)
- public void testRejectHandler()
- throws InterruptedException {
+ public void testRejectHandler() {
BrokerManagedAsyncExecutorProvider provider = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics);
- ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) provider.getExecutorService();
+ ExecutorService threadPoolExecutor = provider.getExecutorService();
// test the rejection policy
- AtomicInteger counter = new AtomicInteger();
- CountDownLatch latch = new CountDownLatch(10);
- for (int i = 0; i < 10; i++) {
- threadPoolExecutor.execute(() -> {
- counter.incrementAndGet();
- latch.countDown();
- });
+ int taskCount = 3;
+ Phaser phaser = new Phaser(taskCount);
+ for (int i = 0; i < taskCount; i++) {
+ threadPoolExecutor.execute(phaser::arriveAndAwaitAdvance);
}
- latch.await();
+ phaser.arriveAndDeregister();
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
new file mode 100644
index 000000000000..fe2a5a124006
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.requesthandler;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+
+public class MultiStageQueryThrottlerTest {
+
+ private AutoCloseable _mocks;
+ @Mock
+ private HelixManager _helixManager;
+ @Mock
+ private HelixAdmin _helixAdmin;
+ private MultiStageQueryThrottler _multiStageQueryThrottler;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+ when(_helixManager.getClusterManagmentTool()).thenReturn(_helixAdmin);
+ when(_helixManager.getClusterName()).thenReturn("testCluster");
+ when(_helixAdmin.getConfig(any(), any())).thenReturn(
+ Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "4"));
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
+ List.of("Broker_0", "Broker_1", "Server_0", "Server_1"));
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
+
+ @Test
+ public void testBasicAcquireRelease()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3);
+ _multiStageQueryThrottler.release();
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+ }
+
+ @Test
+ public void testAcquireTimeout()
+ throws Exception {
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))).thenReturn(
+ Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "2"));
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testDisabledThrottling()
+ throws Exception {
+ when(_helixAdmin.getConfig(any(), any())).thenReturn(
+ Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1"));
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ // If maxConcurrentQueries is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should
+ // succeed
+ for (int i = 0; i < 100; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Test
+ public void testIncreaseNumBrokers()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Increase the number of brokers
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
+ List.of("Broker_0", "Broker_1", "Broker_2", "Broker_3", "Server_0", "Server_1"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
+
+ // Verify that the number of permits on this broker have been reduced to account for the new brokers
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -2);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+
+ for (int i = 0; i < 4; i++) {
+ _multiStageQueryThrottler.release();
+ }
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testDecreaseNumBrokers()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Decrease the number of brokers
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(List.of("Broker_0", "Server_0", "Server_1"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
+
+ // Ensure that the permits from the removed broker are added to this one.
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3);
+ }
+
+ @Test
+ public void testIncreaseNumServers()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Increase the number of servers
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
+ List.of("Broker_0", "Broker_1", "Server_0", "Server_1", "Server_2"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
+
+ // Ensure that the permits on this broker are increased to account for the new server
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
+ }
+
+ @Test
+ public void testDecreaseNumServers()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Decrease the number of servers
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(List.of("Broker_0", "Broker_1", "Server_0"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
+
+ // Verify that the number of permits on this broker have been reduced to account for the removed server
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -2);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+
+ for (int i = 0; i < 4; i++) {
+ _multiStageQueryThrottler.release();
+ }
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testIncreaseMaxConcurrentQueries()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Increase the value of cluster config maxConcurrentQueries
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))))
+ .thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "8"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
+
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testDecreaseMaxConcurrentQueries()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+
+ // Decrease the value of cluster config maxConcurrentQueries
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "3"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
+
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -1);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+
+ for (int i = 0; i < 4; i++) {
+ _multiStageQueryThrottler.release();
+ }
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testEnabledToDisabledTransitionDisallowed()
+ throws Exception {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+
+ // Disable the throttling mechanism via cluster config change
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
+
+ // Should not be allowed to disable the throttling mechanism if it is enabled during startup
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+
+ for (int i = 0; i < 4; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testDisabledToEnabledTransitionDisallowed()
+ throws Exception {
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1"));
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ // If maxConcurrentQueries is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should
+ // succeed
+ for (int i = 0; i < 100; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+
+ // Enable the throttling mechanism via cluster config change
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "4"));
+ _multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
+
+ // Should not be allowed to enable the throttling mechanism if it is disabled during startup
+ for (int i = 0; i < 100; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+ }
+
+ @Test
+ public void testMaxConcurrentQueriesSmallerThanNumBrokers()
+ throws Exception {
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "2"));
+ when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
+ List.of("Broker_0", "Broker_1", "Broker_2", "Broker_3", "Server_0", "Server_1"));
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ // The total permits should be capped at 1 even though maxConcurrentQueries * numServers / numBrokers is 0.
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/pinot-clients/pinot-java-client/pom.xml b/pinot-clients/pinot-java-client/pom.xml
index 4678af3e4f5e..72f0d1932e15 100644
--- a/pinot-clients/pinot-java-client/pom.xml
+++ b/pinot-clients/pinot-java-client/pom.xml
@@ -24,7 +24,7 @@
pinot-clients
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-java-client
Pinot Java Client
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
index 3b2a789eac02..c2e1b98caf1a 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
@@ -190,20 +190,14 @@ protected void updateBrokerData()
}
public String getBroker(String... tableNames) {
- List brokers = null;
// If tableNames is not-null, filter out nulls
- tableNames =
- tableNames == null ? tableNames : Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new);
- if (!(tableNames == null || tableNames.length == 0)) {
- // returning list of common brokers hosting all the tables.
- brokers = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
- _brokerData.getTableToBrokerMap());
+ tableNames = tableNames == null ? tableNames
+ : Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new);
+ if (tableNames == null || tableNames.length == 0) {
+ List brokers = _brokerData.getBrokers();
+ return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
}
-
- if (brokers == null || brokers.isEmpty()) {
- brokers = _brokerData.getBrokers();
- }
- return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
+ return BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames), _brokerData.getTableToBrokerMap());
}
public List getBrokers() {
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
index 6683b6a5fc60..498a68ce0be4 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
@@ -91,10 +91,10 @@ private void refresh() {
public String selectBroker(String... tableNames) {
if (!(tableNames == null || tableNames.length == 0 || tableNames[0] == null)) {
// getting list of brokers hosting all the tables.
- List list = BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
+ String randomBroker = BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames),
_tableToBrokerListMapRef.get());
- if (list != null && !list.isEmpty()) {
- return list.get(ThreadLocalRandom.current().nextInt(list.size()));
+ if (randomBroker != null) {
+ return randomBroker;
}
}
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
index e3a1df44db7b..c465f101aa08 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
@@ -19,9 +19,13 @@
package org.apache.pinot.client.utils;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
import org.apache.pinot.client.ExternalViewReader;
@@ -34,35 +38,52 @@ private BrokerSelectorUtils() {
*
* @param tableNames: List of table names.
* @param brokerData: map holding data for table hosting on brokers.
- * @return list of common brokers hosting all the tables.
+ * @return list of common brokers hosting all the tables or null if no common brokers found.
+ * @deprecated Use {@link #getTablesCommonBrokersSet(List, Map)} instead. It is more efficient and its semantics are
+ * clearer (ie it returns an empty set instead of null if no common brokers are found).
*/
- public static List getTablesCommonBrokers(List tableNames, Map> brokerData) {
- List> tablesBrokersList = new ArrayList<>();
- for (String name: tableNames) {
- String tableName = getTableNameWithoutSuffix(name);
- int idx = tableName.indexOf('.');
-
- if (brokerData.containsKey(tableName)) {
- tablesBrokersList.add(brokerData.get(tableName));
- } else if (idx > 0) {
- // In case tableName is formatted as .
- tableName = tableName.substring(idx + 1);
- tablesBrokersList.add(brokerData.get(tableName));
- }
+ @Nullable
+ @Deprecated
+ public static List getTablesCommonBrokers(@Nullable List tableNames,
+ Map> brokerData) {
+ Set tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData);
+ if (tablesCommonBrokersSet == null || tablesCommonBrokersSet.isEmpty()) {
+ return null;
}
+ return new ArrayList<>(tablesCommonBrokersSet);
+ }
- // return null if tablesBrokersList is empty or contains null
- if (tablesBrokersList.isEmpty()
- || tablesBrokersList.stream().anyMatch(Objects::isNull)) {
+ /**
+ * Returns a random broker from the common brokers hosting all the tables.
+ */
+ @Nullable
+ public static String getRandomBroker(@Nullable List tableNames, Map> brokerData) {
+ Set tablesCommonBrokersSet = getTablesCommonBrokersSet(tableNames, brokerData);
+ if (tablesCommonBrokersSet.isEmpty()) {
return null;
}
+ return tablesCommonBrokersSet.stream()
+ .skip(ThreadLocalRandom.current().nextInt(tablesCommonBrokersSet.size()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("No broker found"));
+ }
- // Make a copy of the brokersList of the first table. retainAll does inplace modifications.
- // So lists from brokerData should not be used directly.
- List commonBrokers = new ArrayList<>(tablesBrokersList.get(0));
- for (int i = 1; i < tablesBrokersList.size(); i++) {
- commonBrokers.retainAll(tablesBrokersList.get(i));
+ /**
+ *
+ * @param tableNames: List of table names.
+ * @param brokerData: map holding data for table hosting on brokers.
+ * @return set of common brokers hosting all the tables
+ */
+ public static Set getTablesCommonBrokersSet(
+ @Nullable List tableNames, Map> brokerData) {
+ if (tableNames == null || tableNames.isEmpty()) {
+ return Collections.emptySet();
+ }
+ HashSet commonBrokers = getBrokers(tableNames.get(0), brokerData);
+ for (int i = 1; i < tableNames.size() && !commonBrokers.isEmpty(); i++) {
+ commonBrokers.retainAll(getBrokers(tableNames.get(i), brokerData));
}
+
return commonBrokers;
}
@@ -71,4 +92,28 @@ private static String getTableNameWithoutSuffix(String tableName) {
tableName.replace(ExternalViewReader.OFFLINE_SUFFIX, "").
replace(ExternalViewReader.REALTIME_SUFFIX, "");
}
+
+ /**
+ * Returns the brokers for the given table name.
+ *
+ * This means that an empty set is returned if there are no brokers for the given table name.
+ */
+ private static HashSet getBrokers(String tableName, Map> brokerData) {
+ String tableNameWithoutSuffix = getTableNameWithoutSuffix(tableName);
+ int idx = tableNameWithoutSuffix.indexOf('.');
+
+ List brokers = brokerData.get(tableNameWithoutSuffix);
+ if (brokers != null) {
+ return new HashSet<>(brokers);
+ } else if (idx > 0) {
+ // TODO: This is probably unnecessary and even wrong. `brokerData` should include the fully qualified name.
+ // In case tableNameWithoutSuffix is formatted as . and not found in the fully qualified name
+ tableNameWithoutSuffix = tableNameWithoutSuffix.substring(idx + 1);
+ List brokersWithoutDb = brokerData.get(tableNameWithoutSuffix);
+ if (brokersWithoutDb != null) {
+ return new HashSet<>(brokersWithoutDb);
+ }
+ }
+ return new HashSet<>();
+ }
}
diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
index d52438ab542c..986b4773c7c2 100644
--- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
+++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/DynamicBrokerSelectorTest.java
@@ -152,4 +152,24 @@ public void testCloseZkClient() {
Mockito.verify(_mockZkClient, times(1)).close();
}
+
+ @Test
+ public void testSelectBrokerWithInvalidTable() {
+ Map> tableToBrokerListMap = new HashMap<>();
+ tableToBrokerListMap.put("table1", Collections.singletonList("broker1"));
+ when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap);
+ _dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data");
+ String result = _dynamicBrokerSelectorUnderTest.selectBroker("invalidTable");
+ assertEquals(result, "broker1");
+ }
+
+ @Test
+ public void testSelectBrokerWithTwoTablesOneInvalid() {
+ Map> tableToBrokerListMap = new HashMap<>();
+ tableToBrokerListMap.put("table1", Collections.singletonList("broker1"));
+ when(_mockExternalViewReader.getTableToBrokersMap()).thenReturn(tableToBrokerListMap);
+ _dynamicBrokerSelectorUnderTest.handleDataChange("dataPath", "data");
+ String result = _dynamicBrokerSelectorUnderTest.selectBroker("table1", "invalidTable");
+ assertEquals(result, "broker1");
+ }
}
diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java
new file mode 100644
index 000000000000..512a0a3c862a
--- /dev/null
+++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/utils/BrokerSelectorUtilsTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+
+public class BrokerSelectorUtilsTest {
+
+ HashMap> _brokerData = new HashMap<>();
+ @Test
+ public void getTablesCommonBrokersSetNullTables() {
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(null, _brokerData);
+ Assert.assertEquals(tableSet, Set.of());
+ }
+
+ @Test
+ public void getTablesCommonBrokersListNullTables() {
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(null, _brokerData);
+ Assert.assertNull(tableList);
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetEmptyTables() {
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of(), _brokerData);
+ Assert.assertEquals(tableSet, Set.of());
+ }
+
+ @Test
+ public void getTablesCommonBrokersListEmptyTables() {
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of(), _brokerData);
+ Assert.assertNull(tableList);
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetNotExistentTable() {
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("notExistent"), _brokerData);
+ Assert.assertEquals(tableSet, Set.of());
+ }
+
+ @Test
+ public void getTablesCommonBrokersListNotExistentTable() {
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("notExistent"), _brokerData);
+ Assert.assertNull(tableList);
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetOneTable() {
+ _brokerData.put("table1", List.of("broker1"));
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1"), _brokerData);
+ Assert.assertEquals(tableSet, Set.of("broker1"));
+ }
+
+ @Test
+ public void getTablesCommonBrokersListOneTable() {
+ _brokerData.put("table1", List.of("broker1"));
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1"), _brokerData);
+ Assert.assertNotNull(tableList);
+ Assert.assertEquals(tableList, List.of("broker1"));
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetTwoTables() {
+ _brokerData.put("table1", List.of("broker1"));
+ _brokerData.put("table2", List.of("broker1"));
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData);
+ Assert.assertNotNull(tableSet);
+ Assert.assertEquals(tableSet, Set.of("broker1"));
+ }
+
+ @Test
+ public void getTablesCommonBrokersListTwoTables() {
+ _brokerData.put("table1", List.of("broker1"));
+ _brokerData.put("table2", List.of("broker1"));
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData);
+ Assert.assertNotNull(tableList);
+ Assert.assertEquals(tableList, List.of("broker1"));
+ }
+
+ @Test
+ public void getTablesCommonBrokersSetTwoTablesDifferentBrokers() {
+ _brokerData.put("table1", List.of("broker1"));
+ _brokerData.put("table2", List.of("broker2"));
+ Set tableSet = BrokerSelectorUtils.getTablesCommonBrokersSet(List.of("table1", "table2"), _brokerData);
+ Assert.assertEquals(tableSet, Set.of());
+ }
+
+ @Test
+ public void getTablesCommonBrokersListTwoTablesDifferentBrokers() {
+ _brokerData.put("table1", List.of("broker1"));
+ _brokerData.put("table2", List.of("broker2"));
+ List tableList = BrokerSelectorUtils.getTablesCommonBrokers(List.of("table1", "table2"), _brokerData);
+ Assert.assertNull(tableList);
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ _brokerData.clear();
+ }
+}
diff --git a/pinot-clients/pinot-jdbc-client/pom.xml b/pinot-clients/pinot-jdbc-client/pom.xml
index 4dbc070ff367..210f8fc8e8b1 100644
--- a/pinot-clients/pinot-jdbc-client/pom.xml
+++ b/pinot-clients/pinot-jdbc-client/pom.xml
@@ -24,7 +24,7 @@
pinot-clients
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-jdbc-client
Pinot JDBC Client
diff --git a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DateTimeUtils.java b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DateTimeUtils.java
index 3ca537b518fe..7e9b4df15233 100644
--- a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DateTimeUtils.java
+++ b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/utils/DateTimeUtils.java
@@ -32,48 +32,49 @@ private DateTimeUtils() {
private static final String TIMESTAMP_FORMAT_STR = "yyyy-MM-dd HH:mm:ss";
private static final String DATE_FORMAT_STR = "yyyy-MM-dd";
- private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT_STR);
- private static final SimpleDateFormat TIMESTAMP_FORMAT = new SimpleDateFormat(TIMESTAMP_FORMAT_STR);
+ private static final ThreadLocal DATE_FORMAT =
+ ThreadLocal.withInitial(() -> new SimpleDateFormat(DATE_FORMAT_STR));
+ private static final ThreadLocal TIMESTAMP_FORMAT =
+ ThreadLocal.withInitial(() -> new SimpleDateFormat(TIMESTAMP_FORMAT_STR));
public static Date getDateFromString(String value, Calendar cal)
throws ParseException {
- DATE_FORMAT.setTimeZone(cal.getTimeZone());
- java.util.Date date = DATE_FORMAT.parse(value);
- Date sqlDate = new Date(date.getTime());
- return sqlDate;
+ SimpleDateFormat dateFormat = DATE_FORMAT.get();
+ dateFormat.setTimeZone(cal.getTimeZone());
+ java.util.Date date = dateFormat.parse(value);
+ return new Date(date.getTime());
}
public static Time getTimeFromString(String value, Calendar cal)
throws ParseException {
- TIMESTAMP_FORMAT.setTimeZone(cal.getTimeZone());
- java.util.Date date = TIMESTAMP_FORMAT.parse(value);
- Time sqlTime = new Time(date.getTime());
- return sqlTime;
+ SimpleDateFormat timestampFormat = TIMESTAMP_FORMAT.get();
+ timestampFormat.setTimeZone(cal.getTimeZone());
+ java.util.Date date = timestampFormat.parse(value);
+ return new Time(date.getTime());
}
public static Timestamp getTimestampFromString(String value, Calendar cal)
throws ParseException {
- TIMESTAMP_FORMAT.setTimeZone(cal.getTimeZone());
- java.util.Date date = TIMESTAMP_FORMAT.parse(value);
- Timestamp sqlTime = new Timestamp(date.getTime());
- return sqlTime;
+ SimpleDateFormat timestampFormat = TIMESTAMP_FORMAT.get();
+ timestampFormat.setTimeZone(cal.getTimeZone());
+ java.util.Date date = timestampFormat.parse(value);
+ return new Timestamp(date.getTime());
}
public static Timestamp getTimestampFromLong(Long value) {
- Timestamp sqlTime = new Timestamp(value);
- return sqlTime;
+ return new Timestamp(value);
}
public static String dateToString(Date date) {
- return DATE_FORMAT.format(date.getTime());
+ return DATE_FORMAT.get().format(date.getTime());
}
public static String timeToString(Time time) {
- return TIMESTAMP_FORMAT.format(time.getTime());
+ return TIMESTAMP_FORMAT.get().format(time.getTime());
}
public static String timeStampToString(Timestamp timestamp) {
- return TIMESTAMP_FORMAT.format(timestamp.getTime());
+ return TIMESTAMP_FORMAT.get().format(timestamp.getTime());
}
public static long timeStampToLong(Timestamp timestamp) {
diff --git a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
index 255d14d47087..c62a9b9e5465 100644
--- a/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
+++ b/pinot-clients/pinot-jdbc-client/src/test/java/org/apache/pinot/client/PinotResultSetTest.java
@@ -26,6 +26,10 @@
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.client.utils.DateTimeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -139,7 +143,7 @@ public void testFetchDates()
@Test
public void testFetchBigDecimals()
- throws Exception {
+ throws Exception {
ResultSetGroup resultSetGroup = getResultSet(TEST_RESULT_SET_RESOURCE);
ResultSet resultSet = resultSetGroup.getResultSet(0);
PinotResultSet pinotResultSet = new PinotResultSet(resultSet);
@@ -207,6 +211,79 @@ public void testGetCalculatedScale() {
Assert.assertEquals(calculatedResult, 3);
}
+ @Test
+ public void testDateFromStringConcurrent()
+ throws Throwable {
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ AtomicReference throwable = new AtomicReference<>();
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ Assert.assertEquals(DateTimeUtils.getDateFromString("2020-01-01", Calendar.getInstance()).toString(),
+ "2020-01-01");
+ } catch (Throwable t) {
+ throwable.set(t);
+ }
+ });
+ }
+
+ executorService.shutdown();
+ executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
+
+ if (throwable.get() != null) {
+ throw throwable.get();
+ }
+ }
+
+ @Test
+ public void testTimeFromStringConcurrent()
+ throws Throwable {
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ AtomicReference throwable = new AtomicReference<>();
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ Assert.assertEquals(DateTimeUtils.getTimeFromString("2020-01-01 12:00:00", Calendar.getInstance()).toString(),
+ "12:00:00");
+ } catch (Throwable t) {
+ throwable.set(t);
+ }
+ });
+ }
+
+ executorService.shutdown();
+ executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
+
+ if (throwable.get() != null) {
+ throw throwable.get();
+ }
+ }
+
+ @Test
+ public void testTimestampFromStringConcurrent()
+ throws Throwable {
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ AtomicReference throwable = new AtomicReference<>();
+ for (int i = 0; i < 10; i++) {
+ executorService.submit(() -> {
+ try {
+ Assert.assertEquals(
+ DateTimeUtils.getTimestampFromString("2020-01-01 12:00:00", Calendar.getInstance()).toString(),
+ "2020-01-01 12:00:00.0");
+ } catch (Throwable t) {
+ throwable.set(t);
+ }
+ });
+ }
+
+ executorService.shutdown();
+ executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
+
+ if (throwable.get() != null) {
+ throw throwable.get();
+ }
+ }
+
private ResultSetGroup getResultSet(String resourceName) {
_dummyJsonTransport._resource = resourceName;
Connection connection = ConnectionFactory.fromHostList(Collections.singletonList("dummy"), _dummyJsonTransport);
diff --git a/pinot-clients/pom.xml b/pinot-clients/pom.xml
index 66cb0f2f30e7..40368b3ed7a0 100644
--- a/pinot-clients/pom.xml
+++ b/pinot-clients/pom.xml
@@ -24,7 +24,7 @@
pinot
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-clients
pom
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index af2001a9e14c..7b44a94375bb 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -24,7 +24,7 @@
pinot
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-common
Pinot Common
@@ -45,7 +45,9 @@
org.apache.maven.plugins
maven-surefire-plugin
-
+
+ 1
+ true
usedefaultlisteners
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOnlyExecutor.java b/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java
similarity index 50%
rename from pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOnlyExecutor.java
rename to pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java
index 6f5bd46c83fc..2bbc25e42a0d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOnlyExecutor.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java
@@ -16,27 +16,36 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.query.distinct.raw;
+package org.apache.pinot.common.concurrency;
-import java.math.BigDecimal;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.query.distinct.DistinctExecutor;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Semaphore;
/**
- * {@link DistinctExecutor} for distinct only queries with single raw BIG_DECIMAL column.
+ * A semaphore that allows adjusting the number of permits in a non-blocking way.
*/
-public class RawBigDecimalSingleColumnDistinctOnlyExecutor extends BaseRawBigDecimalSingleColumnDistinctExecutor {
+public class AdjustableSemaphore extends Semaphore {
- public RawBigDecimalSingleColumnDistinctOnlyExecutor(ExpressionContext expression, DataType dataType, int limit,
- boolean nullHandlingEnabled) {
- super(expression, dataType, limit, nullHandlingEnabled);
+ private int _totalPermits;
+
+ public AdjustableSemaphore(int permits) {
+ super(permits);
+ _totalPermits = permits;
+ }
+
+ public AdjustableSemaphore(int permits, boolean fair) {
+ super(permits, fair);
+ _totalPermits = permits;
}
- @Override
- protected boolean add(BigDecimal value) {
- _valueSet.add(value);
- return _valueSet.size() >= _limit;
+ public void setPermits(int permits) {
+ Preconditions.checkArgument(permits > 0, "Permits must be a positive integer");
+ if (permits < _totalPermits) {
+ reducePermits(_totalPermits - permits);
+ } else if (permits > _totalPermits) {
+ release(permits - _totalPermits);
+ }
+ _totalPermits = permits;
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java b/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
new file mode 100644
index 000000000000..186a668d651a
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.TimeUtils;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+ protected String _brokerHost;
+ protected int _brokerPort;
+ protected String _brokerId;
+ protected BrokerMetrics _brokerMetrics;
+ protected long _expirationIntervalInMs;
+
+ protected void init(String brokerHost, int brokerPort, String brokerId, BrokerMetrics brokerMetrics,
+ String expirationTime) {
+ _brokerMetrics = brokerMetrics;
+ _brokerHost = brokerHost;
+ _brokerPort = brokerPort;
+ _brokerId = brokerId;
+ _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+ }
+
+ /**
+ * Initialize the store.
+ * @param config Subset configuration of pinot.broker.cursor.response.store.<type>
+ * @param brokerHost Hostname of the broker where ResponseStore is created
+ * @param brokerPort Port of the broker where the ResponseStore is created
+ * @param brokerId ID of the broker where the ResponseStore is created.
+ * @param brokerMetrics Metrics utility to track cursor metrics.
+ */
+ public abstract void init(PinotConfiguration config, String brokerHost, int brokerPort, String brokerId,
+ BrokerMetrics brokerMetrics, String expirationTime)
+ throws Exception;
+
+ /**
+ * Get the hostname of the broker where the query is executed
+ * @return String containing the hostname
+ */
+ protected String getBrokerHost() {
+ return _brokerHost;
+ }
+
+ /**
+ * Get the port of the broker where the query is executed
+ * @return int containing the port
+ */
+ protected int getBrokerPort() {
+ return _brokerPort;
+ }
+
+ /**
+ * Get the expiration interval of a query response.
+ * @return long containing the expiration interval.
+ */
+ protected long getExpirationIntervalInMs() {
+ return _expirationIntervalInMs;
+ }
+
+ /**
+ * Write a CursorResponse
+ * @param requestId Request ID of the response
+ * @param response The response to write
+ * @throws Exception Thrown if there is any error while writing the response
+ */
+ protected abstract void writeResponse(String requestId, CursorResponse response)
+ throws Exception;
+
+ /**
+ * Write a {@link ResultTable} to the store
+ * @param requestId Request ID of the response
+ * @param resultTable The {@link ResultTable} of the query
+ * @throws Exception Thrown if there is any error while writing the result table.
+ * @return Returns the number of bytes written
+ */
+ protected abstract long writeResultTable(String requestId, ResultTable resultTable)
+ throws Exception;
+
+ /**
+ * Read the response (excluding the {@link ResultTable}) from the store
+ * @param requestId Request ID of the response
+ * @return CursorResponse (without the {@link ResultTable})
+ * @throws Exception Thrown if there is any error while reading the response
+ */
+ public abstract CursorResponse readResponse(String requestId)
+ throws Exception;
+
+ /**
+ * Read the {@link ResultTable} of a query response
+ * @param requestId Request ID of the query
+ * @param offset Offset of the result slice
+ * @param numRows Number of rows required in the slice
+ * @return {@link ResultTable} of the query
+ * @throws Exception Thrown if there is any error while reading the result table
+ */
+ protected abstract ResultTable readResultTable(String requestId, int offset, int numRows)
+ throws Exception;
+
+ protected abstract boolean deleteResponseImpl(String requestId)
+ throws Exception;
+
+ /**
+ * Stores the response in the store. {@link CursorResponse} and {@link ResultTable} are stored separately.
+ * @param response Response to be stored
+ * @throws Exception Thrown if there is any error while storing the response.
+ */
+ public void storeResponse(BrokerResponse response)
+ throws Exception {
+ String requestId = response.getRequestId();
+
+ CursorResponse cursorResponse = new CursorResponseNative(response);
+
+ long submissionTimeMs = System.currentTimeMillis();
+ // Initialize all CursorResponse specific metadata
+ cursorResponse.setBrokerHost(getBrokerHost());
+ cursorResponse.setBrokerPort(getBrokerPort());
+ cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+ cursorResponse.setExpirationTimeMs(submissionTimeMs + getExpirationIntervalInMs());
+ cursorResponse.setOffset(0);
+ cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+ try {
+ long bytesWritten = writeResultTable(requestId, response.getResultTable());
+
+ // Remove the resultTable from the response as it is serialized in a data file.
+ cursorResponse.setResultTable(null);
+ cursorResponse.setBytesWritten(bytesWritten);
+ writeResponse(requestId, cursorResponse);
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_RESPONSE_STORE_SIZE, bytesWritten);
+ } catch (Exception e) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_WRITE_EXCEPTION, 1);
+ deleteResponse(requestId);
+ throw e;
+ }
+ }
+
+ /**
+ * Reads the response from the store and populates it with a slice of the {@link ResultTable}
+ * @param requestId Request ID of the query
+ * @param offset Offset of the result slice
+ * @param numRows Number of rows required in the slice
+ * @return A CursorResponse with a slice of the {@link ResultTable}
+ * @throws Exception Thrown if there is any error during the operation.
+ */
+ public CursorResponse handleCursorRequest(String requestId, int offset, int numRows)
+ throws Exception {
+
+ CursorResponse response;
+ ResultTable resultTable;
+
+ try {
+ response = readResponse(requestId);
+ } catch (Exception e) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+ throw e;
+ }
+
+ int totalTableRows = response.getNumRowsResultSet();
+
+ if (totalTableRows == 0 && offset == 0) {
+ // If sum records is 0, then result set is empty.
+ response.setResultTable(null);
+ response.setOffset(0);
+ response.setNumRows(0);
+ return response;
+ } else if (offset >= totalTableRows) {
+ throw new RuntimeException("Offset " + offset + " should be lesser than totalRecords " + totalTableRows);
+ }
+
+ long fetchStartTime = System.currentTimeMillis();
+ try {
+ resultTable = readResultTable(requestId, offset, numRows);
+ } catch (Exception e) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+ throw e;
+ }
+
+ response.setResultTable(resultTable);
+ response.setCursorFetchTimeMs(System.currentTimeMillis() - fetchStartTime);
+ response.setOffset(offset);
+ response.setNumRows(resultTable.getRows().size());
+ response.setNumRowsResultSet(totalTableRows);
+ return response;
+ }
+
+ /**
+ * Returns the list of responses created by the broker.
+ * Note that the ResponseStore object in a broker should only return responses created by it.
+ * @return A list of CursorResponse objects created by the specific broker
+ * @throws Exception Thrown if there is an error during an operation.
+ */
+ public List getAllStoredResponses()
+ throws Exception {
+ List responses = new ArrayList<>();
+
+ for (String requestId : getAllStoredRequestIds()) {
+ responses.add(readResponse(requestId));
+ }
+
+ return responses;
+ }
+
+ @Override
+ public boolean deleteResponse(String requestId) throws Exception {
+ if (!exists(requestId)) {
+ return false;
+ }
+
+ long bytesWritten = readResponse(requestId).getBytesWritten();
+ boolean isSucceeded = deleteResponseImpl(requestId);
+ if (isSucceeded) {
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_RESPONSE_STORE_SIZE, bytesWritten * -1);
+ }
+ return isSucceeded;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java
index 27c4952b1fcf..d27a3fa6cccd 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java
@@ -40,11 +40,59 @@ public static double divide(double a, double b, double defaultValue) {
return (b == 0) ? defaultValue : a / b;
}
+ @ScalarFunction
+ public static long intDiv(double a, double b) {
+ return (long) Math.floor(a / b);
+ }
+
+ @ScalarFunction
+ public static long intDivOrZero(double a, double b) {
+ //Same as intDiv but returns zero when dividing by zero or when dividing a minimal negative number by minus one.
+ return (b == 0 || (a == Long.MIN_VALUE && b == -1)) ? 0 : intDiv(a, b);
+ }
+
+ @ScalarFunction
+ public static int isFinite(double value) {
+ return Double.isFinite(value) ? 1 : 0;
+ }
+
+ @ScalarFunction
+ public static int isInfinite(double value) {
+ return Double.isInfinite(value) ? 1 : 0;
+ }
+
+ @ScalarFunction
+ public static double ifNotFinite(double valueToCheck, double defaultValue) {
+ return Double.isFinite(valueToCheck) ? valueToCheck : defaultValue;
+ }
+
+ @ScalarFunction
+ public static int isNaN(double value) {
+ return Double.isNaN(value) ? 1 : 0;
+ }
+
@ScalarFunction
public static double mod(double a, double b) {
return a % b;
}
+ @ScalarFunction
+ public static double moduloOrZero(double a, double b) {
+ //Same as mod but returns zero when dividing by zero or when dividing a minimal negative number by minus one.
+ return (b == 0 || (a == Long.MIN_VALUE && b == -1)) ? 0 : mod(a, b);
+ }
+
+ @ScalarFunction
+ public static double positiveModulo(double a, double b) {
+ double result = a % b;
+ return result >= 0 ? result : result + Math.abs(b);
+ }
+
+ @ScalarFunction
+ public static double negate(double a) {
+ return -a;
+ }
+
@ScalarFunction
public static double least(double a, double b) {
return Double.min(a, b);
@@ -117,7 +165,6 @@ public static double power(double a, double exponent) {
return Math.pow(a, exponent);
}
-
// Big Decimal Implementation has been used here to avoid overflows
// when multiplying by Math.pow(10, scale) for rounding
@ScalarFunction
@@ -143,4 +190,33 @@ public static double truncate(double a, int scale) {
public static double truncate(double a) {
return Math.signum(a) * Math.floor(Math.abs(a));
}
+
+ @ScalarFunction
+ public static long gcd(long a, long b) {
+ return a == 0 ? Math.abs(b) : gcd(b % a, a);
+ }
+
+ @ScalarFunction
+ public static long lcm(long a, long b) {
+ if (a == 0 || b == 0) {
+ return 0;
+ }
+ return Math.abs(a) / gcd(a, b) * Math.abs(b);
+ }
+
+ @ScalarFunction
+ public static double hypot(double a, double b) {
+ return Math.hypot(a, b);
+ }
+
+ @ScalarFunction
+ public static int byteswapInt(int a) {
+ return Integer.reverseBytes(a);
+ }
+
+ @ScalarFunction
+ public static long byteswapLong(long a) {
+ // Skip the heading 0s in the long value
+ return Long.reverseBytes(a);
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index ea6a66251ce8..22be35405f4b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -169,7 +169,27 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
* For each query with at least one window function, this meter is increased as many times as window functions in the
* query.
*/
- WINDOW_COUNT("queries", true),;
+ WINDOW_COUNT("queries", true),
+
+ /**
+ * Number of queries executed with cursors. This count includes queries that use SSE and MSE
+ */
+ CURSOR_QUERIES_GLOBAL("queries", true),
+
+ /**
+ * Number of exceptions when writing a response to the response store
+ */
+ CURSOR_WRITE_EXCEPTION("exceptions", true),
+
+ /**
+ * Number of exceptions when reading a response and result table from the response store
+ */
+ CURSOR_READ_EXCEPTION("exceptions", true),
+
+ /**
+ * The number of bytes stored in the response store. Only the size of the result table is tracked.
+ */
+ CURSOR_RESPONSE_STORE_SIZE("bytes", true);
private final String _brokerMeterName;
private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index cdb99f0f904d..a978219343ec 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -68,6 +68,7 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
NUM_MINION_SUBTASKS_WAITING("NumMinionSubtasksWaiting", true),
NUM_MINION_SUBTASKS_RUNNING("NumMinionSubtasksRunning", true),
NUM_MINION_SUBTASKS_ERROR("NumMinionSubtasksError", true),
+ NUM_MINION_SUBTASKS_UNKNOWN("NumMinionSubtasksUnknown", true),
PERCENT_MINION_SUBTASKS_IN_QUEUE("PercentMinionSubtasksInQueue", true),
PERCENT_MINION_SUBTASKS_IN_ERROR("PercentMinionSubtasksInError", true),
TIER_BACKEND_TABLE_COUNT("TierBackendTableCount", true),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index b999e7b8e435..7c1826582a70 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -77,6 +77,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT("upsertValidDocIdSnapshotCount", false),
UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", false),
REALTIME_INGESTION_OFFSET_LAG("offsetLag", false),
+ REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false),
+ REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false),
REALTIME_CONSUMER_DIR_USAGE("bytes", true);
private final String _gaugeName;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
new file mode 100644
index 000000000000..14e65f6fbb4b
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response;
+
+public interface CursorResponse extends BrokerResponse {
+
+ void setBrokerHost(String brokerHost);
+
+ /**
+ * get hostname of the processing broker
+ * @return String containing the hostname
+ */
+ String getBrokerHost();
+
+ void setBrokerPort(int brokerPort);
+
+ /**
+ * get port of the processing broker
+ * @return int containing the port.
+ */
+ int getBrokerPort();
+
+ /**
+ * Set the starting offset of result table slice
+ * @param offset Offset of the result table slice
+ */
+ void setOffset(int offset);
+
+ /**
+ * Current offset in the query result.
+ * Starts from 0.
+ * @return current offset.
+ */
+ int getOffset();
+
+ /**
+ * Set the number of rows in the result table slice.
+ * @param numRows Number of rows in the result table slice
+ */
+ void setNumRows(int numRows);
+
+ /**
+ * Number of rows in the current response.
+ * @return Number of rows in the current response.
+ */
+ int getNumRows();
+
+ /**
+ * Return the time to write the results to the response store.
+ * @return time in milliseconds
+ */
+ long getCursorResultWriteTimeMs();
+
+ /**
+ * Time taken to write cursor results to query storage.
+ * @param cursorResultWriteMs Time in milliseconds.
+ */
+ void setCursorResultWriteTimeMs(long cursorResultWriteMs);
+
+ /**
+ * Return the time to fetch results from the response store.
+ * @return time in milliseconds.
+ */
+ long getCursorFetchTimeMs();
+
+ /**
+ * Set the time taken to fetch a cursor. The time is specific to the current call.
+ * @param cursorFetchTimeMs time in milliseconds
+ */
+ void setCursorFetchTimeMs(long cursorFetchTimeMs);
+
+ /**
+ * Unix timestamp when the query was submitted. The timestamp is used to calculate the expiration time when the
+ * response will be deleted from the response store.
+ * @param submissionTimeMs Unix timestamp when the query was submitted.
+ */
+ void setSubmissionTimeMs(long submissionTimeMs);
+
+ /**
+ * Get the unix timestamp when the query was submitted
+ * @return Submission unix timestamp when the query was submitted
+ */
+ long getSubmissionTimeMs();
+
+ /**
+ * Set the expiration time (unix timestamp) when the response will be deleted from the response store.
+ * @param expirationTimeMs unix timestamp when the response expires in the response store
+ */
+ void setExpirationTimeMs(long expirationTimeMs);
+
+ /**
+ * Get the expiration time (unix timestamp) when the response will be deleted from the response store.
+ * @return expirationTimeMs unix timestamp when the response expires in the response store
+ */
+ long getExpirationTimeMs();
+
+ /**
+ * Set the number of rows in the result set. This is required because BrokerResponse checks the ResultTable
+ * to get the number of rows. However the ResultTable is set to null in CursorResponse. So the numRowsResultSet has to
+ * be remembered.
+ * @param numRowsResultSet Number of rows in the result set.
+ */
+ void setNumRowsResultSet(int numRowsResultSet);
+
+ /**
+ * Set the number of bytes written to the response store when storing the result table.
+ * @param bytesWritten Number of bytes written
+ */
+ void setBytesWritten(long bytesWritten);
+
+ /**
+ * Get the number of bytes written when storing the result table
+ * @return number of bytes written
+ */
+ long getBytesWritten();
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
new file mode 100644
index 000000000000..d4c220374984
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+
+
+@JsonPropertyOrder({
+ "resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached", "timeUsedMs",
+ "requestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+ "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
+ "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched",
+ "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer",
+ "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs",
+ "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
+ "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+ "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
+ "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tableQueries",
+ // Fields specific to CursorResponse
+ "offset", "numRows", "cursorResultWriteTimeMs", "cursorFetchTimeMs", "submissionTimeMs", "expirationTimeMs",
+ "brokerHost", "brokerPort", "bytesWritten"
+})
+public class CursorResponseNative extends BrokerResponseNative implements CursorResponse {
+ private int _offset;
+ private int _numRows;
+ private long _cursorResultWriteTimeMs;
+ private long _cursorFetchTimeMs;
+ private long _submissionTimeMs;
+ private long _expirationTimeMs;
+ private String _brokerHost;
+ private int _brokerPort;
+ private long _bytesWritten;
+
+ public CursorResponseNative() {
+ }
+
+ public CursorResponseNative(BrokerResponse response) {
+ // Copy all the member variables of BrokerResponse to CursorResponse.
+ setResultTable(response.getResultTable());
+ setNumRowsResultSet(response.getNumRowsResultSet());
+ setExceptions(response.getExceptions());
+ setNumGroupsLimitReached(response.isNumGroupsLimitReached());
+ setTimeUsedMs(response.getTimeUsedMs());
+ setRequestId(response.getRequestId());
+ setBrokerId(response.getBrokerId());
+ setNumDocsScanned(response.getNumDocsScanned());
+ setTotalDocs(response.getTotalDocs());
+ setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter());
+ setNumEntriesScannedPostFilter(response.getNumEntriesScannedPostFilter());
+ setNumServersQueried(response.getNumServersQueried());
+ setNumServersResponded(response.getNumServersResponded());
+ setNumSegmentsQueried(response.getNumSegmentsQueried());
+ setNumSegmentsProcessed(response.getNumSegmentsProcessed());
+ setNumSegmentsMatched(response.getNumSegmentsMatched());
+ setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried());
+ setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed());
+ setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched());
+ setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs());
+ setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker());
+ setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer());
+ setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid());
+ setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit());
+ setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
+ setBrokerReduceTimeMs(response.getBrokerReduceTimeMs());
+ setOfflineThreadCpuTimeNs(response.getOfflineThreadCpuTimeNs());
+ setRealtimeThreadCpuTimeNs(response.getRealtimeThreadCpuTimeNs());
+ setOfflineSystemActivitiesCpuTimeNs(response.getOfflineSystemActivitiesCpuTimeNs());
+ setRealtimeSystemActivitiesCpuTimeNs(response.getRealtimeSystemActivitiesCpuTimeNs());
+ setOfflineResponseSerializationCpuTimeNs(response.getOfflineResponseSerializationCpuTimeNs());
+ setRealtimeResponseSerializationCpuTimeNs(response.getRealtimeResponseSerializationCpuTimeNs());
+ setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
+ setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
+ setTraceInfo(response.getTraceInfo());
+ setTablesQueried(response.getTablesQueried());
+ }
+
+ @Override
+ public String getBrokerHost() {
+ return _brokerHost;
+ }
+
+ @Override
+ public void setBrokerHost(String brokerHost) {
+ _brokerHost = brokerHost;
+ }
+
+ @Override
+ public int getBrokerPort() {
+ return _brokerPort;
+ }
+
+ @Override
+ public void setBrokerPort(int brokerPort) {
+ _brokerPort = brokerPort;
+ }
+
+ @Override
+ public void setOffset(int offset) {
+ _offset = offset;
+ }
+
+ @Override
+ public void setNumRows(int numRows) {
+ _numRows = numRows;
+ }
+
+ @Override
+ public void setCursorFetchTimeMs(long cursorFetchTimeMs) {
+ _cursorFetchTimeMs = cursorFetchTimeMs;
+ }
+
+ public long getSubmissionTimeMs() {
+ return _submissionTimeMs;
+ }
+
+ @Override
+ public void setSubmissionTimeMs(long submissionTimeMs) {
+ _submissionTimeMs = submissionTimeMs;
+ }
+
+ public long getExpirationTimeMs() {
+ return _expirationTimeMs;
+ }
+
+ @Override
+ public void setBytesWritten(long bytesWritten) {
+ _bytesWritten = bytesWritten;
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return _bytesWritten;
+ }
+
+ @Override
+ public void setExpirationTimeMs(long expirationTimeMs) {
+ _expirationTimeMs = expirationTimeMs;
+ }
+
+ @Override
+ public int getOffset() {
+ return _offset;
+ }
+
+ @Override
+ public int getNumRows() {
+ return _numRows;
+ }
+
+ @Override
+ public long getCursorResultWriteTimeMs() {
+ return _cursorResultWriteTimeMs;
+ }
+
+ @Override
+ public void setCursorResultWriteTimeMs(long cursorResultWriteMs) {
+ _cursorResultWriteTimeMs = cursorResultWriteMs;
+ }
+
+ @Override
+ public long getCursorFetchTimeMs() {
+ return _cursorFetchTimeMs;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
index ce54424d16ed..500cfff946c8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdsMetadataInfo.java
@@ -30,17 +30,20 @@ public class ValidDocIdsMetadataInfo {
private final long _totalDocs;
private final String _segmentCrc;
private final ValidDocIdsType _validDocIdsType;
+ private final long _segmentSizeInBytes;
public ValidDocIdsMetadataInfo(@JsonProperty("segmentName") String segmentName,
@JsonProperty("totalValidDocs") long totalValidDocs, @JsonProperty("totalInvalidDocs") long totalInvalidDocs,
@JsonProperty("totalDocs") long totalDocs, @JsonProperty("segmentCrc") String segmentCrc,
- @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType) {
+ @JsonProperty("validDocIdsType") ValidDocIdsType validDocIdsType,
+ @JsonProperty("segmentSizeInBytes") long segmentSizeInBytes) {
_segmentName = segmentName;
_totalValidDocs = totalValidDocs;
_totalInvalidDocs = totalInvalidDocs;
_totalDocs = totalDocs;
_segmentCrc = segmentCrc;
_validDocIdsType = validDocIdsType;
+ _segmentSizeInBytes = segmentSizeInBytes;
}
public String getSegmentName() {
@@ -66,4 +69,8 @@ public String getSegmentCrc() {
public ValidDocIdsType getValidDocIdsType() {
return _validDocIdsType;
}
+
+ public long getSegmentSizeInBytes() {
+ return _segmentSizeInBytes;
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 7751d4dc5688..2fb2aef48a2e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -419,6 +419,10 @@ public boolean isWholeNumberArray() {
return INTEGRAL_ARRAY_TYPES.contains(this);
}
+ public boolean isUnknown() {
+ return UNKNOWN.equals(this);
+ }
+
public boolean isCompatible(ColumnDataType anotherColumnDataType) {
// All numbers are compatible with each other
return this == anotherColumnDataType || (this.isNumber() && anotherColumnDataType.isNumber()) || (
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java
new file mode 100644
index 000000000000..36449a54229f
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PauselessConsumptionUtils.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Optional;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+
+
+public class PauselessConsumptionUtils {
+
+ private PauselessConsumptionUtils() {
+ // Private constructor to prevent instantiation of utility class
+ }
+
+ /**
+ * Checks if pauseless consumption is enabled for the given table configuration.
+ * Returns false if any configuration component is missing or if the flag is not set to true.
+ *
+ * @param tableConfig The table configuration to check. Must not be null.
+ * @return true if pauseless consumption is explicitly enabled, false otherwise
+ * @throws NullPointerException if tableConfig is null
+ */
+ public static boolean isPauselessEnabled(@NotNull TableConfig tableConfig) {
+ return Optional.ofNullable(tableConfig.getIngestionConfig()).map(IngestionConfig::getStreamIngestionConfig)
+ .map(StreamIngestionConfig::isPauselessConsumptionEnabled).orElse(false);
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
index 45a791bc9af2..f034bb3fdcd5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStartableUtils.java
@@ -24,14 +24,13 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_TIMEZONE;
-
public class ServiceStartableUtils {
private ServiceStartableUtils() {
@@ -44,7 +43,10 @@ private ServiceStartableUtils() {
protected static String _timeZone;
/**
- * Applies the ZK cluster config to the given instance config if it does not already exist.
+ * Applies the ZK cluster config to:
+ * - The given instance config if it does not already exist.
+ * - Set the timezone.
+ * - Initialize the default values in {@link ForwardIndexConfig}.
*
* In the ZK cluster config:
* - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
@@ -70,7 +72,8 @@ public static void applyClusterConfig(PinotConfiguration instanceConfig, String
zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
if (clusterConfigZNRecord == null) {
LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
- setupTimezone(instanceConfig);
+ setTimezone(instanceConfig);
+ initForwardIndexConfig(instanceConfig);
return;
}
@@ -90,9 +93,10 @@ public static void applyClusterConfig(PinotConfiguration instanceConfig, String
}
}
} finally {
- zkClient.close();
+ ZkStarter.closeAsync(zkClient);
}
- setupTimezone(instanceConfig);
+ setTimezone(instanceConfig);
+ initForwardIndexConfig(instanceConfig);
}
private static void addConfigIfNotExists(PinotConfiguration instanceConfig, String key, String value) {
@@ -101,10 +105,31 @@ private static void addConfigIfNotExists(PinotConfiguration instanceConfig, Stri
}
}
- private static void setupTimezone(PinotConfiguration instanceConfig) {
+ private static void setTimezone(PinotConfiguration instanceConfig) {
TimeZone localTimezone = TimeZone.getDefault();
- _timeZone = instanceConfig.getProperty(CONFIG_OF_TIMEZONE, localTimezone.getID());
+ _timeZone = instanceConfig.getProperty(CommonConstants.CONFIG_OF_TIMEZONE, localTimezone.getID());
System.setProperty("user.timezone", _timeZone);
LOGGER.info("Timezone: {}", _timeZone);
}
+
+ private static void initForwardIndexConfig(PinotConfiguration instanceConfig) {
+ String defaultRawIndexWriterVersion =
+ instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_RAW_INDEX_WRITER_VERSION);
+ if (defaultRawIndexWriterVersion != null) {
+ LOGGER.info("Setting forward index default raw index writer version to: {}", defaultRawIndexWriterVersion);
+ ForwardIndexConfig.setDefaultRawIndexWriterVersion(Integer.parseInt(defaultRawIndexWriterVersion));
+ }
+ String defaultTargetMaxChunkSize =
+ instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_TARGET_MAX_CHUNK_SIZE);
+ if (defaultTargetMaxChunkSize != null) {
+ LOGGER.info("Setting forward index default target max chunk size to: {}", defaultTargetMaxChunkSize);
+ ForwardIndexConfig.setDefaultTargetMaxChunkSize(defaultTargetMaxChunkSize);
+ }
+ String defaultTargetDocsPerChunk =
+ instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_TARGET_DOCS_PER_CHUNK);
+ if (defaultTargetDocsPerChunk != null) {
+ LOGGER.info("Setting forward index default target docs per chunk to: {}", defaultTargetDocsPerChunk);
+ ForwardIndexConfig.setDefaultTargetDocsPerChunk(Integer.parseInt(defaultTargetDocsPerChunk));
+ }
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/Timer.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/Timer.java
new file mode 100644
index 000000000000..23d3ca2da4a3
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/Timer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+/**
+ * Utility class that works with a timeout in milliseconds and provides methods to check remaining time and expiration.
+ */
+public class Timer {
+ private final long _timeoutMillis;
+ private final long _startTime;
+
+ /**
+ * Initializes the Timer with the specified timeout in milliseconds.
+ *
+ * @param timeoutMillis the timeout duration in milliseconds
+ */
+ public Timer(long timeoutMillis) {
+ _timeoutMillis = timeoutMillis;
+ _startTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Returns the remaining time in milliseconds. If the timeout has expired, it returns 0.
+ *
+ * @return the remaining time in milliseconds
+ */
+ public long getRemainingTime() {
+ long elapsedTime = System.currentTimeMillis() - _startTime;
+ long remainingTime = _timeoutMillis - elapsedTime;
+ return Math.max(remainingTime, 0);
+ }
+
+ /**
+ * Checks if the timer has expired.
+ *
+ * @return true if the timer has expired, false otherwise
+ */
+ public boolean hasExpired() {
+ return getRemainingTime() == 0;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/ZkStarter.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/ZkStarter.java
index de3be516dbb0..3a15089710cf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/ZkStarter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/ZkStarter.java
@@ -21,6 +21,8 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.spi.utils.NetUtils;
@@ -179,10 +181,9 @@ public void run() {
// Wait until the ZK server is started
for (int retry = 0; retry < DEFAULT_ZK_CLIENT_RETRIES; retry++) {
try {
- Thread.sleep(1000L);
ZkClient client = new ZkClient("localhost:" + port, 1000 * (DEFAULT_ZK_CLIENT_RETRIES - retry));
client.waitUntilConnected(DEFAULT_ZK_CLIENT_RETRIES - retry, TimeUnit.SECONDS);
- client.close();
+ closeAsync(client);
break;
} catch (Exception e) {
if (retry < DEFAULT_ZK_CLIENT_RETRIES - 1) {
@@ -191,6 +192,7 @@ public void run() {
LOGGER.warn("Failed to connect to zk server.", e);
throw e;
}
+ Thread.sleep(50L);
}
}
return new ZookeeperInstance(zookeeperServerMain, dataDirPath, port);
@@ -200,6 +202,17 @@ public void run() {
}
}
+ public static void closeAsync(ZkClient client) {
+ if (client != null) {
+ ZK_DISCONNECTOR.submit(() -> {
+ client.close();
+ });
+ }
+ }
+
+ private static final ExecutorService ZK_DISCONNECTOR =
+ Executors.newFixedThreadPool(1, new NamedThreadFactory("zk-disconnector"));
+
/**
* Stops a local Zk instance, deleting its data directory
*/
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 8dbd4bb40228..5f88a9691c0b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -190,6 +190,15 @@ public static boolean isUseMultistageEngine(Map queryOptions) {
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.USE_MULTISTAGE_ENGINE));
}
+ public static boolean isGetCursor(Map queryOptions) {
+ return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.GET_CURSOR));
+ }
+
+ public static Integer getCursorNumRows(Map queryOptions) {
+ String cursorNumRows = queryOptions.get(QueryOptionKey.CURSOR_NUM_ROWS);
+ return checkedParseIntPositive(QueryOptionKey.CURSOR_NUM_ROWS, cursorNumRows);
+ }
+
public static Optional isExplainAskingServers(Map queryOptions) {
String value = queryOptions.get(QueryOptionKey.EXPLAIN_ASKING_SERVERS);
if (value == null) {
@@ -204,6 +213,13 @@ public static Integer getMaxExecutionThreads(Map queryOptions) {
return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString);
}
+ @Nullable
+ public static Integer getGroupTrimSize(Map queryOptions) {
+ String groupTrimSize = queryOptions.get(QueryOptionKey.GROUP_TRIM_SIZE);
+ // NOTE: Non-positive value means turning off the intermediate level trim
+ return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_SIZE, groupTrimSize);
+ }
+
@Nullable
public static Integer getMinSegmentGroupTrimSize(Map queryOptions) {
String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
@@ -259,6 +275,10 @@ public static Integer getMultiStageLeafLimit(Map queryOptions) {
return checkedParseIntNonNegative(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr);
}
+ public static boolean getErrorOnNumGroupsLimit(Map queryOptions) {
+ return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ERROR_ON_NUM_GROUPS_LIMIT));
+ }
+
@Nullable
public static Integer getNumGroupsLimit(Map queryOptions) {
String maxNumGroupLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index b8c013427d1c..2d1e38d84a64 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -42,6 +43,7 @@
import org.apache.calcite.sql.SqlNumericLiteral;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.request.DataSource;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.ExpressionType;
@@ -53,6 +55,7 @@
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
+import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlCompilationException;
@@ -631,4 +634,32 @@ public static Map getOptionsFromJson(JsonNode request, String op
public static Map getOptionsFromString(String optionStr) {
return Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(optionStr);
}
+
+ public static void applyTimestampIndexOverrideHints(Expression expression, PinotQuery query) {
+ applyTimestampIndexOverrideHints(expression, query, timeColumnWithGranularity -> true);
+ }
+
+ public static void applyTimestampIndexOverrideHints(
+ Expression expression, PinotQuery query, Predicate timeColumnWithGranularityPredicate
+ ) {
+ if (!expression.isSetFunctionCall()) {
+ return;
+ }
+ Function function = expression.getFunctionCall();
+ if (!function.getOperator().equalsIgnoreCase(TransformFunctionType.DATE_TRUNC.getName())) {
+ return;
+ }
+ String granularString = function.getOperands().get(0).getLiteral().getStringValue().toUpperCase();
+ Expression timeExpression = function.getOperands().get(1);
+ if (((function.getOperandsSize() == 2) || (function.getOperandsSize() == 3 && "MILLISECONDS".equalsIgnoreCase(
+ function.getOperands().get(2).getLiteral().getStringValue()))) && TimestampIndexUtils.isValidGranularity(
+ granularString) && timeExpression.getIdentifier() != null) {
+ String timeColumn = timeExpression.getIdentifier().getName();
+ String timeColumnWithGranularity = TimestampIndexUtils.getColumnWithGranularity(timeColumn, granularString);
+
+ if (timeColumnWithGranularityPredicate.test(timeColumnWithGranularity)) {
+ query.putToExpressionOverrideHints(expression, getIdentifierExpression(timeColumnWithGranularity));
+ }
+ }
+ }
}
diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto
index 49d357307648..5e3d733e45e4 100644
--- a/pinot-common/src/main/proto/plan.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -69,6 +69,8 @@ message AggregateNode {
repeated int32 groupKeys = 3;
AggType aggType = 4;
bool leafReturnFinalResult = 5;
+ repeated Collation collations = 6;
+ int32 limit = 7;
}
message FilterNode {
@@ -144,13 +146,15 @@ message MailboxReceiveNode {
}
message MailboxSendNode {
- int32 receiverStageId = 1;
+ // kept for backward compatibility. Brokers populate it, but servers should prioritize receiverStageIds
+ int32 receiverStageId = 1 [deprecated = true];
ExchangeType exchangeType = 2;
DistributionType distributionType = 3;
repeated int32 keys = 4;
bool prePartitioned = 5;
repeated Collation collations = 6;
bool sort = 7;
+ repeated int32 receiverStageIds = 8;
}
message ProjectNode {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
index 21645d201cbb..1f458a444829 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
@@ -40,6 +40,7 @@ public abstract class ControllerPrometheusMetricsTest extends PinotPrometheusMet
private static final List GLOBAL_GAUGES_ACCEPTING_TASKTYPE =
List.of(ControllerGauge.NUM_MINION_TASKS_IN_PROGRESS, ControllerGauge.NUM_MINION_SUBTASKS_RUNNING,
ControllerGauge.NUM_MINION_SUBTASKS_WAITING, ControllerGauge.NUM_MINION_SUBTASKS_ERROR,
+ ControllerGauge.NUM_MINION_SUBTASKS_UNKNOWN,
ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE, ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR);
//local gauges that accept partition
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
index 7ff5a99b8792..4a4a3c14407a 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
@@ -56,7 +56,8 @@ public abstract class ServerPrometheusMetricsTest extends PinotPrometheusMetrics
List.of(ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG, ServerGauge.REALTIME_INGESTION_DELAY_MS,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
- ServerGauge.DEDUP_PRIMARY_KEYS_COUNT);
+ ServerGauge.DEDUP_PRIMARY_KEYS_COUNT, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET,
+ ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
private static final List GAUGES_ACCEPTING_RAW_TABLE_NAME =
List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS,
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardBrokerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardBrokerPrometheusMetricsTest.java
index 8d1abd44b286..6ba25fed7920 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardBrokerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardBrokerPrometheusMetricsTest.java
@@ -19,6 +19,9 @@
package org.apache.pinot.common.metrics.prometheus.dropwizard;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.metrics.prometheus.BrokerPrometheusMetricsTest;
import org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory;
import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory;
@@ -28,7 +31,7 @@
/**
* Disabling tests as Pinot currently uses Yammer and these tests fail for for {@link DropwizardMetricsFactory}
*/
-@Test(enabled = false)
+@Test(enabled = false) // enabled=false on class level doesn't seem to work in intellij
public class DropwizardBrokerPrometheusMetricsTest extends BrokerPrometheusMetricsTest {
@Override
protected PinotMetricsFactory getPinotMetricsFactory() {
@@ -40,4 +43,19 @@ protected String getConfigFile() {
//todo: return the correct dir once this test is enabled
return null;
}
+
+ @Test(dataProvider = "brokerGauges", enabled = false)
+ public void timerTest(BrokerTimer timer) {
+ super.timerTest(timer);
+ }
+
+ @Test(dataProvider = "brokerMeters", enabled = false)
+ public void meterTest(BrokerMeter meter) {
+ super.meterTest(meter);
+ }
+
+ @Test(dataProvider = "brokerGauges", enabled = false)
+ public void gaugeTest(BrokerGauge gauge) {
+ super.gaugeTest(gauge);
+ }
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardControllerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardControllerPrometheusMetricsTest.java
index 005d83466b91..9c4bc7b45dbb 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardControllerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardControllerPrometheusMetricsTest.java
@@ -19,6 +19,9 @@
package org.apache.pinot.common.metrics.prometheus.dropwizard;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.common.metrics.prometheus.ControllerPrometheusMetricsTest;
import org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory;
import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory;
@@ -28,7 +31,7 @@
/**
* Disabling tests as Pinot currently uses Yammer and these tests fail for for {@link DropwizardMetricsFactory}
*/
-@Test(enabled = false)
+@Test(enabled = false) // enabled=false on class level doesn't seem to work in intellij
public class DropwizardControllerPrometheusMetricsTest extends ControllerPrometheusMetricsTest {
@Override
@@ -41,4 +44,19 @@ protected String getConfigFile() {
//todo: return the correct dir once this test is enabled
return null;
}
+
+ @Test(dataProvider = "controllerTimers", enabled = false)
+ public void timerTest(ControllerTimer controllerTimer) {
+ super.timerTest(controllerTimer);
+ }
+
+ @Test(dataProvider = "controllerMeters", enabled = false)
+ public void meterTest(ControllerMeter meter) {
+ super.meterTest(meter);
+ }
+
+ @Test(dataProvider = "controllerGauges", enabled = false)
+ public void gaugeTest(ControllerGauge controllerGauge) {
+ super.gaugeTest(controllerGauge);
+ }
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardMinionPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardMinionPrometheusMetricsTest.java
index 08183428a8eb..3bfa51f52664 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardMinionPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardMinionPrometheusMetricsTest.java
@@ -19,6 +19,9 @@
package org.apache.pinot.common.metrics.prometheus.dropwizard;
+import org.apache.pinot.common.metrics.MinionGauge;
+import org.apache.pinot.common.metrics.MinionMeter;
+import org.apache.pinot.common.metrics.MinionTimer;
import org.apache.pinot.common.metrics.prometheus.MinionPrometheusMetricsTest;
import org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory;
import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory;
@@ -28,7 +31,7 @@
/**
* Disabling tests as Pinot currently uses Yammer and these tests fail for for {@link DropwizardMetricsFactory}
*/
-@Test(enabled = false)
+@Test(enabled = false) // enabled=false on class level doesn't seem to work in intellij
public class DropwizardMinionPrometheusMetricsTest extends MinionPrometheusMetricsTest {
@Override
@@ -41,4 +44,19 @@ protected String getConfigFile() {
//todo: return the correct dir once this test is enabled
return null;
}
+
+ @Test(dataProvider = "minionTimers", enabled = false)
+ public void timerTest(MinionTimer timer) {
+ super.timerTest(timer);
+ }
+
+ @Test(dataProvider = "minionMeters", enabled = false)
+ public void meterTest(MinionMeter meter) {
+ super.meterTest(meter);
+ }
+
+ @Test(dataProvider = "minionGauges", enabled = false)
+ public void gaugeTest(MinionGauge gauge) {
+ super.gaugeTest(gauge);
+ }
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardServerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardServerPrometheusMetricsTest.java
index 9fc5bf4b9690..70768e995455 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardServerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/dropwizard/DropwizardServerPrometheusMetricsTest.java
@@ -19,6 +19,9 @@
package org.apache.pinot.common.metrics.prometheus.dropwizard;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.metrics.prometheus.ServerPrometheusMetricsTest;
import org.apache.pinot.plugin.metrics.dropwizard.DropwizardMetricsFactory;
import org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory;
@@ -29,7 +32,7 @@
/**
* Disabling tests as Pinot currently uses Yammer and these tests fail for for {@link DropwizardMetricsFactory}
*/
-@Test(enabled = false)
+@Test(enabled = false) // enabled=false on class level doesn't seem to work in intellij
public class DropwizardServerPrometheusMetricsTest extends ServerPrometheusMetricsTest {
@Override
@@ -42,4 +45,19 @@ protected String getConfigFile() {
//todo: return the correct dir once this test is enabled
return null;
}
+
+ @Test(dataProvider = "serverTimers", enabled = false)
+ public void timerTest(ServerTimer serverTimer) {
+ super.timerTest(serverTimer);
+ }
+
+ @Test(dataProvider = "serverMeters", enabled = false)
+ public void meterTest(ServerMeter serverMeter) {
+ super.meterTest(serverMeter);
+ }
+
+ @Test(dataProvider = "serverGauges", enabled = false)
+ public void gaugeTest(ServerGauge serverGauge) {
+ super.gaugeTest(serverGauge);
+ }
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
index 245ea7235dc5..47807d674b6f 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.common.utils;
+import com.fasterxml.jackson.core.JsonProcessingException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -220,6 +222,22 @@ public void testJSON() {
assertEquals(JSON.convert(new Timestamp(1620324238610L), TIMESTAMP), "1620324238610");
}
+ @Test
+ public void testJSONArray()
+ throws JsonProcessingException {
+ assertEquals(JSON.convert(new Object[]{false}, BOOLEAN), "[false]");
+ assertEquals(JSON.convert(new Object[]{true}, BOOLEAN), "[true]"); // Base64 encoding.
+ assertEquals(JSON.convert(new Object[]{
+ JsonUtils.stringToObject("{\"bytes\":\"AAE=\"}", Map.class),
+ JsonUtils.stringToObject("{\"map\":{\"key1\":\"value\",\"key2\":null,\"array\":[-5.4,4,\"2\"]}}",
+ Map.class),
+ JsonUtils.stringToObject("{\"timestamp\":1620324238610}", Map.class)}, JSON),
+ "[{\"bytes\":\"AAE=\"},{\"map\":{\"key1\":\"value\",\"key2\":null,\"array\":[-5.4,4,\"2\"]}},"
+ + "{\"timestamp\":1620324238610}]");
+ assertEquals(JSON.convert(new Object[]{}, JSON), "[]");
+ assertEquals(JSON.convert(new Object[]{new Timestamp(1620324238610L)}, TIMESTAMP), "[1620324238610]");
+ }
+
@Test
public void testObject() {
assertEquals(OBJECT.toInt(new NumberObject("123")), 123);
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
index 05dfff8a6f45..f5dba93bf2f9 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
@@ -48,132 +48,134 @@ public void testHostAddressRoundRobin()
InetAddress.getByAddress("localhost", InetAddresses.forString("0:0:0:0:0:0:0:1").getAddress())
};
- MockedStatic mock = Mockito.mockStatic(InetAddress.class);
- mock.when(() -> InetAddress.getAllByName("localhost")).thenReturn(localHostAddresses);
- mock.when(() -> InetAddress.getAllByName("testweb.com")).thenReturn(testWebAddresses);
+ try (MockedStatic mock = Mockito.mockStatic(InetAddress.class)) {
+ mock.when(() -> InetAddress.getAllByName("localhost")).thenReturn(localHostAddresses);
+ mock.when(() -> InetAddress.getAllByName("testweb.com")).thenReturn(testWebAddresses);
- TestCase[] testCases = new TestCase[]{
- new TestCase("http://127.0.0.1", Collections.singletonList("http://127.0.0.1")),
- new TestCase("http://127.0.0.1/", Collections.singletonList("http://127.0.0.1/")),
- new TestCase("http://127.0.0.1/?", Collections.singletonList("http://127.0.0.1/?")),
- new TestCase("http://127.0.0.1/?it=5", Collections.singletonList("http://127.0.0.1/?it=5")),
- new TestCase("http://127.0.0.1/me/out?it=5", Collections.singletonList("http://127.0.0.1/me/out?it=5")),
- new TestCase("http://127.0.0.1:20000", Collections.singletonList("http://127.0.0.1:20000")),
- new TestCase("http://127.0.0.1:20000/", Collections.singletonList("http://127.0.0.1:20000/")),
- new TestCase("http://127.0.0.1:20000/?", Collections.singletonList("http://127.0.0.1:20000/?")),
- new TestCase("http://127.0.0.1:20000/?it=5", Collections.singletonList("http://127.0.0.1:20000/?it=5")),
- new TestCase("http://127.0.0.1:20000/me/out?it=5",
- Collections.singletonList("http://127.0.0.1:20000/me/out?it=5")),
+ TestCase[] testCases = new TestCase[]{
+ new TestCase("http://127.0.0.1", Collections.singletonList("http://127.0.0.1")),
+ new TestCase("http://127.0.0.1/", Collections.singletonList("http://127.0.0.1/")),
+ new TestCase("http://127.0.0.1/?", Collections.singletonList("http://127.0.0.1/?")),
+ new TestCase("http://127.0.0.1/?it=5", Collections.singletonList("http://127.0.0.1/?it=5")),
+ new TestCase("http://127.0.0.1/me/out?it=5", Collections.singletonList("http://127.0.0.1/me/out?it=5")),
+ new TestCase("http://127.0.0.1:20000", Collections.singletonList("http://127.0.0.1:20000")),
+ new TestCase("http://127.0.0.1:20000/", Collections.singletonList("http://127.0.0.1:20000/")),
+ new TestCase("http://127.0.0.1:20000/?", Collections.singletonList("http://127.0.0.1:20000/?")),
+ new TestCase("http://127.0.0.1:20000/?it=5", Collections.singletonList("http://127.0.0.1:20000/?it=5")),
+ new TestCase("http://127.0.0.1:20000/me/out?it=5",
+ Collections.singletonList("http://127.0.0.1:20000/me/out?it=5")),
- new TestCase("http://localhost", Arrays.asList("http://127.0.0.1", "http://[0:0:0:0:0:0:0:1]")),
- new TestCase("http://localhost/", Arrays.asList("http://127.0.0.1/", "http://[0:0:0:0:0:0:0:1]/")),
- new TestCase("http://localhost/?", Arrays.asList("http://127.0.0.1/?", "http://[0:0:0:0:0:0:0:1]/?")),
- new TestCase("http://localhost/?it=5",
- Arrays.asList("http://127.0.0.1/?it=5", "http://[0:0:0:0:0:0:0:1]/?it=5")),
- new TestCase("http://localhost/me/out?it=5",
- Arrays.asList("http://127.0.0.1/me/out?it=5", "http://[0:0:0:0:0:0:0:1]/me/out?it=5")),
- new TestCase("http://localhost:20000",
- Arrays.asList("http://127.0.0.1:20000", "http://[0:0:0:0:0:0:0:1]:20000")),
- new TestCase("http://localhost:20000/",
- Arrays.asList("http://127.0.0.1:20000/", "http://[0:0:0:0:0:0:0:1]:20000/")),
- new TestCase("http://localhost:20000/?",
- Arrays.asList("http://127.0.0.1:20000/?", "http://[0:0:0:0:0:0:0:1]:20000/?")),
- new TestCase("http://localhost:20000/?it=5",
- Arrays.asList("http://127.0.0.1:20000/?it=5", "http://[0:0:0:0:0:0:0:1]:20000/?it=5")),
- new TestCase("http://localhost:20000/me/out?it=5",
- Arrays.asList("http://127.0.0.1:20000/me/out?it=5", "http://[0:0:0:0:0:0:0:1]:20000/me/out?it=5")),
+ new TestCase("http://localhost", Arrays.asList("http://127.0.0.1", "http://[0:0:0:0:0:0:0:1]")),
+ new TestCase("http://localhost/", Arrays.asList("http://127.0.0.1/", "http://[0:0:0:0:0:0:0:1]/")),
+ new TestCase("http://localhost/?", Arrays.asList("http://127.0.0.1/?", "http://[0:0:0:0:0:0:0:1]/?")),
+ new TestCase("http://localhost/?it=5",
+ Arrays.asList("http://127.0.0.1/?it=5", "http://[0:0:0:0:0:0:0:1]/?it=5")),
+ new TestCase("http://localhost/me/out?it=5",
+ Arrays.asList("http://127.0.0.1/me/out?it=5", "http://[0:0:0:0:0:0:0:1]/me/out?it=5")),
+ new TestCase("http://localhost:20000",
+ Arrays.asList("http://127.0.0.1:20000", "http://[0:0:0:0:0:0:0:1]:20000")),
+ new TestCase("http://localhost:20000/",
+ Arrays.asList("http://127.0.0.1:20000/", "http://[0:0:0:0:0:0:0:1]:20000/")),
+ new TestCase("http://localhost:20000/?",
+ Arrays.asList("http://127.0.0.1:20000/?", "http://[0:0:0:0:0:0:0:1]:20000/?")),
+ new TestCase("http://localhost:20000/?it=5",
+ Arrays.asList("http://127.0.0.1:20000/?it=5", "http://[0:0:0:0:0:0:0:1]:20000/?it=5")),
+ new TestCase("http://localhost:20000/me/out?it=5",
+ Arrays.asList("http://127.0.0.1:20000/me/out?it=5", "http://[0:0:0:0:0:0:0:1]:20000/me/out?it=5")),
- new TestCase("http://testweb.com",
- Arrays.asList("http://192.168.3.1", "http://192.168.3.2", "http://192.168.3.3")),
- new TestCase("http://testweb.com/",
- Arrays.asList("http://192.168.3.1/", "http://192.168.3.2/", "http://192.168.3.3/")),
- new TestCase("http://testweb.com/?",
- Arrays.asList("http://192.168.3.1/?", "http://192.168.3.2/?", "http://192.168.3.3/?")),
- new TestCase("http://testweb.com/?it=5",
- Arrays.asList("http://192.168.3.1/?it=5", "http://192.168.3.2/?it=5", "http://192.168.3.3/?it=5")),
- new TestCase("http://testweb.com/me/out?it=5",
- Arrays.asList("http://192.168.3.1/me/out?it=5", "http://192.168.3.2/me/out?it=5",
- "http://192.168.3.3/me/out?it=5")),
- new TestCase("http://testweb.com:20000",
- Arrays.asList("http://192.168.3.1:20000", "http://192.168.3.2:20000", "http://192.168.3.3:20000")),
- new TestCase("http://testweb.com:20000/",
- Arrays.asList("http://192.168.3.1:20000/", "http://192.168.3.2:20000/", "http://192.168.3.3:20000/")),
- new TestCase("http://testweb.com:20000/?",
- Arrays.asList("http://192.168.3.1:20000/?", "http://192.168.3.2:20000/?", "http://192.168.3.3:20000/?")),
- new TestCase("http://testweb.com:20000/?it=5",
- Arrays.asList("http://192.168.3.1:20000/?it=5", "http://192.168.3.2:20000/?it=5",
- "http://192.168.3.3:20000/?it=5")),
- new TestCase("http://testweb.com:20000/me/out?it=5",
- Arrays.asList("http://192.168.3.1:20000/me/out?it=5", "http://192.168.3.2:20000/me/out?it=5",
- "http://192.168.3.3:20000/me/out?it=5")),
+ new TestCase("http://testweb.com",
+ Arrays.asList("http://192.168.3.1", "http://192.168.3.2", "http://192.168.3.3")),
+ new TestCase("http://testweb.com/",
+ Arrays.asList("http://192.168.3.1/", "http://192.168.3.2/", "http://192.168.3.3/")),
+ new TestCase("http://testweb.com/?",
+ Arrays.asList("http://192.168.3.1/?", "http://192.168.3.2/?", "http://192.168.3.3/?")),
+ new TestCase("http://testweb.com/?it=5",
+ Arrays.asList("http://192.168.3.1/?it=5", "http://192.168.3.2/?it=5", "http://192.168.3.3/?it=5")),
+ new TestCase("http://testweb.com/me/out?it=5",
+ Arrays.asList("http://192.168.3.1/me/out?it=5", "http://192.168.3.2/me/out?it=5",
+ "http://192.168.3.3/me/out?it=5")),
+ new TestCase("http://testweb.com:20000",
+ Arrays.asList("http://192.168.3.1:20000", "http://192.168.3.2:20000", "http://192.168.3.3:20000")),
+ new TestCase("http://testweb.com:20000/",
+ Arrays.asList("http://192.168.3.1:20000/", "http://192.168.3.2:20000/", "http://192.168.3.3:20000/")),
+ new TestCase("http://testweb.com:20000/?",
+ Arrays.asList("http://192.168.3.1:20000/?", "http://192.168.3.2:20000/?", "http://192.168.3.3:20000/?")),
+ new TestCase("http://testweb.com:20000/?it=5",
+ Arrays.asList("http://192.168.3.1:20000/?it=5", "http://192.168.3.2:20000/?it=5",
+ "http://192.168.3.3:20000/?it=5")),
+ new TestCase("http://testweb.com:20000/me/out?it=5",
+ Arrays.asList("http://192.168.3.1:20000/me/out?it=5", "http://192.168.3.2:20000/me/out?it=5",
+ "http://192.168.3.3:20000/me/out?it=5")),
- new TestCase("https://127.0.0.1", Collections.singletonList("https://127.0.0.1")),
- new TestCase("https://127.0.0.1/", Collections.singletonList("https://127.0.0.1/")),
- new TestCase("https://127.0.0.1/?", Collections.singletonList("https://127.0.0.1/?")),
- new TestCase("https://127.0.0.1/?it=5", Collections.singletonList("https://127.0.0.1/?it=5")),
- new TestCase("https://127.0.0.1/me/out?it=5", Collections.singletonList("https://127.0.0.1/me/out?it=5")),
- new TestCase("https://127.0.0.1:20000", Collections.singletonList("https://127.0.0.1:20000")),
- new TestCase("https://127.0.0.1:20000/", Collections.singletonList("https://127.0.0.1:20000/")),
- new TestCase("https://127.0.0.1:20000/?", Collections.singletonList("https://127.0.0.1:20000/?")),
- new TestCase("https://127.0.0.1:20000/?it=5", Collections.singletonList("https://127.0.0.1:20000/?it=5")),
- new TestCase("https://127.0.0.1:20000/me/out?it=5",
- Collections.singletonList("https://127.0.0.1:20000/me/out?it=5")),
+ new TestCase("https://127.0.0.1", Collections.singletonList("https://127.0.0.1")),
+ new TestCase("https://127.0.0.1/", Collections.singletonList("https://127.0.0.1/")),
+ new TestCase("https://127.0.0.1/?", Collections.singletonList("https://127.0.0.1/?")),
+ new TestCase("https://127.0.0.1/?it=5", Collections.singletonList("https://127.0.0.1/?it=5")),
+ new TestCase("https://127.0.0.1/me/out?it=5", Collections.singletonList("https://127.0.0.1/me/out?it=5")),
+ new TestCase("https://127.0.0.1:20000", Collections.singletonList("https://127.0.0.1:20000")),
+ new TestCase("https://127.0.0.1:20000/", Collections.singletonList("https://127.0.0.1:20000/")),
+ new TestCase("https://127.0.0.1:20000/?", Collections.singletonList("https://127.0.0.1:20000/?")),
+ new TestCase("https://127.0.0.1:20000/?it=5", Collections.singletonList("https://127.0.0.1:20000/?it=5")),
+ new TestCase("https://127.0.0.1:20000/me/out?it=5",
+ Collections.singletonList("https://127.0.0.1:20000/me/out?it=5")),
- new TestCase("https://localhost", Arrays.asList("https://127.0.0.1", "https://[0:0:0:0:0:0:0:1]")),
- new TestCase("https://localhost/", Arrays.asList("https://127.0.0.1/", "https://[0:0:0:0:0:0:0:1]/")),
- new TestCase("https://localhost/?", Arrays.asList("https://127.0.0.1/?", "https://[0:0:0:0:0:0:0:1]/?")),
- new TestCase("https://localhost/?it=5",
- Arrays.asList("https://127.0.0.1/?it=5", "https://[0:0:0:0:0:0:0:1]/?it=5")),
- new TestCase("https://localhost/me/out?it=5",
- Arrays.asList("https://127.0.0.1/me/out?it=5", "https://[0:0:0:0:0:0:0:1]/me/out?it=5")),
- new TestCase("https://localhost:20000",
- Arrays.asList("https://127.0.0.1:20000", "https://[0:0:0:0:0:0:0:1]:20000")),
- new TestCase("https://localhost:20000/",
- Arrays.asList("https://127.0.0.1:20000/", "https://[0:0:0:0:0:0:0:1]:20000/")),
- new TestCase("https://localhost:20000/?",
- Arrays.asList("https://127.0.0.1:20000/?", "https://[0:0:0:0:0:0:0:1]:20000/?")),
- new TestCase("https://localhost:20000/?it=5",
- Arrays.asList("https://127.0.0.1:20000/?it=5", "https://[0:0:0:0:0:0:0:1]:20000/?it=5")),
+ new TestCase("https://localhost", Arrays.asList("https://127.0.0.1", "https://[0:0:0:0:0:0:0:1]")),
+ new TestCase("https://localhost/", Arrays.asList("https://127.0.0.1/", "https://[0:0:0:0:0:0:0:1]/")),
+ new TestCase("https://localhost/?", Arrays.asList("https://127.0.0.1/?", "https://[0:0:0:0:0:0:0:1]/?")),
+ new TestCase("https://localhost/?it=5",
+ Arrays.asList("https://127.0.0.1/?it=5", "https://[0:0:0:0:0:0:0:1]/?it=5")),
+ new TestCase("https://localhost/me/out?it=5",
+ Arrays.asList("https://127.0.0.1/me/out?it=5", "https://[0:0:0:0:0:0:0:1]/me/out?it=5")),
+ new TestCase("https://localhost:20000",
+ Arrays.asList("https://127.0.0.1:20000", "https://[0:0:0:0:0:0:0:1]:20000")),
+ new TestCase("https://localhost:20000/",
+ Arrays.asList("https://127.0.0.1:20000/", "https://[0:0:0:0:0:0:0:1]:20000/")),
+ new TestCase("https://localhost:20000/?",
+ Arrays.asList("https://127.0.0.1:20000/?", "https://[0:0:0:0:0:0:0:1]:20000/?")),
+ new TestCase("https://localhost:20000/?it=5",
+ Arrays.asList("https://127.0.0.1:20000/?it=5", "https://[0:0:0:0:0:0:0:1]:20000/?it=5")),
- new TestCase("https://testweb.com",
- Arrays.asList("https://192.168.3.1", "https://192.168.3.2", "https://192.168.3.3")),
- new TestCase("https://testweb.com/",
- Arrays.asList("https://192.168.3.1/", "https://192.168.3.2/", "https://192.168.3.3/")),
- new TestCase("https://testweb.com/?",
- Arrays.asList("https://192.168.3.1/?", "https://192.168.3.2/?", "https://192.168.3.3/?")),
- new TestCase("https://testweb.com/?it=5",
- Arrays.asList("https://192.168.3.1/?it=5", "https://192.168.3.2/?it=5", "https://192.168.3.3/?it=5")),
- new TestCase("https://testweb.com/me/out?it=5",
- Arrays.asList("https://192.168.3.1/me/out?it=5", "https://192.168.3.2/me/out?it=5",
- "https://192.168.3.3/me/out?it=5")),
- new TestCase("https://testweb.com:20000",
- Arrays.asList("https://192.168.3.1:20000", "https://192.168.3.2:20000", "https://192.168.3.3:20000")),
- new TestCase("https://testweb.com:20000/",
- Arrays.asList("https://192.168.3.1:20000/", "https://192.168.3.2:20000/", "https://192.168.3.3:20000/")),
- new TestCase("https://testweb.com:20000/?",
- Arrays.asList("https://192.168.3.1:20000/?", "https://192.168.3.2:20000/?", "https://192.168.3.3:20000/?")),
- new TestCase("https://testweb.com:20000/?it=5",
- Arrays.asList("https://192.168.3.1:20000/?it=5", "https://192.168.3.2:20000/?it=5",
- "https://192.168.3.3:20000/?it=5")),
- new TestCase("https://testweb.com:20000/me/out?it=5",
- Arrays.asList("https://192.168.3.1:20000/me/out?it=5", "https://192.168.3.2:20000/me/out?it=5",
- "https://192.168.3.3:20000/me/out?it=5")),
- };
+ new TestCase("https://testweb.com",
+ Arrays.asList("https://192.168.3.1", "https://192.168.3.2", "https://192.168.3.3")),
+ new TestCase("https://testweb.com/",
+ Arrays.asList("https://192.168.3.1/", "https://192.168.3.2/", "https://192.168.3.3/")),
+ new TestCase("https://testweb.com/?",
+ Arrays.asList("https://192.168.3.1/?", "https://192.168.3.2/?", "https://192.168.3.3/?")),
+ new TestCase("https://testweb.com/?it=5",
+ Arrays.asList("https://192.168.3.1/?it=5", "https://192.168.3.2/?it=5", "https://192.168.3.3/?it=5")),
+ new TestCase("https://testweb.com/me/out?it=5",
+ Arrays.asList("https://192.168.3.1/me/out?it=5", "https://192.168.3.2/me/out?it=5",
+ "https://192.168.3.3/me/out?it=5")),
+ new TestCase("https://testweb.com:20000",
+ Arrays.asList("https://192.168.3.1:20000", "https://192.168.3.2:20000", "https://192.168.3.3:20000")),
+ new TestCase("https://testweb.com:20000/",
+ Arrays.asList("https://192.168.3.1:20000/", "https://192.168.3.2:20000/", "https://192.168.3.3:20000/")),
+ new TestCase("https://testweb.com:20000/?",
+ Arrays.asList("https://192.168.3.1:20000/?", "https://192.168.3.2:20000/?",
+ "https://192.168.3.3:20000/?")),
+ new TestCase("https://testweb.com:20000/?it=5",
+ Arrays.asList("https://192.168.3.1:20000/?it=5", "https://192.168.3.2:20000/?it=5",
+ "https://192.168.3.3:20000/?it=5")),
+ new TestCase("https://testweb.com:20000/me/out?it=5",
+ Arrays.asList("https://192.168.3.1:20000/me/out?it=5", "https://192.168.3.2:20000/me/out?it=5",
+ "https://192.168.3.3:20000/me/out?it=5")),
+ };
- for (TestCase testCase : testCases) {
- String uri = testCase._originalUri;
- RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(List.of(new URI(uri)), true);
- int n = testCase._expectedUris.size();
- int previousIndex = -1;
- int currentIndex;
- for (int i = 0; i < 2 * n; i++) {
- String actualUri = uriProvider.next().toString();
- currentIndex = testCase._expectedUris.indexOf(actualUri);
- Assert.assertTrue(currentIndex != -1);
- if (previousIndex != -1) {
- Assert.assertEquals((previousIndex + 1) % n, currentIndex);
+ for (TestCase testCase : testCases) {
+ String uri = testCase._originalUri;
+ RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(List.of(new URI(uri)), true);
+ int n = testCase._expectedUris.size();
+ int previousIndex = -1;
+ int currentIndex;
+ for (int i = 0; i < 2 * n; i++) {
+ String actualUri = uriProvider.next().toString();
+ currentIndex = testCase._expectedUris.indexOf(actualUri);
+ Assert.assertTrue(currentIndex != -1);
+ if (previousIndex != -1) {
+ Assert.assertEquals((previousIndex + 1) % n, currentIndex);
+ }
+ previousIndex = currentIndex;
}
- previousIndex = currentIndex;
}
}
}
diff --git a/pinot-compatibility-verifier/pom.xml b/pinot-compatibility-verifier/pom.xml
index 9aeddb4f4cc6..e57a716edb50 100644
--- a/pinot-compatibility-verifier/pom.xml
+++ b/pinot-compatibility-verifier/pom.xml
@@ -24,7 +24,7 @@
pinot
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-compatibility-verifier
Pinot Compatibility Verifier
diff --git a/pinot-connectors/pinot-flink-connector/pom.xml b/pinot-connectors/pinot-flink-connector/pom.xml
index 66755a424dd0..c29afeb4b0f7 100644
--- a/pinot-connectors/pinot-flink-connector/pom.xml
+++ b/pinot-connectors/pinot-flink-connector/pom.xml
@@ -24,7 +24,7 @@
org.apache.pinot
pinot-connectors
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-flink-connector
Pinot Flink Connector
diff --git a/pinot-connectors/pinot-spark-2-connector/pom.xml b/pinot-connectors/pinot-spark-2-connector/pom.xml
index 5dffba4c2f89..3fef78440616 100644
--- a/pinot-connectors/pinot-spark-2-connector/pom.xml
+++ b/pinot-connectors/pinot-spark-2-connector/pom.xml
@@ -24,7 +24,7 @@
pinot-connectors
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-spark-2-connector
Pinot Spark 2 Connector
diff --git a/pinot-connectors/pinot-spark-3-connector/pom.xml b/pinot-connectors/pinot-spark-3-connector/pom.xml
index 39881b39547a..2f1ce1dec3a3 100644
--- a/pinot-connectors/pinot-spark-3-connector/pom.xml
+++ b/pinot-connectors/pinot-spark-3-connector/pom.xml
@@ -24,7 +24,7 @@
pinot-connectors
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-spark-3-connector
Pinot Spark 3 Connector
diff --git a/pinot-connectors/pinot-spark-common/pom.xml b/pinot-connectors/pinot-spark-common/pom.xml
index 745792d753a0..2f585cfeee62 100644
--- a/pinot-connectors/pinot-spark-common/pom.xml
+++ b/pinot-connectors/pinot-spark-common/pom.xml
@@ -24,7 +24,7 @@
pinot-connectors
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-spark-common
Pinot Spark Common
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index 0a7e0303b6ea..d97cfb24af9b 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -24,7 +24,7 @@
pinot
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-connectors
pom
diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index 4567ea36d7d4..a2919a549ccc 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -24,7 +24,7 @@
pinot
org.apache.pinot
- 1.3.0-SNAPSHOT
+ 1.4.0-SNAPSHOT
pinot-controller
Pinot Controller
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 342413d3559f..171e8506387a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -91,6 +91,7 @@
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.api.resources.ControllerFilePathProvider;
import org.apache.pinot.controller.api.resources.InvalidControllerConfigException;
+import org.apache.pinot.controller.cursors.ResponseStoreCleaner;
import org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
import org.apache.pinot.controller.helix.SegmentStatusChecker;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -257,7 +258,7 @@ public void init(PinotConfiguration pinotConfiguration)
// This executor service is used to do async tasks from multiget util or table rebalancing.
_executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d");
_tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
- "tenant-rebalance-thread-%d");
+ "tenant-rebalance-thread-%d");
_tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService);
}
@@ -272,7 +273,7 @@ public void init(PinotConfiguration pinotConfiguration)
private ExecutorService createExecutorService(int numThreadPool, String threadNameFormat) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
return (numThreadPool <= 0) ? Executors.newCachedThreadPool(threadFactory)
- : Executors.newFixedThreadPool(numThreadPool, threadFactory);
+ : Executors.newFixedThreadPool(numThreadPool, threadFactory);
}
private void inferHostnameIfNeeded(ControllerConf config) {
@@ -577,10 +578,12 @@ protected void configure() {
_helixResourceManager.getAllRealtimeTables().forEach(rt -> {
TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
if (tableConfig != null) {
- Map streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+ List