Skip to content

Commit

Permalink
add routing policy in query errmsg upon unavailable segments for a bi…
Browse files Browse the repository at this point in the history
…t more context (#13706)

* add routing policy in query errmsg upon unavailable segments for a bit more context
  • Loading branch information
klsince authored Aug 8, 2024
1 parent d5c4f1c commit a9aad2d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.pinot.spi.auth.AuthorizationResult;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DimensionFieldSpec;
Expand Down Expand Up @@ -679,6 +680,9 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
} else {
errorMessage = String.format("%d segments unavailable: %s", numUnavailableSegments, unavailableSegments);
}
String realtimeRoutingPolicy = realtimeBrokerRequest != null ? getRoutingPolicy(realtimeTableConfig) : null;
String offlineRoutingPolicy = offlineBrokerRequest != null ? getRoutingPolicy(offlineTableConfig) : null;
errorMessage = addRoutingPolicyInErrMsg(errorMessage, realtimeRoutingPolicy, offlineRoutingPolicy);
exceptions.add(QueryException.getException(QueryException.BROKER_SEGMENT_UNAVAILABLE_ERROR, errorMessage));
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_UNAVAILABLE_SEGMENTS, 1);
}
Expand Down Expand Up @@ -836,6 +840,31 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
}
}

@VisibleForTesting
static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy,
String offlineRoutingPolicy) {
if (realtimeRoutingPolicy != null && offlineRoutingPolicy != null) {
return errorMessage + ", with routing policy: " + realtimeRoutingPolicy + " [realtime], " + offlineRoutingPolicy
+ " [offline]";
}
if (realtimeRoutingPolicy != null) {
return errorMessage + ", with routing policy: " + realtimeRoutingPolicy + " [realtime]";
}
if (offlineRoutingPolicy != null) {
return errorMessage + ", with routing policy: " + offlineRoutingPolicy + " [offline]";
}
return errorMessage;
}

private static String getRoutingPolicy(TableConfig tableConfig) {
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
if (routingConfig == null) {
return RoutingConfig.DEFAULT_INSTANCE_SELECTOR_TYPE;
}
String selectorType = routingConfig.getInstanceSelectorType();
return selectorType != null ? selectorType : RoutingConfig.DEFAULT_INSTANCE_SELECTOR_TYPE;
}

private BrokerResponseNative getEmptyBrokerOnlyResponse(PinotQuery pinotQuery, RequestContext requestContext,
String tableName, @Nullable RequesterIdentity requesterIdentity) {
if (pinotQuery.isExplain()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,15 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
Assert.assertEquals(servers.iterator().next().getAdminEndpoint(), "http://server01:8097");
latch.countDown();
}

@Test
public void testAddRoutingPolicyInErrMsg() {
Assert.assertEquals(BaseSingleStageBrokerRequestHandler.addRoutingPolicyInErrMsg("error1", null, null), "error1");
Assert.assertEquals(BaseSingleStageBrokerRequestHandler.addRoutingPolicyInErrMsg("error1", "rt_rp", null),
"error1, with routing policy: rt_rp [realtime]");
Assert.assertEquals(BaseSingleStageBrokerRequestHandler.addRoutingPolicyInErrMsg("error1", null, "off_rp"),
"error1, with routing policy: off_rp [offline]");
Assert.assertEquals(BaseSingleStageBrokerRequestHandler.addRoutingPolicyInErrMsg("error1", "rt_rp", "off_rp"),
"error1, with routing policy: rt_rp [realtime], off_rp [offline]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class RoutingConfig extends BaseJsonConfig {
public static final String PARTITION_SEGMENT_PRUNER_TYPE = "partition";
public static final String TIME_SEGMENT_PRUNER_TYPE = "time";
public static final String EMPTY_SEGMENT_PRUNER_TYPE = "empty";
public static final String DEFAULT_INSTANCE_SELECTOR_TYPE = "balanced";
public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "replicaGroup";
public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "strictReplicaGroup";
public static final String MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE = "multiStageReplicaGroup";
Expand Down

0 comments on commit a9aad2d

Please sign in to comment.