Skip to content

Commit

Permalink
[RHCLOUD-37053] Simplify email connector internal logic
Browse files Browse the repository at this point in the history
  • Loading branch information
g-duval committed Jan 6, 2025
1 parent afbc44b commit 348210e
Show file tree
Hide file tree
Showing 17 changed files with 752 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.ClientWebApplicationException;

import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
Expand Down Expand Up @@ -39,20 +40,10 @@ public class HttpExceptionProcessor extends ExceptionProcessor {

@Override
protected void process(Throwable t, Exchange exchange) {
if (t instanceof HttpOperationFailedException e) {
exchange.setProperty(HTTP_STATUS_CODE, e.getStatusCode());
if (e.getStatusCode() >= 300 && e.getStatusCode() < 400) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_3XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), e, exchange);
} else if (e.getStatusCode() >= 400 && e.getStatusCode() < 500 && e.getStatusCode() != SC_TOO_MANY_REQUESTS) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_4XX);
logHttpError(connectorConfig.getClientErrorLogLevel(), e, exchange);
} else if (e.getStatusCode() == SC_TOO_MANY_REQUESTS || e.getStatusCode() >= 500) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_5XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), e, exchange);
} else {
logHttpError(ERROR, e, exchange);
}
if (t instanceof ClientWebApplicationException e) {
manageReturnedStatusCode(exchange, e.getResponse().getStatus(), e.getResponse().readEntity(String.class));
} else if (t instanceof HttpOperationFailedException e) {
manageReturnedStatusCode(exchange, e.getStatusCode(), e.getResponseBody());
} else if (t instanceof ConnectTimeoutException) {
exchange.setProperty(HTTP_ERROR_TYPE, CONNECT_TIMEOUT);
} else if (t instanceof SocketTimeoutException) {
Expand All @@ -70,16 +61,32 @@ protected void process(Throwable t, Exchange exchange) {
}
}

private void logHttpError(Logger.Level level, HttpOperationFailedException e, Exchange exchange) {
private void manageReturnedStatusCode(Exchange exchange, int statusCode, String responseBody) {
exchange.setProperty(HTTP_STATUS_CODE, statusCode);
if (statusCode >= 300 && statusCode < 400) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_3XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), statusCode, responseBody, exchange);
} else if (statusCode >= 400 && statusCode < 500 && statusCode != SC_TOO_MANY_REQUESTS) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_4XX);
logHttpError(connectorConfig.getClientErrorLogLevel(), statusCode, responseBody, exchange);
} else if (statusCode == SC_TOO_MANY_REQUESTS || statusCode >= 500) {
exchange.setProperty(HTTP_ERROR_TYPE, HTTP_5XX);
logHttpError(connectorConfig.getServerErrorLogLevel(), statusCode, responseBody, exchange);
} else {
logHttpError(ERROR, statusCode, responseBody, exchange);
}
}

private void logHttpError(Logger.Level level, int statusCode, String responseBody, Exchange exchange) {
Log.logf(
level,
HTTP_ERROR_LOG_MSG,
getRouteId(exchange),
getOrgId(exchange),
getExchangeId(exchange),
getTargetUrl(exchange),
e.getStatusCode(),
e.getResponseBody()
statusCode,
responseBody
);
}
}
17 changes: 17 additions & 0 deletions connector-email/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@
<scope>test</scope>
</dependency>

<!-- Retries -->
<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
<version>${failsafe.version}</version>
</dependency>

