Skip to content

Commit

Permalink
[PAGOPA-2527] feat: minor improvements after test session in local
Browse files Browse the repository at this point in the history
  • Loading branch information
andrea-deri committed Jan 23, 2025
1 parent 227b9c7 commit 0b15114
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import it.gov.pagopa.fdr.repository.entity.common.Repository;
import it.gov.pagopa.fdr.repository.entity.common.RepositoryPagedResult;
import it.gov.pagopa.fdr.repository.entity.payment.FdrPaymentEntity;
import it.gov.pagopa.fdr.repository.exception.PersistenceFailureException;
import it.gov.pagopa.fdr.repository.exception.TransactionRollbackException;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.bson.types.ObjectId;
import org.eclipse.microprofile.faulttolerance.Retry;

@ApplicationScoped
public class FdrPaymentRepository extends Repository {
Expand Down Expand Up @@ -95,13 +98,6 @@ public List<FdrPaymentEntity> findByFlowObjectIdAndIndexes(ObjectId flowId, Set<
return resultPage.list();
}

public void deleteByFlowObjectId(ObjectId flowId) {

try (ClientSession session = this.mongoClient.startSession()) {
FdrPaymentEntity.deleteBulkInTransaction(session, "ref_fdr.id", flowId);
}
}

public void createEntityInTransaction(List<FdrPaymentEntity> entityBatch)
throws TransactionRollbackException {

Expand All @@ -110,6 +106,19 @@ public void createEntityInTransaction(List<FdrPaymentEntity> entityBatch)
}
}

// https://quarkus.io/guides/smallrye-fault-tolerance
@Retry(
delay = 1000,
maxRetries = -1,
maxDuration = 1,
durationUnit = ChronoUnit.MINUTES,
retryOn = PersistenceFailureException.class)
public long deleteByFlowObjectId(ObjectId flowId) throws PersistenceFailureException {

System.out.print("[AD] executing deleteByFilter try... ");
return FdrPaymentEntity.deleteByFilter("ref_fdr.id", flowId);
}

public void deleteEntityInTransaction(List<FdrPaymentEntity> entityBatch)
throws TransactionRollbackException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.DeleteManyModel;
import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
Expand All @@ -14,13 +13,15 @@
import io.quarkus.panache.common.Parameters;
import io.quarkus.panache.common.Sort;
import it.gov.pagopa.fdr.repository.enums.PaymentStatusEnum;
import it.gov.pagopa.fdr.repository.exception.PersistenceFailureException;
import it.gov.pagopa.fdr.repository.exception.TransactionRollbackException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.bson.codecs.pojo.annotations.BsonProperty;
import org.bson.types.ObjectId;

@Data
@EqualsAndHashCode(callSuper = true)
Expand Down Expand Up @@ -63,48 +64,50 @@ public static long countByQuery(String query, Parameters parameters) {
return count(query, parameters.map());
}

