Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing up events aggregation integration test. #20158

Merged
merged 4 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,64 @@
import org.graylog.testing.containermatrix.annotations.ContainerMatrixTest;
import org.graylog.testing.containermatrix.annotations.ContainerMatrixTestsConfiguration;
import org.graylog2.plugin.streams.StreamRuleType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.graylog2.plugin.streams.Stream.DEFAULT_STREAM_ID;

@ContainerMatrixTestsConfiguration(serverLifecycle = Lifecycle.CLASS, searchVersions = {SearchServer.ES7, SearchServer.OS2_LATEST, SearchServer.DATANODE_DEV}, withWebhookServerEnabled = true)
public class PivotAggregationSearchIT {

private static final Logger LOG = LoggerFactory.getLogger(PivotAggregationSearchIT.class);
private static final String indexSetPrefix = "pivot-search-test";

private final GraylogApis graylogApis;
private final GraylogApis apis;
private final WebhookServerInstance webhookTester;
private String indexSetId;
private String streamId;
private String isolatedStreamId;

public PivotAggregationSearchIT(GraylogApis apis) {
this.apis = apis;
this.webhookTester = apis.backend().getWebhookServerInstance().orElseThrow(() -> new IllegalStateException("Webhook tester not found!"));
}

public PivotAggregationSearchIT(GraylogApis graylogApis) {
this.graylogApis = graylogApis;
this.webhookTester = graylogApis.backend().getWebhookServerInstance().orElseThrow(() -> new IllegalStateException("Webhook tester not found!"));
@BeforeEach
void setUp() {
this.indexSetId = apis.indices().createIndexSet("Pivot Aggregation Search Test", "", indexSetPrefix);
apis.indices().waitFor(() -> apis.backend().searchServerInstance().client().indicesExists(indexSetPrefix + "_0", indexSetPrefix + "_deflector"),
"Timed out waiting for index/deflector to be created.");
this.streamId = apis.streams().createStream(
"Stream for Pivot Aggregation Search Test",
this.indexSetId,
true,
DefaultStreamMatches.REMOVE,
new Streams.StreamRule(StreamRuleType.EXACT.toInteger(), "example.org", "source", false)
);
}

@AfterEach
void tearDown() {
apis.streams().deleteStream(this.streamId);
if (this.isolatedStreamId != null) {
apis.streams().deleteStream(this.isolatedStreamId);
this.isolatedStreamId = null;
}
apis.indices().deleteIndexSet(this.indexSetId, true);
apis.indices().waitFor(() -> !apis.backend().searchServerInstance().client().indicesExists(indexSetPrefix + "_0")
&& !apis.backend().searchServerInstance().client().indicesExists(indexSetPrefix + "_deflector"),
"Timed out waiting for index/deflector to be deleted.");
}

@ContainerMatrixTest
void testPivotAggregationSearchAllKnownFields() throws ExecutionException, RetryException {
graylogApis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());
apis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());

final String notificationID = graylogApis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());
final String notificationID = apis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());

final String eventDefinitionID = graylogApis.eventDefinitions().createEventDefinition(notificationID, List.of(
final String eventDefinitionID = apis.eventDefinitions().createEventDefinition(notificationID, List.of(
"http_response_code",
"type"
));
Expand All @@ -66,17 +94,17 @@ void testPivotAggregationSearchAllKnownFields() throws ExecutionException, Retry

waitForWebHook(eventDefinitionID, "my alert def: 200|ssh - count()=3.0");

graylogApis.eventsNotifications().deleteNotification(notificationID);
graylogApis.eventDefinitions().deleteDefinition(eventDefinitionID);
apis.eventsNotifications().deleteNotification(notificationID);
apis.eventDefinitions().deleteDefinition(eventDefinitionID);
}

@ContainerMatrixTest
void testPivotAggregationSearchOneUnknownField() throws ExecutionException, RetryException {
graylogApis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());
apis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());

final String notificationID = graylogApis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());
final String notificationID = apis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());

final String eventDefinitionID = graylogApis.eventDefinitions().createEventDefinition(notificationID, List.of(
final String eventDefinitionID = apis.eventDefinitions().createEventDefinition(notificationID, List.of(
"http_response_code",
"unknown_field",
"type"
Expand All @@ -86,17 +114,17 @@ void testPivotAggregationSearchOneUnknownField() throws ExecutionException, Retr

waitForWebHook(eventDefinitionID, "my alert def: 200|(Empty Value)|ssh - count()=3.0");

graylogApis.eventsNotifications().deleteNotification(notificationID);
graylogApis.eventDefinitions().deleteDefinition(eventDefinitionID);
apis.eventsNotifications().deleteNotification(notificationID);
apis.eventDefinitions().deleteDefinition(eventDefinitionID);
}

@ContainerMatrixTest
void testPivotAggregationSearchAllUnknownFields() throws ExecutionException, RetryException {
graylogApis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());
apis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());

final String notificationID = graylogApis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());
final String notificationID = apis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());

