Skip to content

Commit

Permalink
Troubleshooting.
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Feb 5, 2024
1 parent 3871306 commit d8f855a
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -132,18 +132,29 @@ class InputService(
val responseMap = mutableMapOf<String, Map<String, Any>>()
client.threadPool().threadContext.stashContext().use {
scope.launch {
val singleThreadContext = newSingleThreadContext("ClusterMetricsInputThread")
withContext(singleThreadContext) {
input.clusters.forEach { cluster ->
val deferredResults = input.clusters.map { cluster ->
async {
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
cluster to response.toMap()
}
results += responseMap
}

val awaitedResults = deferredResults.awaitAll()
responseMap.putAll(awaitedResults)

// input.clusters.forEach { cluster ->
// val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
// val response = executeTransportAction(input, targetClient)
// // Not all supported API reference the cluster name in their response.
// // Mapping each response to the cluster name before adding to results.
// // Not adding this same logic for local-only monitors to avoid breaking existing monitors.
// responseMap[cluster] = response.toMap()
// }
results += responseMap
}
}
// todo hurneyt delete?
Expand Down

0 comments on commit d8f855a

Please sign in to comment.