<!-- MockServer -->
<dependency>
<groupId>org.mock-server</groupId>
Expand All @@ -87,6 +94,16 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public void extract(final Exchange exchange, final JsonObject cloudEventData) {
exchange.setProperty(ExchangeProperty.EMAIL_RECIPIENTS, emails);
exchange.setProperty(ExchangeProperty.EMAIL_SENDER, emailNotification.emailSender());

exchange.setProperty(ExchangeProperty.USE_EMAIL_BOP_V1_SSL, emailConnectorConfig.isEnableBopServiceWithSslChecks());
exchange.setProperty(ExchangeProperty.USE_SIMPLIFIED_EMAIL_ROUTE, emailConnectorConfig.useSimplifiedEmailRoute(emailNotification.orgId()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.redhat.cloud.notifications.connector.email;

import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig;
import com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty;
import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings;
import com.redhat.cloud.notifications.connector.email.model.settings.User;
import com.redhat.cloud.notifications.connector.email.processors.bop.BOPManager;
import com.redhat.cloud.notifications.connector.email.processors.recipientsresolver.ExternalRecipientsResolver;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.quarkus.logging.Log;
import io.vertx.core.json.JsonObject;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static com.redhat.cloud.notifications.connector.ExchangeProperty.ID;
import static com.redhat.cloud.notifications.connector.ExchangeProperty.ORG_ID;
import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_RECIPIENTS_KEY;
import static java.util.stream.Collectors.toSet;

@ApplicationScoped
public class EmailManagementProcessor implements Processor {

@Inject
EmailConnectorConfig emailConnectorConfig;

@Inject
ExternalRecipientsResolver externalRecipientsResolver;

@Inject
BOPManager bopManager;

@Inject
MeterRegistry meterRegistry;

static final String BOP_RESPONSE_TIME_METRIC = "email.bop.response.time";
static final String RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC = "email.recipients_resolver.response.time";

@Override
public void process(final Exchange exchange) {
// fetch recipients
Set<String> recipientsList = fetchRecipients(exchange);

if (recipientsList.isEmpty()) {
Log.infof("Skipped Email notification because the recipients list was empty [orgId=$%s, historyId=%s]", exchange.getProperty(ORG_ID, String.class), exchange.getProperty(ID, String.class));
} else {
// send to bop
sendToBop(exchange, recipientsList);
}
}

private void sendToBop(Exchange exchange, Set<String> recipientsList) {
// split recipient list and send it ot BOP
List<List<String>> packedRecipients = partition(recipientsList, emailConnectorConfig.getMaxRecipientsPerEmail() - 1);
final String subject = exchange.getProperty(ExchangeProperty.RENDERED_SUBJECT, String.class);
final String body = exchange.getProperty(ExchangeProperty.RENDERED_BODY, String.class);
final String sender = exchange.getProperty(ExchangeProperty.EMAIL_SENDER, String.class);

for (int i = 0; i < packedRecipients.size(); i++) {
final Timer.Sample bopResponseTimeMetric = Timer.start(meterRegistry);
bopManager.sendToBop(packedRecipients.get(i), subject, body, sender);
bopResponseTimeMetric.stop(meterRegistry.timer(BOP_RESPONSE_TIME_METRIC));
Log.infof("Sent Email notification %d/%d [orgId=%s, historyId=%s]", i + 1, packedRecipients.size(), exchange.getProperty(ORG_ID, String.class), exchange.getProperty(ID, String.class));
}
}

private static List<List<String>> partition(Set<String> collection, int n) {
AtomicInteger counter = new AtomicInteger();
return collection.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / n))
.values().stream().toList();
}

private Set<String> fetchRecipients(Exchange exchange) {
List<RecipientSettings> recipientSettings = exchange.getProperty(ExchangeProperty.RECIPIENT_SETTINGS, List.class);
Set<String> subscribers = exchange.getProperty(ExchangeProperty.SUBSCRIBERS, Set.class);
Set<String> unsubscribers = exchange.getProperty(ExchangeProperty.UNSUBSCRIBERS, Set.class);
JsonObject recipientsAuthorizationCriterion = exchange.getProperty(ExchangeProperty.RECIPIENTS_AUTHORIZATION_CRITERION, JsonObject.class);

boolean subscribedByDefault = exchange.getProperty(ExchangeProperty.SUBSCRIBED_BY_DEFAULT, boolean.class);
final String orgId = exchange.getProperty(ORG_ID, String.class);

final Timer.Sample recipientsResolverResponseTimeMetric = Timer.start(meterRegistry);
Set<String> recipientsList = externalRecipientsResolver.recipientUsers(
orgId,
Set.copyOf(recipientSettings),
subscribers,
unsubscribers,
subscribedByDefault,
recipientsAuthorizationCriterion)
.stream().map(User::getEmail).filter(email -> email != null && !email.isBlank()).collect(toSet());
recipientsResolverResponseTimeMetric.stop(meterRegistry.timer(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC));

Set<String> emails = exchange.getProperty(ExchangeProperty.EMAIL_RECIPIENTS, Set.of(), Set.class);
if (emailConnectorConfig.isEmailsInternalOnlyEnabled()) {
Set<String> forbiddenEmail = emails.stream().filter(email -> !email.trim().toLowerCase().endsWith("@redhat.com")).collect(Collectors.toSet());
if (!forbiddenEmail.isEmpty()) {
Log.warnf(" %s emails are forbidden for message historyId: %s ", forbiddenEmail, exchange.getProperty(com.redhat.cloud.notifications.connector.ExchangeProperty.ID, String.class));
}
emails.removeAll(forbiddenEmail);
}
recipientsList.addAll(emails);
exchange.setProperty(TOTAL_RECIPIENTS_KEY, recipientsList.size());
return recipientsList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@
import org.apache.camel.support.jsse.KeyStoreParameters;
import org.apache.camel.support.jsse.SSLContextParameters;
import org.apache.camel.support.jsse.TrustManagersParameters;
import org.apache.http.conn.ssl.NoopHostnameVerifier;

import java.util.Set;

import static com.redhat.cloud.notifications.connector.ConnectorToEngineRouteBuilder.SUCCESS;
import static com.redhat.cloud.notifications.connector.ExchangeProperty.ID;
import static com.redhat.cloud.notifications.connector.ExchangeProperty.ORG_ID;
import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.FILTERED_USERS;
import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.USE_EMAIL_BOP_V1_SSL;
import static com.redhat.cloud.notifications.connector.http.SslTrustAllManager.getSslContextParameters;
import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.USE_SIMPLIFIED_EMAIL_ROUTE;
import static org.apache.camel.LoggingLevel.DEBUG;
import static org.apache.camel.LoggingLevel.INFO;
import static org.apache.camel.builder.endpoint.dsl.HttpEndpointBuilderFactory.HttpEndpointBuilder;
Expand Down Expand Up @@ -61,6 +59,9 @@ public class EmailRouteBuilder extends EngineToConnectorRouteBuilder {
@Inject
EmailMetricsProcessor emailMetricsProcessor;

@Inject
EmailManagementProcessor emailManagementProcessor;

/**
* Configures the flow for this connector.
*/
Expand All @@ -74,23 +75,27 @@ public void configureRoutes() {
.to(direct(ENTRYPOINT));
}

/*
* Prepares the payload accepted by BOP and sends the request to
* the service.
*/
final HttpEndpointBuilder bopEndpointV1 = setUpBOPEndpointV1();

from(seda(ENGINE_TO_CONNECTOR))
.routeId(emailConnectorConfig.getConnectorName())
.process(recipientsResolverRequestPreparer)
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(setupRecipientResolverEndpoint())
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.process(recipientsResolverResponseProcessor)
.choice().when(shouldSkipEmail())
.log(INFO, getClass().getName(), "Skipped Email notification because the recipients list was empty [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]")
.otherwise()
.to(direct(Routes.SPLIT_AND_SEND))
.choice()
.when(shouldUseSimplifiedEmailManagement())
.process(emailManagementProcessor)
.endChoice()
.otherwise()
.process(recipientsResolverRequestPreparer)
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(setupRecipientResolverEndpoint())
.to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.process(recipientsResolverResponseProcessor)
.choice()
.when(shouldSkipEmail())
.log(INFO, getClass().getName(), "Skipped Email notification because the recipients list was empty [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]")
.endChoice()
.otherwise()
.to(direct(Routes.SPLIT_AND_SEND))
.endChoice()
.end()
.endChoice()
.end()
.to(direct(SUCCESS));

Expand All @@ -105,16 +110,10 @@ public void configureRoutes() {
// Clear all the headers that may come from the previous route.
.removeHeaders("*")
.process(this.BOPRequestPreparer)
.choice().when(shouldUseBopEmailServiceWithSslChecks())
.log(DEBUG, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "} using regular SSL checks on email service]")
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(emailConnectorConfig.getBopURL())
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP)
.otherwise()
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(bopEndpointV1)
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP)
.end()
.log(DEBUG, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "} using regular SSL checks on email service]")
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START)
.to(emailConnectorConfig.getBopURL())
.to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP)
.log(INFO, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]")
.process(emailMetricsProcessor);
}
Expand All @@ -123,27 +122,8 @@ private Predicate shouldSkipEmail() {
return exchange -> exchange.getProperty(FILTERED_USERS, Set.class).isEmpty();
}