final String eventDefinitionID = graylogApis.eventDefinitions().createEventDefinition(notificationID, List.of(
final String eventDefinitionID = apis.eventDefinitions().createEventDefinition(notificationID, List.of(
"unknown_field_1",
"unknown_field_2",
"unknown_field_3"
Expand All @@ -106,34 +134,58 @@ void testPivotAggregationSearchAllUnknownFields() throws ExecutionException, Ret

waitForWebHook(eventDefinitionID, "my alert def: (Empty Value)|(Empty Value)|(Empty Value) - count()=3.0");

graylogApis.eventsNotifications().deleteNotification(notificationID);
graylogApis.eventDefinitions().deleteDefinition(eventDefinitionID);
apis.eventsNotifications().deleteNotification(notificationID);
apis.eventDefinitions().deleteDefinition(eventDefinitionID);
}

@ContainerMatrixTest
void testPivotAggregationIsolatedToStream() throws ExecutionException, RetryException {
graylogApis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());
apis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());

final String notificationID = apis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());

this.isolatedStreamId = apis.streams().createStream(
"Stream for testing event definition isolation",
this.indexSetId,
true,
DefaultStreamMatches.REMOVE,
new Streams.StreamRule(StreamRuleType.EXACT.toInteger(), "stream_isolation_test", "facility", false)
);

final String eventDefinitionID = apis.eventDefinitions().createEventDefinition(notificationID, List.of(), List.of(isolatedStreamId));

postMessagesToOtherStream();
postMessages();

final String notificationID = graylogApis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());
waitForWebHook(eventDefinitionID, "my alert def: count()=1.0");

final String defaultStreamIndexSetId = graylogApis.streams().getStream(DEFAULT_STREAM_ID).extract().path("index_set_id");
final var streamId = graylogApis.streams().createStream(
apis.eventsNotifications().deleteNotification(notificationID);
apis.eventDefinitions().deleteDefinition(eventDefinitionID);
}

@ContainerMatrixTest
void testPivotAggregationWithGroupingIsIsolatedToStream() throws ExecutionException, RetryException {
apis.system().urlWhitelist(webhookTester.getContainerizedCollectorURI());

final String notificationID = apis.eventsNotifications().createHttpNotification(webhookTester.getContainerizedCollectorURI());

this.isolatedStreamId = apis.streams().createStream(
"Stream for testing event definition isolation",
defaultStreamIndexSetId,
this.indexSetId,
true,
DefaultStreamMatches.KEEP,
new Streams.StreamRule(StreamRuleType.EXACT.toInteger(), "stream_isolation_test", "facility", true)
DefaultStreamMatches.REMOVE,
new Streams.StreamRule(StreamRuleType.EXACT.toInteger(), "stream_isolation_test", "facility", false)
);

final String eventDefinitionID = graylogApis.eventDefinitions().createEventDefinition(notificationID, List.of("http_response_code"), List.of(streamId));
final String eventDefinitionID = apis.eventDefinitions().createEventDefinition(notificationID, List.of("http_response_code"), List.of(isolatedStreamId));

postMessagesToOtherStream();
postMessages();

waitForWebHook(eventDefinitionID, "my alert def: 200 - count()=3.0");
waitForWebHook(eventDefinitionID, "my alert def: 500 - count()=1.0");

graylogApis.eventsNotifications().deleteNotification(notificationID);
graylogApis.eventDefinitions().deleteDefinition(eventDefinitionID);
apis.eventsNotifications().deleteNotification(notificationID);
apis.eventDefinitions().deleteDefinition(eventDefinitionID);
}

private void waitForWebHook(String eventDefinitionID, String eventMessage) throws ExecutionException, RetryException {
Expand All @@ -147,13 +199,13 @@ private void waitForWebHook(String eventDefinitionID, String eventMessage) throw
});

} catch (ExecutionException | RetryException e) {
LOG.error(this.graylogApis.backend().getLogs());
LOG.error(this.apis.backend().getLogs());
throw e;
}
}

private void postMessages() {
graylogApis.gelf().createGelfHttpInput()
apis.gelf().createGelfHttpInput()
.postMessage("""
{
"short_message":"pivot-aggregation-search-test-1",
Expand Down Expand Up @@ -181,21 +233,23 @@ private void postMessages() {
"http_response_code":200,
"resource": "posts"
}""");
graylogApis.search().waitForMessagesCount(3);
apis.search().waitForMessagesCount(3);
apis.backend().searchServerInstance().client().refreshNode();
}

