Skip to content

Commit

Permalink
[RHCLOUD-37053] Simplify email connector internal logic
Browse files Browse the repository at this point in the history
  • Loading branch information
g-duval committed Jan 6, 2025
1 parent afbc44b commit 1b9033d
Show file tree
Hide file tree
Showing 14 changed files with 677 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.ClientWebApplicationException;

import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
Expand Down Expand Up @@ -39,20 +40,10 @@ public class HttpExceptionProcessor extends ExceptionProcessor {

@Override
protected void process(Throwable t, Exchange exchange) {
if (t instanceof HttpOperationFailedException e) {
exchange.setProperty(HTTP_STATUS_CODE, e.getStatusCode());
if (e.getStatusCode() >= 300 && e.getStatusCode() < 400) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_3XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), e, exchange);
} else if (e.getStatusCode() >= 400 && e.getStatusCode() < 500 && e.getStatusCode() != SC_TOO_MANY_REQUESTS) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_4XX);
logHttpError(connectorConfig.getClientErrorLogLevel(), e, exchange);
} else if (e.getStatusCode() == SC_TOO_MANY_REQUESTS || e.getStatusCode() >= 500) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_5XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), e, exchange);
} else {
logHttpError(ERROR, e, exchange);
}
if (t instanceof ClientWebApplicationException e) {
manageReturnedStatusCode(exchange, e.getResponse().getStatus(), e.getResponse().readEntity(String.class));
} else if (t instanceof HttpOperationFailedException e) {
manageReturnedStatusCode(exchange, e.getStatusCode(), e.getResponseBody());
} else if (t instanceof ConnectTimeoutException) {
exchange.setProperty(HTTP_ERROR_TYPE, CONNECT_TIMEOUT);
} else if (t instanceof SocketTimeoutException) {
Expand All @@ -70,16 +61,32 @@ protected void process(Throwable t, Exchange exchange) {
}
}

private void logHttpError(Logger.Level level, HttpOperationFailedException e, Exchange exchange) {
private void manageReturnedStatusCode(Exchange exchange, int statusCode, String responseBody) {
exchange.setProperty(HTTP_STATUS_CODE, statusCode);
if (statusCode >= 300 && statusCode < 400) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_3XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), statusCode, responseBody, exchange);
} else if (statusCode >= 400 && statusCode < 500 && statusCode != SC_TOO_MANY_REQUESTS) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_4XX);
logHttpError(connectorConfig.getClientErrorLogLevel(), statusCode, responseBody, exchange);
} else if (statusCode == SC_TOO_MANY_REQUESTS || statusCode >= 500) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_5XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), statusCode, responseBody, exchange);
} else {
logHttpError(ERROR, statusCode, responseBody, exchange);
}
}

private void logHttpError(Logger.Level level, int statusCode, String responseBody, Exchange exchange) {
Log.logf(
level,
HTTP_ERROR_LOG_MSG,
getRouteId(exchange),
getOrgId(exchange),
getExchangeId(exchange),
getTargetUrl(exchange),
e.getStatusCode(),
e.getResponseBody()
statusCode,
responseBody
);
}
}
17 changes: 17 additions & 0 deletions connector-email/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@
<scope>test</scope>
</dependency>

<!-- Retries -->
<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${failsafe.version}</version>
</dependency>

