diff --git a/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/action/PropagateRetrieveFromColdStorageContentAction.java b/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/action/PropagateRetrieveFromColdStorageContentAction.java new file mode 100644 index 00000000..ede39701 --- /dev/null +++ b/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/action/PropagateRetrieveFromColdStorageContentAction.java @@ -0,0 +1,98 @@ +/* + * (C) Copyright 2024 Nuxeo (http://nuxeo.com/) and others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * Guillaume Renard + */ + +package org.nuxeo.coldstorage.action; + +import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM; +import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1; +import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.nuxeo.coldstorage.ColdStorageConstants; +import org.nuxeo.coldstorage.service.ColdStorageService; +import org.nuxeo.ecm.core.api.CoreSession; +import org.nuxeo.ecm.core.api.DocumentModel; +import org.nuxeo.ecm.core.api.DocumentModelList; +import org.nuxeo.ecm.core.api.NuxeoException; +import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation; +import org.nuxeo.lib.stream.computation.Topology; +import org.nuxeo.runtime.api.Framework; +import org.nuxeo.runtime.stream.StreamProcessorTopology; + +/** + * Bulk action in charge of marking documents being retrieved. + * + * @since 2023.4 + */ +public class PropagateRetrieveFromColdStorageContentAction implements StreamProcessorTopology { + + private static final Logger log = LogManager.getLogger(PropagateRetrieveFromColdStorageContentAction.class); + + public static final String ACTION_NAME = "propagateRetrieveFromColdStorage"; + + public static final String ACTION_FULL_NAME = "bulk/" + ACTION_NAME; + + @Override + public Topology getTopology(Map options) { + return Topology.builder() + .addComputation(PropagateRetrieveFromColdStorageContentComputation::new, // + List.of(INPUT_1 + ":" + ACTION_FULL_NAME, OUTPUT_1 + ":" + STATUS_STREAM)) + .build(); + } + + public static class PropagateRetrieveFromColdStorageContentComputation extends AbstractBulkComputation { + + public PropagateRetrieveFromColdStorageContentComputation() { + super(ACTION_FULL_NAME); + } + + @Override + protected void compute(CoreSession session, List ids, Map properties) { + log.debug("Start computing documents of which a retrieve from ColdStorage is ongoing {}", ids); + DocumentModelList documents = loadDocuments(session, ids); + + ColdStorageService service = Framework.getService(ColdStorageService.class); + + long errorCount = 0; + for (DocumentModel document : documents) { + if (!document.hasFacet(ColdStorageConstants.COLD_STORAGE_FACET_NAME)) { + log.info("The main content for document: {} is not in cold storage.", document::getId); + continue; + } + try { + service.proceedRetrieveMainContent(session, document); + } catch (NuxeoException e) { + errorCount++; + delta.inError(String.format("Cannot propagate retrieve from cold storage for document %s: %s", document.getId(), + e.getMessage())); + log.warn("Could not propagate retrieve from cold storage for document: {}", document::getId, + () -> e); + } + } + delta.setErrorCount(errorCount); + log.debug("End computing documents of which a retrieve from ColdStorage is ongoing"); + } + } + +} diff --git a/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/service/ColdStorageService.java b/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/service/ColdStorageService.java index a2acae81..2f83ecec 100644 --- a/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/service/ColdStorageService.java +++ b/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/service/ColdStorageService.java @@ -136,4 +136,11 @@ DocumentModel proceedRestoreMainContent(CoreSession session, DocumentModel docum */ void propagateRestoreFromColdStorage(CoreSession session, String blobDigest); + /** + * Internal use. + * + * @since 2023.4 + */ + void proceedRetrieveMainContent(CoreSession session, DocumentModel documentModel); + } diff --git a/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/service/ColdStorageServiceImpl.java b/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/service/ColdStorageServiceImpl.java index 8e227471..996ffef9 100644 --- a/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/service/ColdStorageServiceImpl.java +++ b/nuxeo-coldstorage/src/main/java/org/nuxeo/coldstorage/service/ColdStorageServiceImpl.java @@ -69,6 +69,7 @@ import org.nuxeo.coldstorage.action.CheckColdStorageAvailabilityAction; import org.nuxeo.coldstorage.action.PropagateMoveToColdStorageContentAction; import org.nuxeo.coldstorage.action.PropagateRestoreFromColdStorageContentAction; +import org.nuxeo.coldstorage.action.PropagateRetrieveFromColdStorageContentAction; import org.nuxeo.ecm.core.api.Blob; import org.nuxeo.ecm.core.api.CoreInstance; import org.nuxeo.ecm.core.api.CoreSession; @@ -280,7 +281,7 @@ public DocumentModel proceedMoveToColdStorage(CoreSession session, DocumentRef d BlobUpdateContext updateContext = new BlobUpdateContext(key).withColdStorageClass(true); Framework.getService(BlobManager.class).getBlobProvider(coldContent).updateBlob(updateContext); } else { - log.warn("Main blob {} for document {} is already in cold storage with storage class {}", + log.debug("Main blob {} for document {} is already in cold storage with storage class {}", coldContent::getDigest, documentModel::getId, oldStatus::getStorageClass); } } catch (IOException e) { @@ -348,23 +349,27 @@ public DocumentModel retrieveFromColdStorage(CoreSession session, DocumentRef do documentModel.setPropertyValue(COLD_STORAGE_CONTENT_DOWNLOADABLE_UNTIL, Date.from(blobStatus.getDownloadableUntil())); doNotify = doc -> false; - } else if (blobStatus.isOngoingRestore()) { - documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true); - doNotify = doc -> true; } else { - try { - BlobUpdateContext updateContext = new BlobUpdateContext(key).withRestoreForDuration(restoreDuration); - Framework.getService(BlobManager.class).getBlobProvider(coldContent).updateBlob(updateContext); - } catch (IOException e) { - log.error("Could not retrieve document {} for duration {} seconds", documentModel::getId, - restoreDuration::getSeconds); - throw new NuxeoException(e); + if (blobStatus.isOngoingRestore()) { + documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true); + doNotify = doc -> true; + } else { + try { + BlobUpdateContext updateContext = new BlobUpdateContext(key).withRestoreForDuration( + restoreDuration); + Framework.getService(BlobManager.class).getBlobProvider(coldContent).updateBlob(updateContext); + } catch (IOException e) { + log.error("Could not retrieve document {} for duration {} seconds", documentModel::getId, + restoreDuration::getSeconds); + throw new NuxeoException(e); + } + documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true); + doNotify = doc -> CoreInstance.doPrivileged(session, s -> { + // The check retrieval may need to modify metadata of document too + return !checkIsRetrieved(s, doc); + }); } - documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true); - doNotify = doc -> CoreInstance.doPrivileged(session, s -> { - // The check retrieval may need to modify metadata of document too - return !checkIsRetrieved(s, doc); - }); + propagateRetrieveFromColdStorage(session, key); } docResult = CoreInstance.doPrivileged(session, s -> { // The retrieval is allowed for users with only READ access. @@ -531,7 +536,7 @@ public void propagateMoveToColdStorage(CoreSession session, String blobDigest) { * Restore from ColdStorage all documents referencing the given blob digests as main content. * * @param session the session - * @param blobDigests the blob digests + * @param blobDigest the blob digests */ public void propagateRestoreFromColdStorage(CoreSession session, String blobDigest) { String query = String.format("SELECT * FROM Document WHERE %s/digest = '%s'", COLD_STORAGE_CONTENT_PROPERTY, @@ -545,6 +550,25 @@ public void propagateRestoreFromColdStorage(CoreSession session, String blobDige () -> bulkService.getStatus(commandId)); } + /** + * Retrieve from ColdStorage all documents referencing the given blob digests as main content. + * + * @param session the session + * @param blobDigest the blob digests + * @since 2023.4 + */ + public void propagateRetrieveFromColdStorage(CoreSession session, String blobDigest) { + String query = String.format("SELECT * FROM Document WHERE %s/digest = '%s' AND (%s IS NULL OR %s = 0)", + COLD_STORAGE_CONTENT_PROPERTY, blobDigest, COLD_STORAGE_BEING_RETRIEVED_PROPERTY, COLD_STORAGE_BEING_RETRIEVED_PROPERTY); + + BulkService bulkService = Framework.getService(BulkService.class); + String username = SecurityConstants.SYSTEM_USERNAME; + String commandId = bulkService.submitTransactional(new BulkCommand.Builder( + PropagateRetrieveFromColdStorageContentAction.ACTION_NAME, query, username).build()); + log.debug("Retrieving documents referencing blob: {}, status: {}", () -> blobDigest, + () -> bulkService.getStatus(commandId)); + } + @Override public void checkDocToBeRetrieved(CoreSession session) { BulkService bulkService = Framework.getService(BulkService.class); @@ -634,4 +658,10 @@ protected void fireEvent(DocumentModel doc, CoreSession session, String eventNam eventService.fireEvent(event); } + @Override + public void proceedRetrieveMainContent(CoreSession session, DocumentModel documentModel) { + documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true); + fireEvent(documentModel, session, COLD_STORAGE_CONTENT_TO_RETRIEVE_EVENT_NAME); + } + } diff --git a/nuxeo-coldstorage/src/main/resources/OSGI-INF/coldstorage-bulk-contrib.xml b/nuxeo-coldstorage/src/main/resources/OSGI-INF/coldstorage-bulk-contrib.xml index affcdf71..79b21d53 100644 --- a/nuxeo-coldstorage/src/main/resources/OSGI-INF/coldstorage-bulk-contrib.xml +++ b/nuxeo-coldstorage/src/main/resources/OSGI-INF/coldstorage-bulk-contrib.xml @@ -8,6 +8,8 @@ bucketSize="100" batchSize="20" httpEnabled="false" /> + @@ -31,6 +33,12 @@ defaultPartitions="${nuxeo.bulk.action.propagateRestoreFromColdStorage.defaultPartitions:=4}"> + + +