Skip to content

Commit

Permalink
feat: adding is_latest field for better filtering latest flows (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrea-deri authored Jan 29, 2025
1 parent edabfd8 commit a39c61b
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public interface IOrganizationsController {
summary = "Get all published flow by creditor institution",
description =
"""
This API permits to search all published flows for a specific creditor institution,
formatted in a paginated view. The search can be enhanced including the PSP identifier
in order to filter only the flows for certain PSP.<br>
Before executing the query, the search filters are validated against entities configured for
<i>Nodo dei Pagamenti</i> environment, in particular on PSP (if that search filter is defined).<br>
The result of the query is paginated and contains all the metadata needed for pagination purposes.<br>
""")
This API permits to search all published flows for a specific creditor institution,
formatted in a paginated view. The search can be enhanced including the PSP identifier
in order to filter only the flows for certain PSP. The only flows retrieved are the latest
revision, as same as "nodoChiediElencoFlussiRendicontazione" primitive does.<br>
Before executing the query, the search filters are validated against entities configured for
<i>Nodo dei Pagamenti</i> environment, in particular on PSP (if that search filter is defined).<br>
The result of the query is paginated and contains all the metadata needed for pagination purposes.<br>
""")
@APIResponses(
value = {
@APIResponse(
Expand Down Expand Up @@ -139,17 +140,17 @@ PaginatedFlowsResponse getAllPublishedFlows(
summary = "Get single flow by creditor institution, searching by name and revision",
description =
"""
This API permits to search a single flows for a specific creditor institution.
In order to do so, it is required to add the following search filters:
- Creditor institution identifier: for filtering by specific organization
- PSP identifier: for filtering by flow-related PSP
- Flow name: for filtering by specific instance of the flow
- Revision: for filtering by flow revision
This API permits to search a single flows for a specific creditor institution.
In order to do so, it is required to add the following search filters:
- Creditor institution identifier: for filtering by specific organization
- PSP identifier: for filtering by flow-related PSP
- Flow name: for filtering by specific instance of the flow
- Revision: for filtering by flow revision
Before executing the query, the search filters are validated against entities configured for
<i>Nodo dei Pagamenti</i> environment, in particular on creditor institution and PSP. Also,
the name of the flow is validated against a specific standard format.<br>
""")
Before executing the query, the search filters are validated against entities configured for
<i>Nodo dei Pagamenti</i> environment, in particular on creditor institution and PSP. Also,
the name of the flow is validated against a specific standard format.<br>
""")
@APIResponses(
value = {
@APIResponse(
Expand Down Expand Up @@ -230,18 +231,18 @@ SingleFlowResponse getSinglePublishedFlow(
"Get all payments of single flow by creditor institution, searching by name and revision",
description =
"""
This API permits to search all the payments of single flows for a specific creditor institution,
formatted in a paginated view. In order to do so, it is required to add the following search filters:
- Creditor institution identifier: for filtering by specific organization
- PSP identifier: for filtering by flow-related PSP
- Flow name: for filtering by specific instance of the flow
- Revision: for filtering by flow revision
This API permits to search all the payments of single flows for a specific creditor institution,
formatted in a paginated view. In order to do so, it is required to add the following search filters:
- Creditor institution identifier: for filtering by specific organization
- PSP identifier: for filtering by flow-related PSP
- Flow name: for filtering by specific instance of the flow
- Revision: for filtering by flow revision
Before executing the query, the search filters are validated against entities configured for
<i>Nodo dei Pagamenti</i> environment, in particular on creditor institution and PSP. Also,
the name of the flow is validated against a specific standard format.<br>
The result of the query is paginated and contains all the metadata needed for pagination purposes.<br>
""")
Before executing the query, the search filters are validated against entities configured for
<i>Nodo dei Pagamenti</i> environment, in particular on creditor institution and PSP. Also,
the name of the flow is validated against a specific standard format.<br>
The result of the query is paginated and contains all the metadata needed for pagination purposes.<br>
""")
@APIResponses(
value = {
@APIResponse(
Expand Down
34 changes: 29 additions & 5 deletions src/main/java/it/gov/pagopa/fdr/repository/FdrFlowRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import it.gov.pagopa.fdr.util.common.StringUtil;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.microprofile.faulttolerance.Retry;

@ApplicationScoped
public class FdrFlowRepository extends Repository {
Expand Down Expand Up @@ -45,10 +47,11 @@ public class FdrFlowRepository extends Repository {
+ " and receiver.organization_id = :organizationId"
+ " and status != 'PUBLISHED'";

public static final String QUERY_GET_PUBLISHED_BY_PSP_AND_NAME =
"sender.psp_id = :pspId and name = :flowName and status = 'PUBLISHED'";
public static final String QUERY_GET_LAST_PUBLISHED_BY_PSP_AND_NAME =
"sender.psp_id = :pspId and name = :flowName and status = 'PUBLISHED' and is_latest ="
+ " :isLatest";

public RepositoryPagedResult<FdrFlowEntity> findPublishedByOrganizationIdAndOptionalPspId(
public RepositoryPagedResult<FdrFlowEntity> findLatestPublishedByOrganizationIdAndOptionalPspId(
String organizationId, String pspId, Instant publishedGt, int pageNumber, int pageSize) {

Parameters parameters = new Parameters();
Expand All @@ -73,6 +76,10 @@ public RepositoryPagedResult<FdrFlowEntity> findPublishedByOrganizationIdAndOpti
// setting mandatory field: flow status
queryBuilder.add("status = :status");
parameters.and("status", FlowStatusEnum.PUBLISHED);

// setting mandatory field: is_latest flag as true
queryBuilder.add("is_latest = :isLatest");
parameters.and("isLatest", true);
String queryString = String.join(" and ", queryBuilder);

Page page = Page.of(pageNumber - 1, pageSize);
Expand Down Expand Up @@ -137,14 +144,15 @@ public FdrFlowEntity findPublishedByOrganizationIdAndPspIdAndName(
.orElse(null);
}

public FdrFlowEntity findPublishedByPspIdAndName(String pspId, String flowName) {
public FdrFlowEntity findLastPublishedByPspIdAndName(String pspId, String flowName) {

Parameters parameters = new Parameters();
parameters.and("pspId", pspId);
parameters.and("flowName", flowName);
parameters.and("isLatest", true);

return FdrFlowEntity.findByQuery(
FdrFlowRepository.QUERY_GET_PUBLISHED_BY_PSP_AND_NAME, parameters)
FdrFlowRepository.QUERY_GET_LAST_PUBLISHED_BY_PSP_AND_NAME, parameters)
.project(FdrFlowEntity.class)
.firstResultOptional()
.orElse(null);
Expand Down Expand Up @@ -230,11 +238,27 @@ public FdrFlowIdProjection findUnpublishedIdByPspIdAndNameAndOrganization(
.orElse(null);
}

public void updateLastPublishedAsNotLatest(String pspId, String flowName) {

FdrFlowEntity entity = findLastPublishedByPspIdAndName(pspId, flowName);
if (entity != null) {
entity.setIsLatest(false);
updateEntity(entity);
}
}

public void createEntity(FdrFlowEntity entity) {
entity.setTimestamp(Instant.now());
entity.persist();
}

// https://quarkus.io/guides/smallrye-fault-tolerance
@Retry(
delay = 1000,
maxRetries = -1,
maxDuration = 1,
durationUnit = ChronoUnit.MINUTES,
retryOn = Exception.class)
public void updateEntity(FdrFlowEntity entity) {
entity.setTimestamp(Instant.now());
entity.update();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class FdrFlowEntity extends PanacheMongoEntity {

private Instant published;

@BsonProperty("is_latest")
private Boolean isLatest;

@BsonProperty("tot_amount")
private Double totAmount;

Expand Down
28 changes: 19 additions & 9 deletions src/main/java/it/gov/pagopa/fdr/service/FlowService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import it.gov.pagopa.fdr.util.error.exception.common.AppException;
import it.gov.pagopa.fdr.util.error.exception.persistence.PersistenceFailureException;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import org.bson.types.ObjectId;
Expand Down Expand Up @@ -82,7 +83,7 @@ public PaginatedFlowsResponse getPaginatedPublishedFlowsForCI(FindFlowsByFilters
SemanticValidator.validateGetPaginatedFlowsRequestForOrganizations(configData, args);

RepositoryPagedResult<FdrFlowEntity> paginatedResult =
this.flowRepository.findPublishedByOrganizationIdAndOptionalPspId(
this.flowRepository.findLatestPublishedByOrganizationIdAndOptionalPspId(
organizationId, pspId, args.getPublishedGt(), (int) pageNumber, (int) pageSize);
log.debugf(
"Found [%s] entities in [%s] pages. Mapping data to final response.",
Expand Down Expand Up @@ -245,7 +246,8 @@ public GenericResponse createEmptyFlow(String pspId, String flowName, CreateFlow
}

// retrieve the last published flow, in order to take its revision and increment it
FdrFlowEntity lastPublishedFlow = flowRepository.findPublishedByPspIdAndName(pspId, flowName);
FdrFlowEntity lastPublishedFlow =
flowRepository.findLastPublishedByPspIdAndName(pspId, flowName);
Long revision = lastPublishedFlow != null ? (lastPublishedFlow.getRevision() + 1) : 1L;

// finally, persist the newly generated entity
Expand Down Expand Up @@ -294,13 +296,7 @@ public GenericResponse publishFlow(String pspId, String flowName, boolean isInte

// check if retrieved flow can be published
SemanticValidator.validatePublishingFlow(publishingFlow);

// update the publishing flow in order to set its status to PUBLISHED
Instant now = Instant.now();
publishingFlow.setUpdated(now);
publishingFlow.setPublished(now);
publishingFlow.setStatus(FlowStatusEnum.PUBLISHED);
this.flowRepository.updateEntity(publishingFlow);
publishNewRevision(pspId, flowName, publishingFlow);

// TODO do this in transactional way
// FdrFlowToHistoryEntity flowToHistoryEntity = flowMapper.toEntity(publishingFlow,
Expand Down Expand Up @@ -358,4 +354,18 @@ private void deleteFlowPaymentsInAsync(ObjectId flowObjectId) {
return null;
});
}

@Transactional(rollbackOn = Exception.class)
public void publishNewRevision(String pspId, String flowName, FdrFlowEntity publishingFlow) {

// update the publishing flow in order to set its status to PUBLISHED
Instant now = Instant.now();
publishingFlow.setUpdated(now);
publishingFlow.setPublished(now);
publishingFlow.setIsLatest(true);
publishingFlow.setStatus(FlowStatusEnum.PUBLISHED);

this.flowRepository.updateLastPublishedAsNotLatest(pspId, flowName);
this.flowRepository.updateEntity(publishingFlow);
}
}

0 comments on commit a39c61b

Please sign in to comment.