diff --git a/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java b/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java
index 9ea23804a3..b642c7ad65 100644
--- a/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java
+++ b/connector-common/src/test/java/com/redhat/cloud/notifications/connector/ConnectorRoutesTest.java
@@ -42,7 +42,7 @@
public abstract class ConnectorRoutesTest extends CamelQuarkusTestSupport {
- private static final String KAFKA_SOURCE_MOCK = "direct:kafka-source-mock";
+ public static final String KAFKA_SOURCE_MOCK = "direct:kafka-source-mock";
@Inject
protected ConnectorConfig connectorConfig;
diff --git a/connector-email/pom.xml b/connector-email/pom.xml
index cb07982dc2..366af3d5ae 100644
--- a/connector-email/pom.xml
+++ b/connector-email/pom.xml
@@ -66,6 +66,12 @@
+
+ io.quarkus
+ quarkus-junit5-mockito
+ test
+
+
org.apache.camel.quarkus
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/CloudEventHistoryBuilder.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/CloudEventHistoryBuilder.java
new file mode 100644
index 0000000000..f5f055b6ca
--- /dev/null
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/CloudEventHistoryBuilder.java
@@ -0,0 +1,31 @@
+package com.redhat.cloud.notifications.connector.email;
+
+import com.redhat.cloud.notifications.connector.http.HttpOutgoingCloudEventBuilder;
+import io.vertx.core.json.JsonObject;
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Alternative;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+@ApplicationScoped
+@Alternative
+@Priority(0) // The value doesn't matter.
+public class CloudEventHistoryBuilder extends HttpOutgoingCloudEventBuilder {
+
+ public static final String TOTAL_RECIPIENTS_KEY = "total_recipients";
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ super.process(exchange);
+ int totalRecipients = exchange.getProperty(TOTAL_RECIPIENTS_KEY, 0, Integer.class);
+
+ Message in = exchange.getIn();
+ JsonObject cloudEvent = new JsonObject(in.getBody(String.class));
+ JsonObject data = new JsonObject(cloudEvent.getString("data"));
+ data.getJsonObject("details").put(TOTAL_RECIPIENTS_KEY, totalRecipients);
+
+ cloudEvent.put("data", data.encode());
+ in.setBody(cloudEvent.encode());
+ }
+}
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java
index d7c5bbef69..304c78fc54 100644
--- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java
@@ -3,14 +3,12 @@
import com.redhat.cloud.notifications.connector.CloudEventDataExtractor;
import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
import com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty;
-import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings;
-import io.vertx.core.json.JsonArray;
+import com.redhat.cloud.notifications.connector.email.model.EmailNotification;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.camel.Exchange;
-import java.util.List;
import java.util.Set;
import static java.util.stream.Collectors.toSet;
@@ -30,38 +28,21 @@ public class EmailCloudEventDataExtractor extends CloudEventDataExtractor {
@Override
public void extract(final Exchange exchange, final JsonObject cloudEventData) {
- final List recipientSettings = cloudEventData.getJsonArray("recipient_settings")
- .stream()
- .map(JsonObject.class::cast)
- .map(jsonSetting -> jsonSetting.mapTo(RecipientSettings.class))
- .toList();
-
- final Set subscribers = cloudEventData.getJsonArray("subscribers", JsonArray.of())
- .stream()
- .map(String.class::cast)
- .collect(toSet());
-
- final Set unsubscribers = cloudEventData.getJsonArray("unsubscribers", JsonArray.of())
- .stream()
- .map(String.class::cast)
- .collect(toSet());
-
- final JsonObject recipientsAuthorizationCriterion = cloudEventData.getJsonObject("recipients_authorization_criterion");
-
- final Set emails = recipientSettings.stream()
+ EmailNotification emailNotification = cloudEventData.mapTo(EmailNotification.class);
+ final Set emails = emailNotification.recipientSettings().stream()
.filter(settings -> settings.getEmails() != null)
.flatMap(settings -> settings.getEmails().stream())
.collect(toSet());
- exchange.setProperty(ExchangeProperty.RENDERED_BODY, cloudEventData.getString("email_body"));
- exchange.setProperty(ExchangeProperty.RENDERED_SUBJECT, cloudEventData.getString("email_subject"));
- exchange.setProperty(ExchangeProperty.RECIPIENT_SETTINGS, recipientSettings);
- exchange.setProperty(ExchangeProperty.SUBSCRIBED_BY_DEFAULT, cloudEventData.getBoolean("subscribed_by_default"));
- exchange.setProperty(ExchangeProperty.SUBSCRIBERS, subscribers);
- exchange.setProperty(ExchangeProperty.UNSUBSCRIBERS, unsubscribers);
- exchange.setProperty(ExchangeProperty.RECIPIENTS_AUTHORIZATION_CRITERION, recipientsAuthorizationCriterion);
+ exchange.setProperty(ExchangeProperty.RENDERED_BODY, emailNotification.emailBody());
+ exchange.setProperty(ExchangeProperty.RENDERED_SUBJECT, emailNotification.emailSubject());
+ exchange.setProperty(ExchangeProperty.RECIPIENT_SETTINGS, emailNotification.recipientSettings());
+ exchange.setProperty(ExchangeProperty.SUBSCRIBED_BY_DEFAULT, emailNotification.subscribedByDefault());
+ exchange.setProperty(ExchangeProperty.SUBSCRIBERS, emailNotification.subscribers());
+ exchange.setProperty(ExchangeProperty.UNSUBSCRIBERS, emailNotification.unsubscribers());
+ exchange.setProperty(ExchangeProperty.RECIPIENTS_AUTHORIZATION_CRITERION, emailNotification.recipientsAuthorizationCriterion());
exchange.setProperty(ExchangeProperty.EMAIL_RECIPIENTS, emails);
- exchange.setProperty(ExchangeProperty.EMAIL_SENDER, cloudEventData.getString("email_sender"));
+ exchange.setProperty(ExchangeProperty.EMAIL_SENDER, emailNotification.emailSender());
exchange.setProperty(ExchangeProperty.USE_EMAIL_BOP_V1_SSL, emailConnectorConfig.isEnableBopServiceWithSslChecks());
}
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/model/EmailNotification.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/model/EmailNotification.java
new file mode 100644
index 0000000000..10103a3347
--- /dev/null
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/model/EmailNotification.java
@@ -0,0 +1,39 @@
+package com.redhat.cloud.notifications.connector.email.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings;
+import io.vertx.core.json.JsonObject;
+import java.util.Collection;
+
+/**
+ * Represents the data structure that the email connector is expecting.
+ * @param emailBody the rendered body of the email to be sent.
+ * @param emailSubject the rendered subject of the email to be sent.
+ * @param emailSender the sender that will appear in the email when
+ * the user receives it.
+ * @param orgId the organization ID associated with the
+ * triggered event.
+ * @param recipientSettings the collection of recipient settings extracted
+ * from both the event and the related endpoints
+ * to the event.
+ * @param subscribers the list of usernames who subscribed to the
+ * event type.
+ * @param unsubscribers the list of usernames who unsubscribed from the
+ * event type.
+ * @param subscribedByDefault true if the event type is subscribed by
+ * default.
+ * @param recipientsAuthorizationCriterion forward received authorization criterion.
+ *
+ */
+public record EmailNotification(
+ @JsonProperty("email_body") String emailBody,
+ @JsonProperty("email_subject") String emailSubject,
+ @JsonProperty("email_sender") String emailSender,
+ @JsonProperty("orgId") String orgId,
+ @JsonProperty("recipient_settings") Collection recipientSettings,
+ @JsonProperty("subscribers") Collection subscribers,
+ @JsonProperty("unsubscribers") Collection unsubscribers,
+ @JsonProperty("subscribed_by_default") boolean subscribedByDefault,
+ @JsonProperty("recipients_authorization_criterion") JsonObject recipientsAuthorizationCriterion
+) { }
+
diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverResponseProcessor.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverResponseProcessor.java
index 7a3693f59c..3562e23172 100644
--- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverResponseProcessor.java
+++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipients/RecipientsResolverResponseProcessor.java
@@ -16,6 +16,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_RECIPIENTS_KEY;
import static java.util.stream.Collectors.toSet;
@ApplicationScoped
@@ -49,6 +50,7 @@ public void process(final Exchange exchange) throws JsonProcessingException {
emails.removeAll(forbiddenEmail);
}
recipientsList.addAll(emails);
+ exchange.setProperty(TOTAL_RECIPIENTS_KEY, recipientsList.size());
// We have to remove one from the limit, because a default recipient (like noreply@redhat.com) will be automatically added
exchange.setProperty(ExchangeProperty.FILTERED_USERS, partition(recipientsList, emailConnectorConfig.getMaxRecipientsPerEmail() - 1));
diff --git a/connector-email/src/main/resources/application.properties b/connector-email/src/main/resources/application.properties
index 43e19327a7..3f88025f7a 100644
--- a/connector-email/src/main/resources/application.properties
+++ b/connector-email/src/main/resources/application.properties
@@ -11,6 +11,8 @@ notifications.connector.seda.concurrent-consumers=20
# The following value matches the default value of the `connectionsPerRoute` option from the Camel `http` component.
notifications.connector.seda.queue-size=20
notifications.connector.supported-connector-headers=${notifications.connector.name}
+# Re-injections should not be enabled for this connector
+notifications.connector.kafka.maximum-reinjections=0
quarkus.http.port=9003
diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/BuildBopEndpointTest.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/BuildBopEndpointTest.java
new file mode 100644
index 0000000000..2f97467c7d
--- /dev/null
+++ b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/BuildBopEndpointTest.java
@@ -0,0 +1,59 @@
+package com.redhat.cloud.notifications.connector.email;
+
+import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
+import com.redhat.cloud.notifications.connector.http.SslTrustAllManager;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.mockito.InjectSpy;
+import jakarta.inject.Inject;
+import org.apache.camel.Endpoint;
+import org.apache.camel.quarkus.test.CamelQuarkusTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.Mockito.when;
+
+@QuarkusTest
+public class BuildBopEndpointTest extends CamelQuarkusTestSupport {
+
+ @InjectSpy
+ EmailConnectorConfig emailConnectorConfig;
+
+ @Inject
+ EmailRouteBuilder emailRouteBuilder;
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ /**
+ * Disables the route builder to ensure that the Camel Context does not get
+ * started before the routes have been advised. More information is
+ * available at the dkulp's Apache Camel Test documentation page.
+ * @return {@code false} in order to stop the Camel Context from booting
+ * before the routes have been advised.
+ */
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ /**
+ * Tests that the function under test creates the BOP endpoint with the
+ * {@link SslTrustAllManager} class as the SSL context parameters, and that
+ * that class is essentially a NOOP class.
+ * @throws Exception if the endpoint could not be created.
+ */
+ @Test
+ void testBuildBOPEndpoint() throws Exception {
+ String initialBopUrl = emailConnectorConfig.getBopURL();
+ when(emailConnectorConfig.getBopURL()).thenReturn("https://test.com");
+
+ Endpoint bopEndpoint = this.emailRouteBuilder.setUpBOPEndpointV1().resolve(this.context);
+ Assertions.assertEquals(this.emailConnectorConfig.getBopURL(), bopEndpoint.getEndpointBaseUri(), "the base URI of the endpoint is not the same as the one set through the properties");
+
+ final String bopEndpointURI = bopEndpoint.getEndpointUri();
+ Assertions.assertTrue(bopEndpointURI.contains("trustManager%3Dcom.redhat.cloud.notifications.connector.http.SslTrustAllManager"), "the endpoint does not contain a reference to the SslTrustAllManager");
+ Assertions.assertTrue(bopEndpointURI.contains("x509HostnameVerifier=NO_OP"), "the base URI does not contain a mention to the NO_OP hostname verifier");
+ }
+}
diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderTest.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderTest.java
index 9169a965bf..17cfb1cc8c 100644
--- a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderTest.java
+++ b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderTest.java
@@ -1,53 +1,383 @@
package com.redhat.cloud.notifications.connector.email;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.redhat.cloud.notifications.MockServerLifecycleManager;
import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
-import com.redhat.cloud.notifications.connector.http.SslTrustAllManager;
+import com.redhat.cloud.notifications.connector.email.model.EmailNotification;
+import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings;
+import com.redhat.cloud.notifications.connector.email.model.settings.User;
+import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
+import io.vertx.core.json.JsonObject;
import jakarta.inject.Inject;
-import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.AdviceWithRouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.quarkus.test.CamelQuarkusTestSupport;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockserver.mock.action.ExpectationResponseCallback;
+import org.mockserver.model.HttpRequest;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.redhat.cloud.notifications.connector.ConnectorRoutesTest.KAFKA_SOURCE_MOCK;
+import static com.redhat.cloud.notifications.connector.ConnectorToEngineRouteBuilder.CONNECTOR_TO_ENGINE;
+import static com.redhat.cloud.notifications.connector.EngineToConnectorRouteBuilder.ENGINE_TO_CONNECTOR;
+import static com.redhat.cloud.notifications.connector.IncomingCloudEventFilter.X_RH_NOTIFICATIONS_CONNECTOR_HEADER;
+import static com.redhat.cloud.notifications.connector.IncomingCloudEventProcessor.CLOUD_EVENT_DATA;
+import static com.redhat.cloud.notifications.connector.IncomingCloudEventProcessor.CLOUD_EVENT_ID;
+import static com.redhat.cloud.notifications.connector.IncomingCloudEventProcessor.CLOUD_EVENT_TYPE;
+import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_RECIPIENTS_KEY;
+import static com.redhat.cloud.notifications.connector.email.constants.Routes.SEND_EMAIL_BOP;
+import static com.redhat.cloud.notifications.connector.email.constants.Routes.SPLIT_AND_SEND;
+import static org.apache.camel.builder.AdviceWith.adviceWith;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockserver.model.HttpResponse.response;
@QuarkusTest
+@QuarkusTestResource(TestLifecycleManager.class)
public class EmailRouteBuilderTest extends CamelQuarkusTestSupport {
@Inject
EmailConnectorConfig emailConnectorConfig;
@Inject
- EmailRouteBuilder emailRouteBuilder;
-
- @Override
- public boolean isUseAdviceWith() {
- return true;
- }
+ ObjectMapper objectMapper;
- /**
- * Disables the route builder to ensure that the Camel Context does not get
- * started before the routes have been advised. More information is
- * available at the dkulp's Apache Camel Test documentation page.
- * @return {@code false} in order to stop the Camel Context from booting
- * before the routes have been advised.
- */
@Override
public boolean isUseRouteBuilder() {
return false;
}
- /**
- * Tests that the function under test creates the BOP endpoint with the
- * {@link SslTrustAllManager} class as the SSL context parameters, and that
- * that class is essentially a NOOP class.
- * @throws Exception if the endpoint could not be created.
- */
+ static boolean camelRoutesInitialised = false;
+
+ static MockEndpoint splitRoute;
+ static MockEndpoint bopRoute;
+ static MockEndpoint kafkaConnectorToEngine;
+ static MockEndpoint kafkaEngineToConnector;
+
+ void initCamelRoutes() throws Exception {
+
+ adviceWith(emailConnectorConfig.getConnectorName(), context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ mockEndpoints(
+ "direct:" + CONNECTOR_TO_ENGINE,
+ "direct:" + SPLIT_AND_SEND
+ );
+ mockEndpointsAndSkip("kafka:" + emailConnectorConfig.getOutgoingKafkaTopic());
+ }
+ });
+
+ adviceWith(CONNECTOR_TO_ENGINE, context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ mockEndpointsAndSkip("kafka:" + emailConnectorConfig.getOutgoingKafkaTopic());
+ }
+ });
+
+ adviceWith(SPLIT_AND_SEND, context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ mockEndpoints("direct:" + SEND_EMAIL_BOP);
+ }
+ });
+
+ adviceWith(ENGINE_TO_CONNECTOR, context(), new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() {
+ replaceFromWith(KAFKA_SOURCE_MOCK);
+ }
+ });
+
+ splitRoute = getMockEndpoint("mock:direct:" + SPLIT_AND_SEND);
+ bopRoute = getMockEndpoint("mock:direct:" + SEND_EMAIL_BOP);
+ kafkaConnectorToEngine = getMockEndpoint("mock:kafka:" + emailConnectorConfig.getOutgoingKafkaTopic());
+ kafkaEngineToConnector = getMockEndpoint("mock:" + KAFKA_SOURCE_MOCK);
+ }
+
+ void initMocks(ExpectationResponseCallback verifyEmptyRequest, ExpectationResponseCallback bopResponse) throws Exception {
+
+ MockServerLifecycleManager.getClient().reset();
+ getMockHttpRequest("/internal/recipients-resolver", "PUT", verifyEmptyRequest);
+ getMockHttpRequest("/v1/sendEmails", "POST", bopResponse);
+ if (!camelRoutesInitialised) {
+ initCamelRoutes();
+ camelRoutesInitialised = true;
+ }
+ }
+
+ @Test
+ void testEmptyRecipients() throws Exception {
+
+ ExpectationResponseCallback recipientsResolverResponse = req -> response().withBody("[]").withStatusCode(200);
+ ExpectationResponseCallback bopResponse = req -> response().withStatusCode(200);
+ initMocks(recipientsResolverResponse, bopResponse);
+
+ splitRoute.expectedMessageCount(0);
+ kafkaConnectorToEngine.expectedMessageCount(1);
+
+ buildCloudEventAndSendIt(null);
+
+ splitRoute.assertIsSatisfied();
+ kafkaConnectorToEngine.assertIsSatisfied();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testWithRecipients(boolean emailsInternalOnlyEnabled) throws Exception {
+ try {
+ emailConnectorConfig.setEmailsInternalOnlyEnabled(emailsInternalOnlyEnabled);
+ Set users = TestUtils.createUsers("user-1", "user-2", "user-3", "user-4", "user-5", "user-6", "user-7");
+ String strUsers = objectMapper.writeValueAsString(users);
+ ExpectationResponseCallback recipientsResolverResponse = req -> response().withBody(strUsers).withStatusCode(200);
+ ExpectationResponseCallback bopResponse = req -> response().withStatusCode(200);
+ initMocks(recipientsResolverResponse, bopResponse);
+
+ Set additionalEmails = Set.of("redhat_user@redhat.com", "external_user@noway.com");
+ int usersAndRecipientsTotalNumber = users.size() + additionalEmails.size();
+
+ splitRoute.expectedMessageCount(1);
+ bopRoute.expectedMessageCount(3);
+ kafkaConnectorToEngine.expectedMessageCount(1);
+
+ buildCloudEventAndSendIt(additionalEmails);
+
+ splitRoute.assertIsSatisfied();
+ bopRoute.assertIsSatisfied();
+ kafkaConnectorToEngine.assertIsSatisfied(2000);
+
+ checkRecipientsAndHistory(usersAndRecipientsTotalNumber, usersAndRecipientsTotalNumber, bopRoute, kafkaConnectorToEngine, emailsInternalOnlyEnabled, "external_user@noway.com");
+ } finally {
+ emailConnectorConfig.setEmailsInternalOnlyEnabled(false);
+ }
+ }
+
+ @Test
+ void testFailureFetchingRecipientsInternalError() throws Exception {
+
+ ExpectationResponseCallback recipientsResolverResponse = req -> response().withStatusCode(500);
+ initMocks(recipientsResolverResponse, null);
+
+ splitRoute.expectedMessageCount(0);
+ kafkaConnectorToEngine.expectedMessageCount(1);
+
+ buildCloudEventAndSendIt(null);
+
+ kafkaConnectorToEngine.assertIsSatisfied();
+ splitRoute.assertIsSatisfied();
+ List responseDetails = checkRecipientsAndHistory(false, bopRoute, kafkaConnectorToEngine, 0);
+ for (JsonObject responseDetail : responseDetails) {
+ assertEquals(500, responseDetail.getJsonObject("error").getInteger("http_status_code"));
+ assertEquals("HTTP_5XX", responseDetail.getJsonObject("error").getString("error_type"));
+ }
+ }
+
@Test
- void testBuildBOPEndpoint() throws Exception {
- try (Endpoint bopEndpoint = this.emailRouteBuilder.setUpBOPEndpointV1().resolve(this.context)) {
- Assertions.assertEquals(this.emailConnectorConfig.getBopURL(), bopEndpoint.getEndpointBaseUri(), "the base URI of the endpoint is not the same as the one set through the properties");
+ void testFailureFetchingRecipientsTimeout() throws Exception {
+ initMocks(null, null);
+
+ splitRoute.expectedMessageCount(0);
+ kafkaConnectorToEngine.expectedMessageCount(1);
+
+ buildCloudEventAndSendIt(null);
- final String bopEndpointURI = bopEndpoint.getEndpointUri();
- Assertions.assertTrue(bopEndpointURI.contains("trustManager%3Dcom.redhat.cloud.notifications.connector.http.SslTrustAllManager"), "the endpoint does not contain a reference to the SslTrustAllManager");
- Assertions.assertTrue(bopEndpointURI.contains("x509HostnameVerifier=NO_OP"), "the base URI does not contain a mention to the NO_OP hostname verifier");
+ kafkaConnectorToEngine.assertIsSatisfied();
+ splitRoute.assertIsSatisfied();
+ List responseDetails = checkRecipientsAndHistory(false, bopRoute, kafkaConnectorToEngine, 0);
+ for (JsonObject responseDetail : responseDetails) {
+ assertFalse(responseDetail.getJsonObject("error").containsKey("http_status_code"));
+ assertEquals("SOCKET_TIMEOUT", responseDetail.getJsonObject("error").getString("error_type"));
}
}
+
+ @Test
+ void testFailureBopInternalError() throws Exception {
+
+ Set users = TestUtils.createUsers("user-1", "user-2", "user-3", "user-4", "user-5", "user-6", "user-7");
+ String strUsers = objectMapper.writeValueAsString(users);
+ ExpectationResponseCallback recipientsResolverResponse = req -> response().withBody(strUsers).withStatusCode(200);
+ ExpectationResponseCallback bopInternalError = req -> response().withStatusCode(500);
+ initMocks(recipientsResolverResponse, bopInternalError);
+
+ // The split route should result of 3 iterations:
+ // 7 recipients / (4 as max recipients per email - 1 for the default recipient no-reply@redhat.com = 3) = 3 iterations
+ // each iteration create its own route, since in case of bop server issue, each single iteration will send its own error message to engine.
+ // when all iterations are completed, the main route continue, triggering a success message
+ splitRoute.expectedMessageCount(1);
+ kafkaConnectorToEngine.expectedMessageCount(4);
+ buildCloudEventAndSendIt(null);
+
+ splitRoute.assertIsSatisfied();
+ kafkaConnectorToEngine.assertIsSatisfied();
+ checkErrorResultsInSplitLoop(1, 3, "HTTP_5XX", 500);
+ }
+
+ @Test
+ void testFailureBopRecipientsTimeout() throws Exception {
+
+ Set users = TestUtils.createUsers("user-1", "user-2", "user-3", "user-4", "user-5", "user-6", "user-7");
+ String strUsers = objectMapper.writeValueAsString(users);
+ ExpectationResponseCallback recipientsResolverResponse = req -> response().withBody(strUsers).withStatusCode(200);
+
+ initMocks(recipientsResolverResponse, null);
+
+ // The split route should result of 3 iterations:
+ // 7 recipients / (4 as max recipients per email - 1 for the default recipient no-reply@redhat.com = 3) = 3 iterations
+ // each iteration create its own route, since in case of bop server issue, each single iteration will send its own error message to engine.
+ // when all iterations are completed, the main route continue, triggering a success message
+ splitRoute.expectedMessageCount(1);
+ kafkaConnectorToEngine.expectedMessageCount(4);
+ buildCloudEventAndSendIt(null);
+
+ splitRoute.assertIsSatisfied();
+ kafkaConnectorToEngine.assertIsSatisfied();
+
+ checkErrorResultsInSplitLoop(1, 3, "SOCKET_TIMEOUT", null);
+ }
+
+ private static void checkErrorResultsInSplitLoop(int expectedSuccess, int expectedFailure, String errorType, Integer httpStatusCode) {
+ int successfullMessages = 0;
+ int failureMessages = 0;
+ for (Exchange kafkaMessage : kafkaConnectorToEngine.getReceivedExchanges()) {
+ JsonObject payload = new JsonObject(kafkaMessage.getIn().getBody(String.class));
+ JsonObject data = new JsonObject(payload.getString("data"));
+
+ if (data.getBoolean("successful")) {
+ successfullMessages++;
+ assertTrue(data.getBoolean("successful"));
+ assertTrue(data.containsKey("details"));
+ assertEquals(7, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY));
+ } else {
+ failureMessages++;
+ assertEquals(false, data.getBoolean("successful"));
+ assertTrue(data.containsKey("details"));
+ assertEquals(7, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY));
+ assertFalse(data.getJsonObject("details").getString("outcome").isBlank());
+ assertTrue(data.containsKey("error"));
+ assertFalse(data.getJsonObject("error").getString("error_type").isBlank());
+
+ if (null == httpStatusCode) {
+ assertFalse(data.getJsonObject("error").containsKey("http_status_code"));
+ } else {
+ assertEquals(500, data.getJsonObject("error").getInteger("http_status_code"));
+ }
+ assertEquals(errorType, data.getJsonObject("error").getString("error_type"));
+ }
+ }
+
+ assertEquals(expectedSuccess, successfullMessages);
+ assertEquals(expectedFailure, failureMessages);
+ }
+
+ private static List checkRecipientsAndHistory(boolean success, MockEndpoint bopRoute, MockEndpoint kafkaEndpoint, int expectedRecipientNumber) {
+ // check recipients sent to bop
+ List receivedExchanges = bopRoute.getReceivedExchanges();
+ Set receivedEmails = new HashSet<>();
+ for (Exchange receivedExchange : receivedExchanges) {
+ Set receivedEmailsOnExchangeMsg = receivedExchange.getIn().getBody(Set.class);
+ assertTrue(receivedEmailsOnExchangeMsg.size() <= 3);
+ receivedEmails.addAll(receivedEmailsOnExchangeMsg);
+ }
+
+ List dataToReturn = new ArrayList<>();
+ // check metrics sent to engine
+ for (Exchange kafkaMessage : kafkaEndpoint.getReceivedExchanges()) {
+ JsonObject payload = new JsonObject(kafkaMessage.getIn().getBody(String.class));
+ JsonObject data = new JsonObject(payload.getString("data"));
+
+ assertEquals(success, data.getBoolean("successful"));
+ assertTrue(data.containsKey("details"));
+ assertEquals(expectedRecipientNumber, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY));
+ assertFalse(data.getJsonObject("details").getString("outcome").isBlank());
+ assertTrue(data.containsKey("error"));
+ assertFalse(data.getJsonObject("error").getString("error_type").isBlank());
+ dataToReturn.add(data);
+ }
+ return dataToReturn;
+ }
+
+ private static void checkRecipientsAndHistory(int usersAndRecipientsTotalNumber, int recipientsReceivedByBopTotalNumber, MockEndpoint bopRoute, MockEndpoint kafkaEndpoint, boolean emailsInternalOnlyEnabled, String filteredRecipient) {
+ // check recipients sent to bop
+ List receivedExchanges = bopRoute.getReceivedExchanges();
+ Set receivedEmails = new HashSet<>();
+ for (Exchange receivedExchange : receivedExchanges) {
+ Set receivedEmailsOnExchangeMsg = receivedExchange.getIn().getBody(Set.class);
+ assertTrue(receivedEmailsOnExchangeMsg.size() <= 3);
+ receivedEmails.addAll(receivedEmailsOnExchangeMsg);
+ }
+
+ if (emailsInternalOnlyEnabled) {
+ assertFalse(receivedEmails.contains(filteredRecipient));
+ assertEquals(recipientsReceivedByBopTotalNumber - 1, receivedEmails.size());
+ } else {
+ assertTrue(receivedEmails.contains(filteredRecipient));
+ assertEquals(recipientsReceivedByBopTotalNumber, receivedEmails.size());
+ }
+
+ // check metrics sent to engine
+ Exchange kafkaMessage = kafkaEndpoint.getReceivedExchanges().stream().findFirst().get();
+ JsonObject payload = new JsonObject(kafkaMessage.getIn().getBody(String.class));
+ JsonObject data = new JsonObject(payload.getString("data"));
+
+ assertTrue(data.getBoolean("successful"));
+
+ if (emailsInternalOnlyEnabled) {
+ assertEquals(usersAndRecipientsTotalNumber - 1, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY));
+ } else {
+ assertEquals(usersAndRecipientsTotalNumber, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY));
+ }
+ }
+
+ private HttpRequest getMockHttpRequest(String path, String method, ExpectationResponseCallback expectationResponseCallback) {
+ HttpRequest postReq = new HttpRequest()
+ .withPath(path)
+ .withMethod(method);
+ MockServerLifecycleManager.getClient()
+ .withSecure(false)
+ .when(postReq)
+ .respond(expectationResponseCallback);
+ return postReq;
+ }
+
+ private void buildCloudEventAndSendIt(Set emailRecipients) {
+ final JsonObject cloudEvent = generateIncomingCloudEvent(emailRecipients);
+
+ final Map headers = new HashMap<>();
+ headers.put(X_RH_NOTIFICATIONS_CONNECTOR_HEADER, emailConnectorConfig.getConnectorName());
+ template.sendBodyAndHeaders(KAFKA_SOURCE_MOCK, cloudEvent.encode(), headers);
+ }
+
+ private JsonObject generateIncomingCloudEvent(Set emailRecipients) {
+ RecipientSettings recipientSettings = new RecipientSettings(false, false, null, null, emailRecipients);
+
+ final EmailNotification emailNotification = new EmailNotification(
+ "test email body",
+ "test email subject",
+ "Not used",
+ "123456",
+ List.of(recipientSettings),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ false,
+ null
+ );
+ final JsonObject payload = JsonObject.mapFrom(emailNotification);
+
+ final String cloudEventId = UUID.randomUUID().toString();
+
+ final JsonObject cloudEvent = new JsonObject();
+ cloudEvent.put(CLOUD_EVENT_ID, cloudEventId);
+ cloudEvent.put(CLOUD_EVENT_TYPE, "com.redhat.console.notification.toCamel." + emailConnectorConfig.getConnectorName());
+ cloudEvent.put(CLOUD_EVENT_DATA, JsonObject.mapFrom(payload));
+ return cloudEvent;
+ }
}
diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/RecipientsListTest.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/RecipientsListTest.java
deleted file mode 100644
index a02cf45eb4..0000000000
--- a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/RecipientsListTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package com.redhat.cloud.notifications.connector.email;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.redhat.cloud.notifications.MockServerLifecycleManager;
-import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
-import com.redhat.cloud.notifications.connector.email.model.settings.User;
-import io.quarkus.test.common.QuarkusTestResource;
-import io.quarkus.test.junit.QuarkusTest;
-import jakarta.inject.Inject;
-import org.apache.camel.Exchange;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.AdviceWithRouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.quarkus.test.CamelQuarkusTestSupport;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockserver.mock.action.ExpectationResponseCallback;
-import org.mockserver.model.HttpRequest;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static com.redhat.cloud.notifications.connector.ConnectorToEngineRouteBuilder.SUCCESS;
-import static com.redhat.cloud.notifications.connector.EngineToConnectorRouteBuilder.ENGINE_TO_CONNECTOR;
-import static com.redhat.cloud.notifications.connector.ExchangeProperty.START_TIME;
-import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.EMAIL_RECIPIENTS;
-import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.RECIPIENT_SETTINGS;
-import static com.redhat.cloud.notifications.connector.email.constants.Routes.SEND_EMAIL_BOP;
-import static com.redhat.cloud.notifications.connector.email.constants.Routes.SPLIT_AND_SEND;
-import static org.apache.camel.builder.AdviceWith.adviceWith;
-import static org.apache.camel.test.junit5.TestSupport.createExchangeWithBody;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockserver.model.HttpResponse.response;
-
-@QuarkusTest
-@QuarkusTestResource(TestLifecycleManager.class)
-public class RecipientsListTest extends CamelQuarkusTestSupport {
-
- @Inject
- EmailConnectorConfig emailConnectorConfig;
-
- @Inject
- ProducerTemplate producerTemplate;
-
- @Inject
- ObjectMapper objectMapper;
-
- @Override
- public boolean isUseRouteBuilder() {
- return false;
- }
-
- Exchange test(ExpectationResponseCallback verifyEmptyRequest) throws Exception {
-
- getMockHttpRequest("/internal/recipients-resolver", "PUT", verifyEmptyRequest);
-
- Exchange exchange = createExchangeWithBody(context, new HashSet<>());
- exchange.setProperty(EMAIL_RECIPIENTS, new HashSet<>());
- exchange.setProperty(RECIPIENT_SETTINGS, new ArrayList<>());
- exchange.setProperty(START_TIME, System.currentTimeMillis());
-
- adviceWith(SPLIT_AND_SEND, context(), new AdviceWithRouteBuilder() {
- @Override
- public void configure() throws Exception {
- mockEndpointsAndSkip(
- "direct:" + SEND_EMAIL_BOP
- );
- }
- });
-
- adviceWith(emailConnectorConfig.getConnectorName(), context(), new AdviceWithRouteBuilder() {
- @Override
- public void configure() throws Exception {
- mockEndpoints(
- "direct:" + SUCCESS,
- "direct:" + SPLIT_AND_SEND
- );
- }
- });
-
- return exchange;
-
- }
-
- @Test
- void testEmpty() throws Exception {
-
- ExpectationResponseCallback verifyEmptyRequest = req -> response().withBody("[]").withStatusCode(200);
-
- Exchange exchange = test(verifyEmptyRequest);
- MockEndpoint successEndpoint = getMockEndpoint("mock:direct:" + SUCCESS);
- MockEndpoint splitRoute = getMockEndpoint("mock:direct:" + SPLIT_AND_SEND);
-
- splitRoute.expectedMessageCount(0);
- successEndpoint.expectedMessageCount(1);
-
- producerTemplate.send("seda:" + ENGINE_TO_CONNECTOR, exchange);
- splitRoute.assertIsSatisfied();
- successEndpoint.assertIsSatisfied();
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testNotEmpty(boolean emailsInternalOnlyEnabled) throws Exception {
- try {
- emailConnectorConfig.setEmailsInternalOnlyEnabled(emailsInternalOnlyEnabled);
- Set users = TestUtils.createUsers("user-1", "user-2", "user-3", "user-4", "user-5", "user-6", "user-7");
- String strUsers = objectMapper.writeValueAsString(users);
- ExpectationResponseCallback verifyEmptyRequest = req -> response().withBody(strUsers).withStatusCode(200);
-
- Exchange exchange = test(verifyEmptyRequest);
- Set emailRecipients = new HashSet<>();
- emailRecipients.add("redhat_user@redhat.com");
- emailRecipients.add("external_user@noway.com");
- exchange.setProperty(EMAIL_RECIPIENTS, emailRecipients);
- int usersAndRecipientsTotalNumber = emailRecipients.size() + users.size();
-
- MockEndpoint successEndpoint = getMockEndpoint("mock:direct:" + SUCCESS);
- MockEndpoint splitRoute = getMockEndpoint("mock:direct:" + SPLIT_AND_SEND);
- MockEndpoint bopRoute = getMockEndpoint("mock:direct:" + SEND_EMAIL_BOP);
-
- splitRoute.expectedMessageCount(1);
- successEndpoint.expectedMessageCount(1);
- bopRoute.expectedMessageCount(3);
-
- producerTemplate.send("seda:" + ENGINE_TO_CONNECTOR, exchange);
- splitRoute.assertIsSatisfied();
- successEndpoint.assertIsSatisfied();
- bopRoute.assertIsSatisfied();
- List receivedExchanges = bopRoute.getReceivedExchanges();
- Set receivedEmails = new HashSet<>();
- for (Exchange receivedExchange : receivedExchanges) {
- Set receivedEmailsOnExchangeMsg = receivedExchange.getIn().getBody(Set.class);
- assertTrue(receivedEmailsOnExchangeMsg.size() <= 3);
- receivedEmails.addAll(receivedEmailsOnExchangeMsg);
- }
- if (emailsInternalOnlyEnabled) {
- assertFalse(receivedEmails.contains("external_user@noway.com"));
- assertEquals(usersAndRecipientsTotalNumber - 1, receivedEmails.size());
- } else {
- assertTrue(receivedEmails.contains("external_user@noway.com"));
- assertEquals(usersAndRecipientsTotalNumber, receivedEmails.size());
- }
- } finally {
- emailConnectorConfig.setEmailsInternalOnlyEnabled(false);
- }
- }
-
- private HttpRequest getMockHttpRequest(String path, String method, ExpectationResponseCallback expectationResponseCallback) {
- HttpRequest postReq = new HttpRequest()
- .withPath(path)
- .withMethod(method);
- MockServerLifecycleManager.getClient()
- .withSecure(false)
- .when(postReq)
- .respond(expectationResponseCallback);
- return postReq;
- }
-}
diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java
index 78921537aa..9e3418f885 100644
--- a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java
+++ b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java
@@ -15,6 +15,7 @@ public Map start() {
MockServerLifecycleManager.start();
Map properties = new HashMap<>();
properties.put("notifications.connector.recipients-resolver.url", getMockServerUrl());
+ properties.put("notifications.connector.user-provider.bop.url", getMockServerUrl());
return properties;
}
diff --git a/connector-email/src/test/resources/application.properties b/connector-email/src/test/resources/application.properties
new file mode 100644
index 0000000000..57cd56b61d
--- /dev/null
+++ b/connector-email/src/test/resources/application.properties
@@ -0,0 +1,3 @@
+notifications.connector.http.connect-timeout-ms=200
+notifications.connector.http.socket-timeout-ms=200
+notifications.connector.redelivery.delay=5