Skip to content

Commit

Permalink
Interim commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
arcshiftsolutions committed Oct 17, 2024
1 parent c2c61b1 commit f35bded
Show file tree
Hide file tree
Showing 12 changed files with 407 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@
*/
public enum EventOutcome {
INITIATE_SUCCESS,
STUDENT_REGISTRATION_FOUND,
STUDENT_REGISTRATION_NOT_FOUND,
SAGA_COMPLETED,
STUDENT_ALREADY_EXIST
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package ca.bc.gov.educ.eas.api.constants;

/**
* The enum Event type.
*/
public enum EventType {
READ_FROM_TOPIC,
INITIATED,
MARK_SAGA_COMPLETE,
GET_PAGINATED_SCHOOLS
CREATE_STUDENT_REGISTRATION,
GET_STUDENT_REGISTRATION
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,27 @@

import ca.bc.gov.educ.eas.api.helpers.LogHelper;
import ca.bc.gov.educ.eas.api.orchestrator.base.EventHandler;
import ca.bc.gov.educ.eas.api.service.v1.events.EventHandlerDelegatorService;
import ca.bc.gov.educ.eas.api.struct.Event;
import ca.bc.gov.educ.eas.api.util.JsonUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import jakarta.annotation.PostConstruct;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jboss.threads.EnhancedQueueExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

import static ca.bc.gov.educ.eas.api.constants.TopicsEnum.EAS_API_TOPIC;
import static lombok.AccessLevel.PRIVATE;

@Component
Expand All @@ -28,37 +35,31 @@ public class MessageSubscriber {
@Getter(PRIVATE)
private final Map<String, EventHandler> handlerMap = new HashMap<>();
private final Connection connection;
private final Executor messageProcessingThreads;
private final EventHandlerDelegatorService eventHandlerDelegatorServiceV1;

@Autowired
public MessageSubscriber(final Connection con, final List<EventHandler> eventHandlers) {
public MessageSubscriber(final Connection con, EventHandlerDelegatorService eventHandlerDelegatorServiceV1) {
this.connection = con;
eventHandlers.forEach(handler -> {
this.handlerMap.put(handler.getTopicToSubscribe(), handler);
this.subscribe(handler.getTopicToSubscribe(), handler);
});
this.eventHandlerDelegatorServiceV1 = eventHandlerDelegatorServiceV1;
messageProcessingThreads = new EnhancedQueueExecutor.Builder().setThreadFactory(new ThreadFactoryBuilder().setNameFormat("nats-message-subscriber-%d").build()).setCorePoolSize(10).setMaximumPoolSize(10).setKeepAliveTime(Duration.ofSeconds(60)).build();
}

public void subscribe(final String topic, final EventHandler eventHandler) {
this.handlerMap.computeIfAbsent(topic, k -> eventHandler);
final String queue = topic.replace("_", "-");
final var dispatcher = this.connection.createDispatcher(this.onMessage(eventHandler));
dispatcher.subscribe(topic, queue);
@PostConstruct
public void subscribe() {
String queue = EAS_API_TOPIC.toString().replace("_", "-");
var dispatcher = connection.createDispatcher(onMessage());
dispatcher.subscribe(EAS_API_TOPIC.toString(), queue);
}

/**
* On message message handler.
*
* @return the message handler
*/
public MessageHandler onMessage(final EventHandler eventHandler) {
private MessageHandler onMessage() {
return (Message message) -> {
if (message != null) {
log.debug("Message received subject :: {}, replyTo :: {}, subscriptionID :: {}", message.getSubject(), message.getReplyTo(), message.getSID());
try {
final var eventString = new String(message.getData());
var eventString = new String(message.getData());
LogHelper.logMessagingEventDetails(eventString);
final var event = JsonUtil.getJsonObjectFromString(Event.class, eventString);
eventHandler.handleEvent(event);
var event = JsonUtil.getJsonObjectFromString(Event.class, eventString);
messageProcessingThreads.execute(() -> eventHandlerDelegatorServiceV1.handleEvent(event, message));
} catch (final Exception e) {
log.error("Exception ", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package ca.bc.gov.educ.eas.api.model.v1;

import jakarta.persistence.*;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.PastOrPresent;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.DynamicUpdate;
import org.hibernate.annotations.UuidGenerator;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.UUID;

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Entity
@Table(name = "EAS_EVENT")
@Data
@DynamicUpdate
public class EasEventEntity {

@Id
@UuidGenerator
@Column(name = "EVENT_ID", unique = true, updatable = false, columnDefinition = "BINARY(16)")
private UUID eventId;
@NotNull(message = "eventPayload cannot be null")
@Lob
@Column(name = "EVENT_PAYLOAD")
private byte[] eventPayloadBytes;
@NotNull(message = "eventStatus cannot be null")
@Column(name = "EVENT_STATUS")
private String eventStatus;
@NotNull(message = "eventType cannot be null")
@Column(name = "EVENT_TYPE")
private String eventType;
@Column(name = "CREATE_USER", updatable = false)
String createUser;
@Column(name = "CREATE_DATE", updatable = false)
@PastOrPresent
LocalDateTime createDate;
@Column(name = "UPDATE_USER")
String updateUser;
@Column(name = "UPDATE_DATE")
@PastOrPresent
LocalDateTime updateDate;
@Column(name = "SAGA_ID", updatable = false)
private UUID sagaId;
@NotNull(message = "eventOutcome cannot be null.")
@Column(name = "EVENT_OUTCOME")
private String eventOutcome;
@Column(name = "REPLY_CHANNEL")
private String replyChannel;

public String getEventPayload() {
return new String(getEventPayloadBytes(), StandardCharsets.UTF_8);
}

public void setEventPayload(String eventPayload) {
setEventPayloadBytes(eventPayload.getBytes(StandardCharsets.UTF_8));
}

public static class EasEventBuilder {

byte[] eventPayloadBytes;

public EasEventEntity.EasEventBuilder eventPayload(String eventPayload) {
this.eventPayloadBytes = eventPayload.getBytes(StandardCharsets.UTF_8);
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@


import ca.bc.gov.educ.eas.api.model.v1.AssessmentStudentEntity;
import ca.bc.gov.educ.eas.api.model.v1.EasEventEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;

import java.util.Optional;
import java.util.UUID;

@Repository
public interface AssessmentStudentRepository extends JpaRepository<AssessmentStudentEntity, UUID>, JpaSpecificationExecutor<AssessmentStudentEntity> {

Optional<AssessmentStudentEntity> findByAssessmentIDAndStudentID(UUID assessmentID, String studentID);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ca.bc.gov.educ.eas.api.repository.v1;


import ca.bc.gov.educ.eas.api.model.v1.EasEventEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.Optional;
import java.util.UUID;


@Repository
public interface EasEventRepository extends JpaRepository<EasEventEntity, UUID> {

Optional<EasEventEntity> findBySagaIdAndEventType(UUID sagaId, String eventType);

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ca.bc.gov.educ.eas.api.repository.v1;


import ca.bc.gov.educ.eas.api.model.v1.EasEventEntity;
import ca.bc.gov.educ.eas.api.model.v1.SagaEventStatesEntity;
import ca.bc.gov.educ.eas.api.model.v1.EasSagaEntity;
import org.springframework.data.jpa.repository.JpaRepository;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package ca.bc.gov.educ.eas.api.service.v1.events;


import ca.bc.gov.educ.eas.api.messaging.MessagePublisher;
import ca.bc.gov.educ.eas.api.messaging.jetstream.Publisher;
import ca.bc.gov.educ.eas.api.model.v1.EasEventEntity;
import ca.bc.gov.educ.eas.api.struct.Event;
import io.nats.client.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import static ca.bc.gov.educ.eas.api.service.v1.events.EventHandlerService.PAYLOAD_LOG;


/**
* The type Event handler service.
*/
@Service
@Slf4j
@SuppressWarnings({"java:S3864", "java:S3776"})
public class EventHandlerDelegatorService {

/**
* The constant RESPONDING_BACK_TO_NATS_ON_CHANNEL.
*/
public static final String RESPONDING_BACK_TO_NATS_ON_CHANNEL = "responding back to NATS on {} channel ";
private final MessagePublisher messagePublisher;
private final EventHandlerService eventHandlerService;
private final Publisher publisher;

/**
* Instantiates a new Event handler delegator service.
*
* @param messagePublisher the message publisher
* @param eventHandlerService the event handler service
* @param publisher the publisher
*/
@Autowired
public EventHandlerDelegatorService(MessagePublisher messagePublisher, EventHandlerService eventHandlerService, Publisher publisher) {
this.messagePublisher = messagePublisher;
this.eventHandlerService = eventHandlerService;
this.publisher = publisher;
}

/**
* Handle event.
*
* @param event the event
* @param message the message
*/
public void handleEvent(final Event event, final Message message) {
byte[] response;
Pair<byte[], EasEventEntity> pair;
boolean isSynchronous = message.getReplyTo() != null;
try {
switch (event.getEventType()) {
case GET_STUDENT_REGISTRATION:
log.info("received GET_STUDENT event :: {}", event.getSagaId());
log.trace(PAYLOAD_LOG, event.getEventPayload());
response = eventHandlerService.handleGetStudentRegistrationEvent(event, isSynchronous);
log.info(RESPONDING_BACK_TO_NATS_ON_CHANNEL, message.getReplyTo() != null ? message.getReplyTo() : event.getReplyTo());
publishToNATS(event, message, isSynchronous, response);
break;
case CREATE_STUDENT_REGISTRATION:
log.info("received create student event :: {}", event.getSagaId());
log.trace(PAYLOAD_LOG, event.getEventPayload());
pair = eventHandlerService.handleCreateStudentRegistrationEvent(event);
log.info(RESPONDING_BACK_TO_NATS_ON_CHANNEL, message.getReplyTo() != null ? message.getReplyTo() : event.getReplyTo());
publishToNATS(event, message, isSynchronous, pair.getLeft());
publishToJetStream(pair.getRight());
break;
default:
log.info("silently ignoring other events :: {}", event);
break;
}
} catch (final Exception e) {
log.error("Exception", e);
}
}

private void publishToNATS(Event event, Message message, boolean isSynchronous, byte[] left) {
if (isSynchronous) { // sync, req/reply pattern of nats
messagePublisher.dispatchMessage(message.getReplyTo(), left);
} else { // async, pub/sub
messagePublisher.dispatchMessage(event.getReplyTo(), left);
}
}

private void publishToJetStream(final EasEventEntity event) {
publisher.dispatchChoreographyEvent(event);
}
}
Loading

0 comments on commit f35bded

Please sign in to comment.