Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NXP-32515: Propagate retrieve requests #286

Open
wants to merge 1 commit into
base: lts-2023
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* (C) Copyright 2024 Nuxeo (http://nuxeo.com/) and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Guillaume Renard
*/

package org.nuxeo.coldstorage.action;

import static org.nuxeo.ecm.core.bulk.BulkServiceImpl.STATUS_STREAM;
import static org.nuxeo.lib.stream.computation.AbstractComputation.INPUT_1;
import static org.nuxeo.lib.stream.computation.AbstractComputation.OUTPUT_1;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.coldstorage.ColdStorageConstants;
import org.nuxeo.coldstorage.service.ColdStorageService;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.bulk.action.computation.AbstractBulkComputation;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.stream.StreamProcessorTopology;

/**
* Bulk action in charge of marking documents being retrieved.
*
* @since 2023.4
*/
public class PropagateRetrieveFromColdStorageContentAction implements StreamProcessorTopology {

private static final Logger log = LogManager.getLogger(PropagateRetrieveFromColdStorageContentAction.class);

public static final String ACTION_NAME = "propagateRetrieveFromColdStorage";

public static final String ACTION_FULL_NAME = "bulk/" + ACTION_NAME;

@Override
public Topology getTopology(Map<String, String> options) {
return Topology.builder()
.addComputation(PropagateRetrieveFromColdStorageContentComputation::new, //
List.of(INPUT_1 + ":" + ACTION_FULL_NAME, OUTPUT_1 + ":" + STATUS_STREAM))
.build();
}

public static class PropagateRetrieveFromColdStorageContentComputation extends AbstractBulkComputation {

public PropagateRetrieveFromColdStorageContentComputation() {
super(ACTION_FULL_NAME);
}

@Override
protected void compute(CoreSession session, List<String> ids, Map<String, Serializable> properties) {
log.debug("Start computing documents of which a retrieve from ColdStorage is ongoing {}", ids);
DocumentModelList documents = loadDocuments(session, ids);

ColdStorageService service = Framework.getService(ColdStorageService.class);

long errorCount = 0;
for (DocumentModel document : documents) {
if (!document.hasFacet(ColdStorageConstants.COLD_STORAGE_FACET_NAME)) {
log.info("The main content for document: {} is not in cold storage.", document::getId);
continue;
}
try {
service.proceedRetrieveMainContent(session, document);
} catch (NuxeoException e) {
errorCount++;
delta.inError(String.format("Cannot propagate retrieve from cold storage for document %s: %s", document.getId(),
e.getMessage()));
log.warn("Could not propagate retrieve from cold storage for document: {}", document::getId,
() -> e);
}
}
delta.setErrorCount(errorCount);
log.debug("End computing documents of which a retrieve from ColdStorage is ongoing");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,11 @@ DocumentModel proceedRestoreMainContent(CoreSession session, DocumentModel docum
*/
void propagateRestoreFromColdStorage(CoreSession session, String blobDigest);

/**
* Internal use.
*
* @since 2023.4
*/
void proceedRetrieveMainContent(CoreSession session, DocumentModel documentModel);

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.nuxeo.coldstorage.action.CheckColdStorageAvailabilityAction;
import org.nuxeo.coldstorage.action.PropagateMoveToColdStorageContentAction;
import org.nuxeo.coldstorage.action.PropagateRestoreFromColdStorageContentAction;
import org.nuxeo.coldstorage.action.PropagateRetrieveFromColdStorageContentAction;
import org.nuxeo.ecm.core.api.Blob;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
Expand Down Expand Up @@ -280,7 +281,7 @@ public DocumentModel proceedMoveToColdStorage(CoreSession session, DocumentRef d
BlobUpdateContext updateContext = new BlobUpdateContext(key).withColdStorageClass(true);
Framework.getService(BlobManager.class).getBlobProvider(coldContent).updateBlob(updateContext);
} else {
log.warn("Main blob {} for document {} is already in cold storage with storage class {}",
log.debug("Main blob {} for document {} is already in cold storage with storage class {}",
coldContent::getDigest, documentModel::getId, oldStatus::getStorageClass);
}
} catch (IOException e) {
Expand Down Expand Up @@ -348,23 +349,27 @@ public DocumentModel retrieveFromColdStorage(CoreSession session, DocumentRef do
documentModel.setPropertyValue(COLD_STORAGE_CONTENT_DOWNLOADABLE_UNTIL,
Date.from(blobStatus.getDownloadableUntil()));
doNotify = doc -> false;
} else if (blobStatus.isOngoingRestore()) {
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
doNotify = doc -> true;
} else {
try {
BlobUpdateContext updateContext = new BlobUpdateContext(key).withRestoreForDuration(restoreDuration);
Framework.getService(BlobManager.class).getBlobProvider(coldContent).updateBlob(updateContext);
} catch (IOException e) {
log.error("Could not retrieve document {} for duration {} seconds", documentModel::getId,
restoreDuration::getSeconds);
throw new NuxeoException(e);
if (blobStatus.isOngoingRestore()) {
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
doNotify = doc -> true;
} else {
try {
BlobUpdateContext updateContext = new BlobUpdateContext(key).withRestoreForDuration(
restoreDuration);
Framework.getService(BlobManager.class).getBlobProvider(coldContent).updateBlob(updateContext);
} catch (IOException e) {
log.error("Could not retrieve document {} for duration {} seconds", documentModel::getId,
restoreDuration::getSeconds);
throw new NuxeoException(e);
}
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
doNotify = doc -> CoreInstance.doPrivileged(session, s -> {
// The check retrieval may need to modify metadata of document too
return !checkIsRetrieved(s, doc);
});
}
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
doNotify = doc -> CoreInstance.doPrivileged(session, s -> {
// The check retrieval may need to modify metadata of document too
return !checkIsRetrieved(s, doc);
});
propagateRetrieveFromColdStorage(session, key);
}
docResult = CoreInstance.doPrivileged(session, s -> {
// The retrieval is allowed for users with only READ access.
Expand Down Expand Up @@ -531,7 +536,7 @@ public void propagateMoveToColdStorage(CoreSession session, String blobDigest) {
* Restore from ColdStorage all documents referencing the given blob digests as main content.
*
* @param session the session
* @param blobDigests the blob digests
* @param blobDigest the blob digests
*/
public void propagateRestoreFromColdStorage(CoreSession session, String blobDigest) {
String query = String.format("SELECT * FROM Document WHERE %s/digest = '%s'", COLD_STORAGE_CONTENT_PROPERTY,
Expand All @@ -545,6 +550,25 @@ public void propagateRestoreFromColdStorage(CoreSession session, String blobDige
() -> bulkService.getStatus(commandId));
}

/**
* Retrieve from ColdStorage all documents referencing the given blob digests as main content.
*
* @param session the session
* @param blobDigest the blob digests
* @since 2023.4
*/
public void propagateRetrieveFromColdStorage(CoreSession session, String blobDigest) {
String query = String.format("SELECT * FROM Document WHERE %s/digest = '%s' AND (%s IS NULL OR %s = 0)",
COLD_STORAGE_CONTENT_PROPERTY, blobDigest, COLD_STORAGE_BEING_RETRIEVED_PROPERTY, COLD_STORAGE_BEING_RETRIEVED_PROPERTY);

BulkService bulkService = Framework.getService(BulkService.class);
String username = SecurityConstants.SYSTEM_USERNAME;
String commandId = bulkService.submitTransactional(new BulkCommand.Builder(
PropagateRetrieveFromColdStorageContentAction.ACTION_NAME, query, username).build());
log.debug("Retrieving documents referencing blob: {}, status: {}", () -> blobDigest,
() -> bulkService.getStatus(commandId));
}

@Override
public void checkDocToBeRetrieved(CoreSession session) {
BulkService bulkService = Framework.getService(BulkService.class);
Expand Down Expand Up @@ -634,4 +658,10 @@ protected void fireEvent(DocumentModel doc, CoreSession session, String eventNam
eventService.fireEvent(event);
}

@Override
public void proceedRetrieveMainContent(CoreSession session, DocumentModel documentModel) {
documentModel.setPropertyValue(COLD_STORAGE_BEING_RETRIEVED_PROPERTY, true);
fireEvent(documentModel, session, COLD_STORAGE_CONTENT_TO_RETRIEVE_EVENT_NAME);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
bucketSize="100" batchSize="20" httpEnabled="false" />
<action name="propagateRestoreFromColdStorage" inputStream="bulk/propagateRestoreFromColdStorage"
bucketSize="100" batchSize="20" httpEnabled="false" />
<action name="propagateRetrieveFromColdStorage" inputStream="bulk/propagateRetrieveFromColdStorage"
bucketSize="100" batchSize="20" httpEnabled="false" />
<action name="checkColdStorageAvailability" inputStream="bulk/checkColdStorageAvailability"
bucketSize="100" batchSize="20" httpEnabled="false" />
</extension>
Expand All @@ -31,6 +33,12 @@
defaultPartitions="${nuxeo.bulk.action.propagateRestoreFromColdStorage.defaultPartitions:=4}">
<policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true" />
</streamProcessor>
<streamProcessor name="propagateRetrieveFromColdStorage"
class="org.nuxeo.coldstorage.action.PropagateRetrieveFromColdStorageContentAction"
defaultConcurrency="${nuxeo.bulk.action.propagateRetrieveFromColdStorage.defaultConcurrency:=2}"
defaultPartitions="${nuxeo.bulk.action.propagateRetrieveFromColdStorage.defaultPartitions:=4}">
<policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true" />
</streamProcessor>
<streamProcessor name="checkColdStorageAvailability"
class="org.nuxeo.coldstorage.action.CheckColdStorageAvailabilityAction"
defaultScroller="${nuxeo.bulk.action.checkColdStorageAvailability.scroller:=default}"
Expand Down