From 8f37a7755c0e96c7e8e436cd2d778a621b5f91b8 Mon Sep 17 00:00:00 2001 From: Riya Saxena Date: Mon, 16 Dec 2024 21:45:26 -0800 Subject: [PATCH 1/6] shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena --- .../alerting/DocumentLevelMonitorRunner.kt | 14 ++++- .../alerting/DocumentMonitorRunnerIT.kt | 59 +++++++++++++++++++ .../org/opensearch/alerting/TestHelpers.kt | 20 +++++++ .../SampleRemoteMonitorRestHandler.java | 9 ++- 4 files changed, 98 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 16272fd99..15ffeb57a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -239,7 +239,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() { shards.remove("index") shards.remove("shards_count") - val nodeMap = getNodes(monitorCtx) + /** + * if fanout flag is disabled and force assign all shards to local node + * thus effectively making the fan-out a single node operation. + * This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules + **/ + val localNode = monitorCtx.clusterService!!.localNode() + val nodeMap: Map = if (monitor.fanoutEnabled == true) { + getNodes(monitorCtx) + } else { + logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}") + mapOf(localNode.id to localNode) + } + val nodeShardAssignments = distributeShards( monitorCtx, nodeMap.keys.toList(), diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 769c20ead..a769fc7af 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2750,6 +2750,65 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { deleteDataStream(aliasName) } + fun `test document-level monitor fanout disabled approach when aliases contain indices with multiple shards`() { + val aliasName = "test-alias" + createIndexAlias( + aliasName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + "\"index.number_of_shards\": 7" + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES), + fanoutEnabled = false + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(aliasName, "1", testDoc) + indexDoc(aliasName, "2", testDoc) + indexDoc(aliasName, "4", testDoc) + indexDoc(aliasName, "5", testDoc) + indexDoc(aliasName, "6", testDoc) + indexDoc(aliasName, "7", testDoc) + OpenSearchTestCase.waitUntil( + { searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES + ) + + rolloverDatastream(aliasName) + indexDoc(aliasName, "11", testDoc) + indexDoc(aliasName, "12", testDoc) + indexDoc(aliasName, "14", testDoc) + indexDoc(aliasName, "15", testDoc) + indexDoc(aliasName, "16", testDoc) + indexDoc(aliasName, "17", testDoc) + OpenSearchTestCase.waitUntil( + { searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES + ) + + deleteDataStream(aliasName) + } + fun `test execute monitor generates alerts and findings with renewable locks`() { val testIndex = createTestIndex() val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 2330974f4..77cf6a538 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -227,6 +227,26 @@ fun randomDocumentLevelMonitor( ) } +fun randomDocumentLevelMonitor( + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User? = randomUser(), + inputs: List = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + withMetadata: Boolean = false, + fanoutEnabled: Boolean? = true, +): Monitor { + return Monitor( + name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, + schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), + fanoutEnabled = fanoutEnabled + ) +} + fun randomWorkflow( id: String = Workflow.NO_ID, monitorIds: List, diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 085a8db80..f03dbb017 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -96,7 +96,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient new DataSources(), false, false, - "sample-remote-monitor-plugin" + "sample-remote-monitor-plugin", + true ); IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest( Monitor.NO_ID, @@ -158,7 +159,8 @@ public void onFailure(Exception e) { new DataSources(), false, false, - "sample-remote-monitor-plugin" + "sample-remote-monitor-plugin", + true ); IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest( Monitor.NO_ID, @@ -243,7 +245,8 @@ public void onFailure(Exception e) { new DataSources(), false, false, - "sample-remote-monitor-plugin" + "sample-remote-monitor-plugin", + true ); IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest( Monitor.NO_ID, From 96e415b202a1c88b5886140a11c1cc4262220eaa Mon Sep 17 00:00:00 2001 From: Riya Saxena Date: Mon, 16 Dec 2024 21:56:42 -0800 Subject: [PATCH 2/6] shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena --- .../org/opensearch/alerting/DocumentLevelMonitorRunner.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 15ffeb57a..c00d1ac21 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -243,7 +243,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * if fanout flag is disabled and force assign all shards to local node * thus effectively making the fan-out a single node operation. * This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules - **/ + **/ val localNode = monitorCtx.clusterService!!.localNode() val nodeMap: Map = if (monitor.fanoutEnabled == true) { getNodes(monitorCtx) From c6f985a9913e1abaaa48ed54dc7d323630da8338 Mon Sep 17 00:00:00 2001 From: Riya Saxena Date: Tue, 17 Dec 2024 09:48:10 -0800 Subject: [PATCH 3/6] shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena --- .../org/opensearch/alerting/DocumentLevelMonitorRunner.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index c00d1ac21..5d5f8dbb4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -245,7 +245,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules **/ val localNode = monitorCtx.clusterService!!.localNode() - val nodeMap: Map = if (monitor.fanoutEnabled == true) { + val nodeMap: Map = if (monitor?.fanoutEnabled == true) { getNodes(monitorCtx) } else { logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}") From 779afc3ef9c5d19bd5fee18cad32f0ad3d5a4427 Mon Sep 17 00:00:00 2001 From: Riya Saxena Date: Tue, 17 Dec 2024 11:27:44 -0800 Subject: [PATCH 4/6] shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena --- .../alerting/DocumentLevelMonitorRunner.kt | 2 +- .../alerting/DocumentMonitorRunnerIT.kt | 5 ++--- .../org/opensearch/alerting/TestHelpers.kt | 20 ------------------- .../SampleRemoteMonitorRestHandler.java | 13 +++++------- 4 files changed, 8 insertions(+), 32 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 5d5f8dbb4..903f2ecc7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -245,7 +245,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules **/ val localNode = monitorCtx.clusterService!!.localNode() - val nodeMap: Map = if (monitor?.fanoutEnabled == true) { + val nodeMap: Map = if (docLevelMonitorInput?.fanoutEnabled == true) { getNodes(monitorCtx) } else { logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index a769fc7af..f6ef9309a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2765,7 +2765,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { ) val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") - val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery)) + val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery), false) val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) val monitor = createMonitor( @@ -2773,8 +2773,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { inputs = listOf(docLevelInput), triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))), enabled = true, - schedule = IntervalSchedule(1, ChronoUnit.MINUTES), - fanoutEnabled = false + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) ) ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 77cf6a538..2330974f4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -227,26 +227,6 @@ fun randomDocumentLevelMonitor( ) } -fun randomDocumentLevelMonitor( - name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - user: User? = randomUser(), - inputs: List = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())), - schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), - enabled: Boolean = randomBoolean(), - triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, - enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, - lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), - withMetadata: Boolean = false, - fanoutEnabled: Boolean? = true, -): Monitor { - return Monitor( - name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, - schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), - fanoutEnabled = fanoutEnabled - ) -} - fun randomWorkflow( id: String = Workflow.NO_ID, monitorIds: List, diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index f03dbb017..91c97a636 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -96,8 +96,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient new DataSources(), false, false, - "sample-remote-monitor-plugin", - true + "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest( Monitor.NO_ID, @@ -137,7 +136,7 @@ public void onFailure(Exception e) { }; } else if (runMonitorParam.equals("multiple")) { SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello", - new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())))); + new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())), true)); BytesStreamOutput out1 = new BytesStreamOutput(); input2.writeTo(out1); BytesReference input1Serialized1 = out1.bytes(); @@ -159,8 +158,7 @@ public void onFailure(Exception e) { new DataSources(), false, false, - "sample-remote-monitor-plugin", - true + "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest( Monitor.NO_ID, @@ -222,7 +220,7 @@ public void onFailure(Exception e) { sampleRemoteDocLevelMonitorInput.writeTo(out2); BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes(); - DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList()); + DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList(), true); RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput); Monitor remoteDocLevelMonitor = new Monitor( @@ -245,8 +243,7 @@ public void onFailure(Exception e) { new DataSources(), false, false, - "sample-remote-monitor-plugin", - true + "sample-remote-monitor-plugin" ); IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest( Monitor.NO_ID, From 0fce04411bcba9891e094a18fa7e5ef5513ff05e Mon Sep 17 00:00:00 2001 From: Riya Saxena Date: Tue, 17 Dec 2024 12:12:50 -0800 Subject: [PATCH 5/6] tests fix Signed-off-by: Riya Saxena --- .../kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index f6ef9309a..53bb54a53 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2791,7 +2791,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { indexDoc(aliasName, "6", testDoc) indexDoc(aliasName, "7", testDoc) OpenSearchTestCase.waitUntil( - { searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES + { searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 }, 2, TimeUnit.MINUTES ) rolloverDatastream(aliasName) @@ -2802,7 +2802,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { indexDoc(aliasName, "16", testDoc) indexDoc(aliasName, "17", testDoc) OpenSearchTestCase.waitUntil( - { searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES + { searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 }, 2, TimeUnit.MINUTES ) deleteDataStream(aliasName) From c9e50b551e0e8cd30cfd8c270d4b8bf6d7bc4b80 Mon Sep 17 00:00:00 2001 From: Riya Saxena Date: Tue, 17 Dec 2024 12:18:17 -0800 Subject: [PATCH 6/6] tests fix Signed-off-by: Riya Saxena --- .../org/opensearch/alerting/DocumentMonitorRunnerIT.kt | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 53bb54a53..0c32ecf0b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2791,7 +2791,9 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { indexDoc(aliasName, "6", testDoc) indexDoc(aliasName, "7", testDoc) OpenSearchTestCase.waitUntil( - { searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 }, 2, TimeUnit.MINUTES + { searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 }, + 2, + TimeUnit.MINUTES ) rolloverDatastream(aliasName) @@ -2802,7 +2804,9 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { indexDoc(aliasName, "16", testDoc) indexDoc(aliasName, "17", testDoc) OpenSearchTestCase.waitUntil( - { searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 }, 2, TimeUnit.MINUTES + { searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 }, + 2, + TimeUnit.MINUTES ) deleteDataStream(aliasName)