private Predicate shouldUseBopEmailServiceWithSslChecks() {
return exchange -> exchange.getProperty(USE_EMAIL_BOP_V1_SSL, Boolean.class);
}

/**
* Creates the endpoint for the BOP service. It makes Apache Camel trust
* BOP service's certificate.
* @return the created endpoint.
*/
protected HttpEndpointBuilder setUpBOPEndpointV1() {
// Remove the schema from the url to avoid the
// "ResolveEndpointFailedException", which complaints about specifying
// the schema twice.
final String fullURL = this.emailConnectorConfig.getBopURL();
if (fullURL.startsWith("https")) {
return https(fullURL.replace("https://", ""))
.sslContextParameters(getSslContextParameters())
.x509HostnameVerifier(NoopHostnameVerifier.INSTANCE);
} else {
return http(fullURL.replace("http://", ""));
}
private Predicate shouldUseSimplifiedEmailManagement() {
return exchange -> exchange.getProperty(USE_SIMPLIFIED_EMAIL_ROUTE, false, Boolean.class);
}

private HttpEndpointBuilder setupRecipientResolverEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.redhat.cloud.notifications.connector.email.config;

import com.redhat.cloud.notifications.connector.http.HttpConnectorConfig;
import com.redhat.cloud.notifications.unleash.UnleashContextBuilder;
import io.quarkus.runtime.LaunchMode;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -74,13 +75,13 @@ public class EmailConnectorConfig extends HttpConnectorConfig {
@ConfigProperty(name = RECIPIENTS_RESOLVER_TRUST_STORE_TYPE)
Optional<String> recipientsResolverTrustStoreType;

private String enableBopEmailServiceWithSslChecks;
private String toggleKafkaIncomingHighVolumeTopic;
private String toggleUseSimplifiedEmailRoute;

@PostConstruct
void emailConnectorPostConstruct() {
enableBopEmailServiceWithSslChecks = toggleRegistry.register("enable-bop-service-ssl-checks", true);
toggleKafkaIncomingHighVolumeTopic = toggleRegistry.register("kafka-incoming-high-volume-topic", true);
toggleUseSimplifiedEmailRoute = toggleRegistry.register("use-simplified-email-route", true);
}

@Override
Expand All @@ -102,7 +103,7 @@ protected Map<String, Object> getLoggedConfiguration() {
config.put(RECIPIENTS_RESOLVER_USER_SERVICE_URL, recipientsResolverServiceURL);
config.put(MAX_RECIPIENTS_PER_EMAIL, maxRecipientsPerEmail);
config.put(NOTIFICATIONS_EMAILS_INTERNAL_ONLY_ENABLED, emailsInternalOnlyEnabled);
config.put(enableBopEmailServiceWithSslChecks, isEnableBopServiceWithSslChecks());
config.put(toggleUseSimplifiedEmailRoute, useSimplifiedEmailRoute(null));
config.put(toggleKafkaIncomingHighVolumeTopic, isIncomingKafkaHighVolumeTopicEnabled());

/*
Expand Down Expand Up @@ -182,8 +183,12 @@ public Optional<String> getRecipientsResolverTrustStoreType() {
return recipientsResolverTrustStoreType;
}

public boolean isEnableBopServiceWithSslChecks() {
return true;
public boolean useSimplifiedEmailRoute(String orgId) {
if (unleashEnabled) {
return unleash.isEnabled(toggleUseSimplifiedEmailRoute, UnleashContextBuilder.buildUnleashContextWithOrgId(orgId), false);
} else {
return false;
}
}

/**
Expand Down
Loading

0 comments on commit 348210e

Please sign in to comment.