diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 27bbb966a..bf88b2b13 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -7,9 +7,9 @@ package org.opensearch.alerting import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.launch -import kotlinx.coroutines.newSingleThreadContext -import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -132,18 +132,29 @@ class InputService( val responseMap = mutableMapOf>() client.threadPool().threadContext.stashContext().use { scope.launch { - val singleThreadContext = newSingleThreadContext("ClusterMetricsInputThread") - withContext(singleThreadContext) { - input.clusters.forEach { cluster -> + val deferredResults = input.clusters.map { cluster -> + async { val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) val response = executeTransportAction(input, targetClient) // Not all supported API reference the cluster name in their response. // Mapping each response to the cluster name before adding to results. // Not adding this same logic for local-only monitors to avoid breaking existing monitors. - responseMap[cluster] = response.toMap() + cluster to response.toMap() } - results += responseMap } + + val awaitedResults = deferredResults.awaitAll() + responseMap.putAll(awaitedResults) + +// input.clusters.forEach { cluster -> +// val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) +// val response = executeTransportAction(input, targetClient) +// // Not all supported API reference the cluster name in their response. +// // Mapping each response to the cluster name before adding to results. +// // Not adding this same logic for local-only monitors to avoid breaking existing monitors. +// responseMap[cluster] = response.toMap() +// } + results += responseMap } } // todo hurneyt delete?