private void postMessagesToOtherStream() {
graylogApis.gelf().createGelfHttpInput()
apis.gelf().createGelfHttpInput()
.postMessage("""
{
"short_message":"pivot-aggregation-search-test-1",
"host":"example.org",
"type":"ssh",
"source":"example.org",
"http_response_code":200,
"http_response_code":500,
"resource": "posts",
"facility": "stream_isolation_test"
}""");
graylogApis.search().waitForMessagesCount(1);
apis.search().waitForMessagesCount(1);
apis.backend().searchServerInstance().client().refreshNode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,18 @@
package org.graylog.plugins.views.aggregations;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import org.graylog.testing.completebackend.apis.DefaultStreamMatches;
import org.graylog.testing.completebackend.apis.GraylogApiResponse;
import org.graylog.testing.completebackend.apis.GraylogApis;
import org.graylog.testing.completebackend.apis.Streams;
import org.graylog.testing.completebackend.apis.inputs.PortBoundGelfInputApi;
import org.graylog.testing.containermatrix.MongodbServer;
import org.graylog.testing.containermatrix.annotations.ContainerMatrixTest;
import org.graylog.testing.containermatrix.annotations.ContainerMatrixTestsConfiguration;
import org.graylog2.plugin.streams.StreamRuleType;
import org.junit.jupiter.api.BeforeEach;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.graylog.testing.containermatrix.SearchServer.ES7;
import static org.graylog.testing.containermatrix.SearchServer.OS1;
Expand All @@ -58,8 +53,8 @@ void setUp() throws ExecutionException, RetryException {
final String streamA = api.streams().createStream("Stream A", indexSetA, DefaultStreamMatches.REMOVE, new Streams.StreamRule(StreamRuleType.EXACT.toInteger(), "streamA", "target_stream", false));
final String streamB = api.streams().createStream("Stream B", indexSetB, DefaultStreamMatches.REMOVE, new Streams.StreamRule(StreamRuleType.EXACT.toInteger(), "streamB", "target_stream", false));

final List<String> indexNamesA = waitForIndexNames(indexSetA);
final List<String> indexNamesB = waitForIndexNames(indexSetB);
final List<String> indexNamesA = api.indices().waitForIndexNames(indexSetA);
final List<String> indexNamesB = api.indices().waitForIndexNames(indexSetB);

final String indexA = indexNamesA.iterator().next();
final String indexB = indexNamesB.iterator().next();
Expand All @@ -78,15 +73,6 @@ void setUp() throws ExecutionException, RetryException {
api.search().waitForMessages("compound-field-test-a", "compound-field-test-b");
}

private List<String> waitForIndexNames(String indexSetName) throws ExecutionException, RetryException {
return RetryerBuilder.<List<String>>newBuilder()
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(30))
.retryIfResult(List::isEmpty)
.build()
.call(() -> api.indices().listOpenIndices(indexSetName).properJSONPath().read("indices.*.index_name"));
}

@ContainerMatrixTest
void aggregate() {
final GraylogApiResponse responseAsc =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.graylog.testing.completebackend.apis;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import io.restassured.response.ValidatableResponse;
import org.graylog2.indexer.retention.strategies.DeletionRetentionStrategyConfig;
import org.graylog2.indexer.rotation.strategies.TimeBasedRotationStrategyConfig;
Expand All @@ -25,6 +29,9 @@

import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -107,6 +114,15 @@ public GraylogApiResponse listOpenIndices(String indexSetId) {
return new GraylogApiResponse(response);
}

public List<String> waitForIndexNames(String indexSetId) throws ExecutionException, RetryException {
return RetryerBuilder.<List<String>>newBuilder()
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(30))
.retryIfResult(List::isEmpty)
.build()
.call(() -> listOpenIndices(indexSetId).properJSONPath().read("indices.*.index_name"));
}

public void rotateIndexSet(String indexSetId) {
given()
.spec(api.requestSpecification())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public String createStream(String title, String indexSetId, DefaultStreamMatches
return waitForStreamRouterRefresh(() -> createStream(title, indexSetId, true, defaultStreamMatches, streamRules));
}


public String createStream(String title, String indexSetId, boolean started, DefaultStreamMatches defaultStreamMatches, StreamRule... streamRules) {
final CreateStreamRequest body = new CreateStreamRequest(title, List.of(streamRules), indexSetId, defaultStreamMatches == DefaultStreamMatches.REMOVE);
final String streamId = given()
Expand All @@ -95,6 +94,16 @@ public String createStream(String title, String indexSetId, boolean started, Def
return streamId;
}

public void deleteStream(String streamId) {
waitForStreamRouterRefresh(() -> given()
.spec(api.requestSpecification())
.when()
.delete("/streams/" + streamId)
.then()
.log().ifError()
.statusCode(204));
}

public ValidatableResponse getStream(String streamId) {
return given()
.spec(api.requestSpecification())
Expand Down
Loading