<!-- MockServer -->
<dependency>
<groupId>org.mock-server</groupId>
Expand All @@ -87,6 +94,16 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public void extract(final Exchange exchange, final JsonObject cloudEventData) {
exchange.setProperty(ExchangeProperty.EMAIL_RECIPIENTS, emails);
exchange.setProperty(ExchangeProperty.EMAIL_SENDER, emailNotification.emailSender());

exchange.setProperty(ExchangeProperty.USE_EMAIL_BOP_V1_SSL, emailConnectorConfig.isEnableBopServiceWithSslChecks());
exchange.setProperty(ExchangeProperty.USE_SIMPLIFIED_EMAIL_ROUTE, emailConnectorConfig.useSimplifiedEmailRoute(emailNotification.orgId()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package com.redhat.cloud.notifications.connector.email;

import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
import com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty;
import com.redhat.cloud.notifications.connector.email.model.bop.Email;
import com.redhat.cloud.notifications.connector.email.model.bop.SendEmailsRequest;
import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings;
import com.redhat.cloud.notifications.connector.email.model.settings.User;
import com.redhat.cloud.notifications.connector.email.processors.bop.BOPService;
import com.redhat.cloud.notifications.connector.email.processors.recipientsresolver.ExternalRecipientsResolver;
import io.quarkus.logging.Log;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static com.redhat.cloud.notifications.connector.ExchangeProperty.ID;
import static com.redhat.cloud.notifications.connector.ExchangeProperty.ORG_ID;
import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_RECIPIENTS_KEY;
import static java.util.stream.Collectors.toSet;

@ApplicationScoped
public class EmailManagementProcessor implements Processor {

@Inject
EmailConnectorConfig emailConnectorConfig;

@Inject
ExternalRecipientsResolver externalRecipientsResolver;

@Inject
@RestClient
BOPService bopService;

@Override
public void process(final Exchange exchange) {
// fetch recipients
Set<String> recipientsList = fetchRecipients(exchange);

if (recipientsList.isEmpty()) {
Log.infof("Skipped Email notification because the recipients list was empty [orgId=$%s, historyId=%s]", exchange.getProperty(ORG_ID, String.class), exchange.getProperty(ID, String.class));
} else {
// send to bop
sendToBop(exchange, recipientsList);
}
}

protected void sendToBop(Exchange exchange, Set<String> recipientsList) {
// split recipient list and send it ot BOP
List<List<String>> packedRecipients = partition(recipientsList, emailConnectorConfig.getMaxRecipientsPerEmail() - 1);
final String subject = exchange.getProperty(ExchangeProperty.RENDERED_SUBJECT, String.class);
final String body = exchange.getProperty(ExchangeProperty.RENDERED_BODY, String.class);
final String sender = exchange.getProperty(ExchangeProperty.EMAIL_SENDER, String.class);

for (int i = 0; i < packedRecipients.size(); i++) {
sendToBop(exchange, packedRecipients.get(i), subject, body, sender, i, packedRecipients.size());
}
}

protected void sendToBop(Exchange exchange, List<String> recipients, String subject, String body, String sender, int currentIteration, int totalIterations) {

// Prepare the email to be sent
final Email email = new Email(
subject,
body,
Set.copyOf(recipients)
);

final SendEmailsRequest request = new SendEmailsRequest(
Set.of(email),
sender,
sender
);

bopService.sendEmail(emailConnectorConfig.getBopApiToken(),
emailConnectorConfig.getBopClientId(),
emailConnectorConfig.getBopEnv(),
request);
Log.infof("Sent Email notification %d/%d [orgId=%s, historyId=%s]", currentIteration + 1, totalIterations, exchange.getProperty(ORG_ID, String.class), exchange.getProperty(ID, String.class));
}

private Set<String> fetchRecipients(Exchange exchange) {
List<RecipientSettings> recipientSettings = exchange.getProperty(ExchangeProperty.RECIPIENT_SETTINGS, List.class);
Set<String> subscribers = exchange.getProperty(ExchangeProperty.SUBSCRIBERS, Set.class);
Set<String> unsubscribers = exchange.getProperty(ExchangeProperty.UNSUBSCRIBERS, Set.class);
JsonObject recipientsAuthorizationCriterion = exchange.getProperty(ExchangeProperty.RECIPIENTS_AUTHORIZATION_CRITERION, JsonObject.class);

boolean subscribedByDefault = exchange.getProperty(ExchangeProperty.SUBSCRIBED_BY_DEFAULT, boolean.class);
final String orgId = exchange.getProperty(ORG_ID, String.class);

Set<String> recipientsList = externalRecipientsResolver.recipientUsers(
orgId,
Set.copyOf(recipientSettings),
subscribers,
unsubscribers,
subscribedByDefault,
recipientsAuthorizationCriterion)
.stream().map(User::getEmail).filter(email -> email != null && !email.isBlank()).collect(toSet());

Set<String> emails = exchange.getProperty(ExchangeProperty.EMAIL_RECIPIENTS, Set.of(), Set.class);
if (emailConnectorConfig.isEmailsInternalOnlyEnabled()) {
Set<String> forbiddenEmail = emails.stream().filter(email -> !email.trim().toLowerCase().endsWith("@redhat.com")).collect(Collectors.toSet());
if (!forbiddenEmail.isEmpty()) {
Log.warnf(" %s emails are forbidden for message historyId: %s ", forbiddenEmail, exchange.getProperty(com.redhat.cloud.notifications.connector.ExchangeProperty.ID, String.class));
}
emails.removeAll(forbiddenEmail);
}
recipientsList.addAll(emails);
exchange.setProperty(TOTAL_RECIPIENTS_KEY, recipientsList.size());
return recipientsList;
}

private static List<List<String>> partition(Set<String> collection, int n) {
AtomicInteger counter = new AtomicInteger();
return collection.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / n))
.values().stream().collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@
import org.apache.camel.support.jsse.KeyStoreParameters;
import org.apache.camel.support.jsse.SSLContextParameters;
import org.apache.camel.support.jsse.TrustManagersParameters;
import org.apache.http.conn.ssl.NoopHostnameVerifier;

import java.util.Set;

import static com.redhat.cloud.notifications.connector.ConnectorToEngineRouteBuilder.SUCCESS;
import static com.redhat.cloud.notifications.connector.ExchangeProperty.ID;
import static com.redhat.cloud.notifications.connector.ExchangeProperty.ORG_ID;
import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.FILTERED_USERS;
import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.USE_EMAIL_BOP_V1_SSL;
import static com.redhat.cloud.notifications.connector.http.SslTrustAllManager.getSslContextParameters;
import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.USE_SIMPLIFIED_EMAIL_ROUTE;
import static org.apache.camel.LoggingLevel.DEBUG;
import static org.apache.camel.LoggingLevel.INFO;
import static org.apache.camel.builder.endpoint.dsl.HttpEndpointBuilderFactory.HttpEndpointBuilder;
Expand Down Expand Up @@ -61,6 +59,9 @@ public class EmailRouteBuilder extends EngineToConnectorRouteBuilder {
@Inject
EmailMetricsProcessor emailMetricsProcessor;

@Inject
EmailManagementProcessor emailManagementProcessor;

/**
* Configures the flow for this connector.
*/
Expand All @@ -74,23 +75,27 @@ public void configureRoutes() {
.to(direct(ENTRYPOINT));
}

/*
* Prepares the payload accepted by BOP and sends the request to
* the service.
*/
final HttpEndpointBuilder bopEndpointV1 = setUpBOPEndpointV1();

from(seda(ENGINE_TO_CONNECTOR))
.routeId(emailConnectorConfig.getConnectorName())
.process(recipientsResolverRequestPreparer)
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(setupRecipientResolverEndpoint())
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.process(recipientsResolverResponseProcessor)
.choice().when(shouldSkipEmail())
.log(INFO, getClass().getName(), "Skipped Email notification because the recipients list was empty [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]")
.otherwise()
.to(direct(Routes.SPLIT_AND_SEND))
.choice()
.when(shouldUseSimplifiedEmailManagement())
.process(emailManagementProcessor)
.endChoice()
.otherwise()
.process(recipientsResolverRequestPreparer)
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(setupRecipientResolverEndpoint())
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.process(recipientsResolverResponseProcessor)
.choice()
.when(shouldSkipEmail())
.log(INFO, getClass().getName(), "Skipped Email notification because the recipients list was empty [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]")
.endChoice()
.otherwise()
.to(direct(Routes.SPLIT_AND_SEND))
.endChoice()
.end()
.endChoice()
.end()
.to(direct(SUCCESS));

Expand All @@ -105,16 +110,10 @@ public void configureRoutes() {
// Clear all the headers that may come from the previous route.
.removeHeaders("*")
.process(this.BOPRequestPreparer)
.choice().when(shouldUseBopEmailServiceWithSslChecks())
.log(DEBUG, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "} using regular SSL checks on email service]")
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(emailConnectorConfig.getBopURL())
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP)
.otherwise()
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(bopEndpointV1)
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP)
.end()
.log(DEBUG, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "} using regular SSL checks on email service]")
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(emailConnectorConfig.getBopURL())
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP)
.log(INFO, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]")
.process(emailMetricsProcessor);
}
Expand All @@ -123,27 +122,8 @@ private Predicate shouldSkipEmail() {
return exchange -> exchange.getProperty(FILTERED_USERS, Set.class).isEmpty();
}

private Predicate shouldUseBopEmailServiceWithSslChecks() {
return exchange -> exchange.getProperty(USE_EMAIL_BOP_V1_SSL, Boolean.class);
}

/**
* Creates the endpoint for the BOP service. It makes Apache Camel trust
* BOP service's certificate.
* @return the created endpoint.
*/
protected HttpEndpointBuilder setUpBOPEndpointV1() {
// Remove the schema from the url to avoid the
// "ResolveEndpointFailedException", which complaints about specifying
// the schema twice.
final String fullURL = this.emailConnectorConfig.getBopURL();
if (fullURL.startsWith("https")) {
return https(fullURL.replace("https://", ""))
.sslContextParameters(getSslContextParameters())
.x509HostnameVerifier(NoopHostnameVerifier.INSTANCE);
} else {
return http(fullURL.replace("http://", ""));
}
private Predicate shouldUseSimplifiedEmailManagement() {
return exchange -> exchange.getProperty(USE_SIMPLIFIED_EMAIL_ROUTE, false, Boolean.class);
}

private HttpEndpointBuilder setupRecipientResolverEndpoint() {
Expand Down
Loading

0 comments on commit 1b9033d

Please sign in to comment.