public static long deleteByQuery(String query, Parameters parameters) {
return delete(query, parameters.map());
}
public static long deleteByFilter(String filterKey, Object filterValue)
throws PersistenceFailureException {

public static void persistBulkInTransaction(
ClientSession session, Iterable<FdrPaymentEntity> entityBatch)
throws TransactionRollbackException {
long deletedEntities = 0;

try {
Instant now = Instant.now();
session.startTransaction();
MongoCollection<FdrPaymentEntity> collection = mongoCollection();

List<WriteModel<FdrPaymentEntity>> bulkOperations = new ArrayList<>();
for (FdrPaymentEntity entity : entityBatch) {
entity.setTimestamp(now);
bulkOperations.add(new InsertOneModel<>(entity));
boolean areThereMoreEntities = true;
while (areThereMoreEntities) {
List<ObjectId> idsToDelete =
collection
.find(Filters.eq(filterKey, filterValue))
.limit(500)
.map(document -> document.id)
.into(new ArrayList<>());

if (idsToDelete.isEmpty()) {
areThereMoreEntities = false;
} else {
collection.deleteMany(Filters.in("_id", idsToDelete));
deletedEntities += idsToDelete.size();
}
}
collection.bulkWrite(session, bulkOperations);

session.commitTransaction();

} catch (Exception e) {

if (session.hasActiveTransaction()) {
session.abortTransaction();
}
throw new TransactionRollbackException(e);
throw new PersistenceFailureException(e);
}

return deletedEntities;
}

public static void deleteBulkInTransaction(
public static void persistBulkInTransaction(
ClientSession session, Iterable<FdrPaymentEntity> entityBatch)
throws TransactionRollbackException {

try {
Instant now = Instant.now();
session.startTransaction();
MongoCollection<FdrPaymentEntity> collection = mongoCollection();

List<WriteModel<FdrPaymentEntity>> bulkOperations = new ArrayList<>();
for (FdrPaymentEntity entity : entityBatch) {
bulkOperations.add(new DeleteOneModel<>(Filters.eq("_id", entity.id)));
entity.setTimestamp(now);
bulkOperations.add(new InsertOneModel<>(entity));
}
collection.bulkWrite(session, bulkOperations);

Expand All @@ -120,15 +123,17 @@ public static void deleteBulkInTransaction(
}

public static void deleteBulkInTransaction(
ClientSession session, String filterKey, Object filterValue)
ClientSession session, Iterable<FdrPaymentEntity> entityBatch)
throws TransactionRollbackException {

try {
session.startTransaction();
MongoCollection<FdrPaymentEntity> collection = mongoCollection();

List<WriteModel<FdrPaymentEntity>> bulkOperations = new ArrayList<>();
bulkOperations.add(new DeleteManyModel<>(Filters.eq(filterKey, filterValue)));
for (FdrPaymentEntity entity : entityBatch) {
bulkOperations.add(new DeleteOneModel<>(Filters.eq("_id", entity.id)));
}
collection.bulkWrite(session, bulkOperations);

session.commitTransaction();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package it.gov.pagopa.fdr.repository.exception;

public class PersistenceFailureException extends Exception {

public PersistenceFailureException(Exception e) {
super(e);
}
}
40 changes: 26 additions & 14 deletions src/main/java/it/gov/pagopa/fdr/service/FlowService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import it.gov.pagopa.fdr.repository.entity.common.RepositoryPagedResult;
import it.gov.pagopa.fdr.repository.entity.flow.FdrFlowEntity;
import it.gov.pagopa.fdr.repository.enums.FlowStatusEnum;
import it.gov.pagopa.fdr.repository.exception.TransactionRollbackException;
import it.gov.pagopa.fdr.repository.exception.PersistenceFailureException;
import it.gov.pagopa.fdr.service.middleware.mapper.FlowMapper;
import it.gov.pagopa.fdr.service.middleware.validator.SemanticValidator;
import it.gov.pagopa.fdr.service.model.arguments.FindFlowsByFiltersArgs;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import org.bson.types.ObjectId;
import org.jboss.logging.Logger;
import org.openapi.quarkus.api_config_cache_json.model.ConfigDataV1;

Expand Down Expand Up @@ -329,21 +331,31 @@ public GenericResponse deleteExistingFlow(String pspId, String flowName) {
throw new AppException(AppErrorCodeMessageEnum.REPORTING_FLOW_NOT_FOUND, flowName);
}

// delete flow and if there are multiple payments related to it yet, delete them
// delete flow and if there are multiple payments related to it yet, delete them in async mode
this.flowRepository.deleteEntity(publishingFlow);
if (publishingFlow.getComputedTotPayments() > 0) {

// try to delete payments using a transaction: if it does not end successfully, it throws an
// accepted Exception that will cause a compensation operation in order to execute the
// rollback.
try {
this.paymentRepository.deleteByFlowObjectId(publishingFlow.id);
} catch (TransactionRollbackException e) {
this.flowRepository.createEntity(publishingFlow);
throw e;
}
}
deleteFlowPaymentsInAsync(publishingFlow.id);

return GenericResponse.builder().message(String.format("Fdr [%s] deleted", flowName)).build();
}

private void deleteFlowPaymentsInAsync(ObjectId flowObjectId) {
CompletableFuture.supplyAsync(
() -> {
try {
log.infof(
"Asynchronously deleting payments related to flow with id [%s]", flowObjectId);
long deletedPayments = this.paymentRepository.deleteByFlowObjectId(flowObjectId);
log.debugf("Deleted existing flows and all [%s] related payments", deletedPayments);
} catch (PersistenceFailureException e) {
log.error(
String.format(
"Deleted existing flows but not all related payments were deleted. If the"
+ " deletion is required, you can find them searching by fdr_ref.id = [%s]."
+ " Error cause: ",
flowObjectId),
e);
}
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ public class ControllerConstants {
+ ControllerConstants.URL_PARAMETER_ORGANIZATION;

public static final String URL_API_GET_PAYMENTS_FOR_NOT_PUBLISHED_FLOW =
ControllerConstants.URL_API_GET_SINGLE_NOT_PUBLISHED_FLOW + "}/payments";
ControllerConstants.URL_API_GET_SINGLE_NOT_PUBLISHED_FLOW + "/payments";

public static final String URL_API_GET_PAYMENTS_FOR_PUBLISHED_FLOW =
ControllerConstants.URL_API_GET_SINGLE_PUBLISHED_FLOW + "}/payments";
ControllerConstants.URL_API_GET_SINGLE_PUBLISHED_FLOW + "/payments";
public static final String URL_API_ADD_PAYMENT_IN_FLOW =
"/" + ControllerConstants.URL_PARAMETER_FDR + "/payments/add";

Expand Down

0 comments on commit 0b15114

Please sign in to comment.