Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore rate limit metric distinction #5658

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,11 @@ impl From<IngestFailure> for IngestServiceError {
IngestFailureReason::NoShardsAvailable => {
IngestServiceError::Unavailable("no shards available".to_string())
}
IngestFailureReason::ShardRateLimited => {
IngestServiceError::RateLimited(RateLimitingCause::ShardRateLimiting)
IngestFailureReason::AttemptedShardsRateLimited => {
IngestServiceError::RateLimited(RateLimitingCause::AttemptedShardsRateLimited)
}
IngestFailureReason::AllShardsRateLimited => {
IngestServiceError::RateLimited(RateLimitingCause::AllShardsRateLimited)
}
IngestFailureReason::WalFull => {
IngestServiceError::RateLimited(RateLimitingCause::WalFull)
Expand Down
8 changes: 6 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ pub(crate) struct IngestResultMetrics {
pub source_not_found: IntCounter,
pub internal: IntCounter,
pub no_shards_available: IntCounter,
pub shard_rate_limited: IntCounter,
pub attempted_shards_rate_limited: IntCounter,
pub all_shards_rate_limited: IntCounter,
pub wal_full: IntCounter,
pub timeout: IntCounter,
pub router_timeout: IntCounter,
Expand All @@ -58,7 +59,10 @@ impl Default for IngestResultMetrics {
source_not_found: ingest_result_total_vec.with_label_values(["source_not_found"]),
internal: ingest_result_total_vec.with_label_values(["internal"]),
no_shards_available: ingest_result_total_vec.with_label_values(["no_shards_available"]),
shard_rate_limited: ingest_result_total_vec.with_label_values(["shard_rate_limited"]),
attempted_shards_rate_limited: ingest_result_total_vec
.with_label_values(["attempted_shards_rate_limited"]),
all_shards_rate_limited: ingest_result_total_vec
.with_label_values(["all_shards_rate_limited"]),
wal_full: ingest_result_total_vec.with_label_values(["wal_full"]),
timeout: ingest_result_total_vec.with_label_values(["timeout"]),
router_timeout: ingest_result_total_vec.with_label_values(["router_timeout"]),
Expand Down
118 changes: 107 additions & 11 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ impl IngestRouter {

// Subrequests for which no shards are available for routing.
let mut no_shards_available_subrequest_ids: Vec<SubrequestId> = Vec::new();
// Subrequests for which the shards are rate limited.
let mut rate_limited_subrequest_ids: Vec<SubrequestId> = Vec::new();
// Subrequests for which all the shards are rate limited.
let mut all_shards_rate_limited_subrequest_ids: Vec<SubrequestId> = Vec::new();

let mut per_leader_persist_subrequests: HashMap<&LeaderId, Vec<PersistSubrequest>> =
HashMap::new();
Expand All @@ -385,7 +385,7 @@ impl IngestRouter {
let next_open_shard = match next_open_shard_res_opt {
Some(Ok(next_open_shard)) => next_open_shard,
Some(Err(NextOpenShardError::RateLimited)) => {
rate_limited_subrequest_ids.push(subrequest.subrequest_id);
all_shards_rate_limited_subrequest_ids.push(subrequest.subrequest_id);
continue;
}
Some(Err(NextOpenShardError::NoShardsAvailable)) | None => {
Expand Down Expand Up @@ -450,8 +450,8 @@ impl IngestRouter {
for subrequest_id in no_shards_available_subrequest_ids {
workbench.record_no_shards_available(subrequest_id);
}
for subrequest_id in rate_limited_subrequest_ids {
workbench.record_rate_limited(subrequest_id);
for subrequest_id in all_shards_rate_limited_subrequest_ids {
workbench.record_all_shards_rate_limited(subrequest_id);
}
self.process_persist_results(workbench, persist_futures)
.await;
Expand Down Expand Up @@ -538,8 +538,11 @@ fn update_ingest_metrics(ingest_result: &IngestV2Result<IngestResponseV2>, num_s
IngestFailureReason::NoShardsAvailable => {
ingest_results_metrics.no_shards_available.inc()
}
IngestFailureReason::ShardRateLimited => {
ingest_results_metrics.shard_rate_limited.inc()
IngestFailureReason::AttemptedShardsRateLimited => {
ingest_results_metrics.attempted_shards_rate_limited.inc()
}
IngestFailureReason::AllShardsRateLimited => {
ingest_results_metrics.all_shards_rate_limited.inc();
}
IngestFailureReason::WalFull => ingest_results_metrics.wal_full.inc(),
IngestFailureReason::Timeout => ingest_results_metrics.timeout.inc(),
Expand Down Expand Up @@ -568,9 +571,14 @@ fn update_ingest_metrics(ingest_result: &IngestV2Result<IngestResponseV2>, num_s
.circuit_breaker
.inc_by(num_subrequests);
}
RateLimitingCause::ShardRateLimiting => {
RateLimitingCause::AttemptedShardsRateLimited => {
ingest_results_metrics
.shard_rate_limited
.attempted_shards_rate_limited
.inc_by(num_subrequests);
}
RateLimitingCause::AllShardsRateLimited => {
ingest_results_metrics
.all_shards_rate_limited
.inc_by(num_subrequests);
}
RateLimitingCause::Unknown => {
Expand Down Expand Up @@ -2086,7 +2094,7 @@ mod tests {
}

#[tokio::test]
async fn test_router_returns_rate_limited_failure() {
async fn test_router_returns_all_shards_rate_limited_failure() {
let self_node_id = "test-router".into();
let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
let ingester_pool = IngesterPool::default();
Expand All @@ -2101,6 +2109,10 @@ mod tests {
let mut state_guard = router.state.lock().await;
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);

// We set up a single shard that is rate limited. After one retry, it
// will be marked as rate limited, so all shards will be known to be
// rate limited.

state_guard.routing_table.replace_shards(
index_uid.clone(),
"test-source",
Expand Down Expand Up @@ -2163,7 +2175,91 @@ mod tests {
assert_eq!(ingest_response.failures.len(), 1);
assert_eq!(
ingest_response.failures[0].reason(),
IngestFailureReason::ShardRateLimited
IngestFailureReason::AllShardsRateLimited
);
}

#[tokio::test]
async fn test_router_returns_attempted_shards_rate_limited_failure() {
let self_node_id = "test-router".into();
let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
let router = IngestRouter::new(
self_node_id,
control_plane,
ingester_pool.clone(),
replication_factor,
EventBroker::default(),
);
let mut state_guard = router.state.lock().await;
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);

// We setup many rate limited shards. After the maximum number
// of retries the router will give up.

state_guard.routing_table.replace_shards(
index_uid.clone(),
"test-source",
(1..=100)
.map(|shard_id| Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(shard_id)),
shard_state: ShardState::Open as i32,
leader_id: "test-ingester-0".to_string(),
..Default::default()
})
.collect(),
);

drop(state_guard);

let mut mock_ingester_0 = MockIngesterService::new();
mock_ingester_0.expect_persist().returning(move |request| {
assert_eq!(request.leader_id, "test-ingester-0");
assert_eq!(request.commit_type(), CommitTypeV2::Auto);
assert_eq!(request.subrequests.len(), 1);
let subrequest = &request.subrequests[0];
assert_eq!(subrequest.subrequest_id, 0);
let index_uid = subrequest.index_uid().clone();
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(
subrequest.doc_batch,
Some(DocBatchV2::for_test(["test-doc-foo"]))
);

let response = PersistResponse {
leader_id: request.leader_id,
successes: Vec::new(),
failures: vec![PersistFailure {
subrequest_id: 0,
index_uid: Some(index_uid),
source_id: "test-source".to_string(),
shard_id: Some(subrequest.shard_id().clone()),
reason: PersistFailureReason::ShardRateLimited as i32,
}],
};
Ok(response)
});
let ingester_0 = IngesterServiceClient::from_mock(mock_ingester_0);
ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());

let ingest_request = IngestRequestV2 {
subrequests: vec![IngestSubrequest {
subrequest_id: 0,
index_id: "test-index-0".to_string(),
source_id: "test-source".to_string(),
doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
}],
commit_type: CommitTypeV2::Auto as i32,
};
let ingest_response = router.ingest(ingest_request).await.unwrap();
assert_eq!(ingest_response.successes.len(), 0);
assert_eq!(ingest_response.failures.len(), 1);
assert_eq!(
ingest_response.failures[0].reason(),
IngestFailureReason::AttemptedShardsRateLimited
);
}
}
2 changes: 2 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ impl RoutingTableEntry {

#[derive(Debug, PartialEq, Eq)]
pub(super) enum NextOpenShardError {
/// no open shard
NoShardsAvailable,
/// all open shards are rate limited
RateLimited,
}

Expand Down
11 changes: 8 additions & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ impl IngestWorkbench {
self.record_failure(subrequest_id, SubworkbenchFailure::NoShardsAvailable);
}

pub fn record_rate_limited(&mut self, subrequest_id: SubrequestId) {
pub fn record_all_shards_rate_limited(&mut self, subrequest_id: SubrequestId) {
self.record_failure(
subrequest_id,
SubworkbenchFailure::RateLimited(RateLimitingCause::ShardRateLimiting),
SubworkbenchFailure::RateLimited(RateLimitingCause::AllShardsRateLimited),
);
}

Expand Down Expand Up @@ -359,7 +359,12 @@ impl SubworkbenchFailure {
RateLimitingCause::LoadShedding => IngestFailureReason::RouterLoadShedding,
RateLimitingCause::WalFull => IngestFailureReason::WalFull,
RateLimitingCause::CircuitBreaker => IngestFailureReason::CircuitBreaker,
RateLimitingCause::ShardRateLimiting => IngestFailureReason::ShardRateLimited,
RateLimitingCause::AttemptedShardsRateLimited => {
IngestFailureReason::AttemptedShardsRateLimited
}
RateLimitingCause::AllShardsRateLimited => {
IngestFailureReason::AllShardsRateLimited
}
RateLimitingCause::Unknown => IngestFailureReason::Unspecified,
},
Self::Persist(persist_failure_reason) => (*persist_failure_reason).into(),
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-proto/protos/quickwit/router.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ enum IngestFailureReason {
INGEST_FAILURE_REASON_SOURCE_NOT_FOUND = 2;
INGEST_FAILURE_REASON_INTERNAL = 3;
INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE = 4;
INGEST_FAILURE_REASON_SHARD_RATE_LIMITED = 5;
INGEST_FAILURE_REASON_ATTEMPTED_SHARDS_RATE_LIMITED = 5;
INGEST_FAILURE_REASON_WAL_FULL = 6;
INGEST_FAILURE_REASON_TIMEOUT = 7;
INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING = 8;
INGEST_FAILURE_REASON_LOAD_SHEDDING = 9;
INGEST_FAILURE_REASON_CIRCUIT_BREAKER = 10;
INGEST_FAILURE_REASON_ALL_SHARDS_RATE_LIMITED = 11;
}

message IngestFailure {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions quickwit/quickwit-proto/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ pub enum RateLimitingCause {
WalFull,
#[error("circuit breaker")]
CircuitBreaker,
#[error("shard rate limiting")]
ShardRateLimiting,
#[error("attempted shards rate limited")]
AttemptedShardsRateLimited,
#[error("all shards rate limited")]
AllShardsRateLimited,
#[error("unknown")]
Unknown,
}
Expand Down Expand Up @@ -312,7 +314,9 @@ impl From<PersistFailureReason> for IngestFailureReason {
PersistFailureReason::ShardNotFound => IngestFailureReason::NoShardsAvailable,
PersistFailureReason::ShardClosed => IngestFailureReason::NoShardsAvailable,
PersistFailureReason::WalFull => IngestFailureReason::WalFull,
PersistFailureReason::ShardRateLimited => IngestFailureReason::ShardRateLimited,
PersistFailureReason::ShardRateLimited => {
IngestFailureReason::AttemptedShardsRateLimited
}
PersistFailureReason::Timeout => IngestFailureReason::Timeout,
}
}
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ fn make_elastic_bulk_response_v2(
format!("timeout [{}]", failure.index_id),
StatusCode::REQUEST_TIMEOUT,
),
IngestFailureReason::ShardRateLimited => (
IngestFailureReason::AttemptedShardsRateLimited
| IngestFailureReason::AllShardsRateLimited => (
ElasticException::RateLimited,
format!("shard rate limiting [{}]", failure.index_id),
StatusCode::TOO_MANY_REQUESTS,
Expand Down