Skip to content

Commit

Permalink
fix(datastore): merge incoming mutation with existing update mutation (
Browse files Browse the repository at this point in the history
…#1379)

* chore(datastore): merge incoming mutation with existing update pending mutation in the queue

* chore(datastore): address PR comments

* add comments

* move anotation inline

* rename saveIncomingAndNotify to saveAndNotify
  • Loading branch information
sdhuka authored Jun 17, 2021
1 parent b72c7eb commit f2bc5ac
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.amplifyframework.core.Amplify;
import com.amplifyframework.core.model.Model;
import com.amplifyframework.core.model.ModelSchema;
import com.amplifyframework.core.model.SerializedModel;
import com.amplifyframework.core.model.query.Where;
import com.amplifyframework.core.model.query.predicate.QueryPredicate;
import com.amplifyframework.core.model.query.predicate.QueryPredicates;
Expand Down Expand Up @@ -329,13 +330,29 @@ private Completable handleIncomingUpdate() {
// we're simply performing the create (with the updated item item contents)
return overwriteExistingAndNotify(PendingMutation.Type.CREATE, QueryPredicates.all());
case UPDATE:
// If the incoming update does not have a condition, we want to delete any
// existing mutations for the modelId before saving the incoming one.
if (QueryPredicates.all().equals(incoming.getPredicate())) {
// If the incoming update does not have a condition, we want to delete any
// existing mutations for the modelId before saving the incoming one.
return removeNotLocking(existing.getMutationId()).andThen(saveIncomingAndNotify());
// If the incoming & existing update is of type serializedModel
// then merge the existing model data into incoming.
if (incoming.getMutatedItem() instanceof SerializedModel
&& existing.getMutatedItem() instanceof SerializedModel) {
SerializedModel mergedSerializedModel = SerializedModel.merge(
(SerializedModel) incoming.getMutatedItem(),
(SerializedModel) existing.getMutatedItem(),
incoming.getModelSchema());
@SuppressWarnings("unchecked") // cast SerializedModel to Model
PendingMutation<T> mergedPendingMutation = (PendingMutation<T>) PendingMutation.update(
mergedSerializedModel,
incoming.getModelSchema());
return removeNotLocking(existing.getMutationId())
.andThen(saveAndNotify(mergedPendingMutation));
} else {
return removeNotLocking(existing.getMutationId()).andThen(saveAndNotify(incoming));
}
} else {
// If it has a condition, we want to just add it to the queue
return saveIncomingAndNotify();
return saveAndNotify(incoming);
}
case DELETE:
// Incoming update after a delete -> throw exception
Expand Down Expand Up @@ -381,7 +398,7 @@ private Completable overwriteExistingAndNotify(@NonNull PendingMutation.Type typ
.andThen(notifyContentAvailable());
}

private Completable saveIncomingAndNotify() {
private Completable saveAndNotify(PendingMutation<T> incoming) {
return save(incoming)
.andThen(notifyContentAvailable());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.amplifyframework.AmplifyException;
import com.amplifyframework.core.model.Model;
import com.amplifyframework.core.model.ModelSchema;
import com.amplifyframework.core.model.SerializedModel;
import com.amplifyframework.core.model.query.Where;
import com.amplifyframework.core.model.query.predicate.QueryPredicates;
import com.amplifyframework.datastore.DataStoreException;
Expand Down Expand Up @@ -630,6 +631,70 @@ public void existingUpdateIncomingUpdateWithoutConditionRewritesExistingMutation
);
}

/**
* When there is an existing SerializedModel update mutation, and a new SerializedModel update mutation comes in,
* then we need to merge any existing mutations for that modelId and create the new one.
* @throws AmplifyException On failure to find the serializedModel difference.
* @throws InterruptedException If interrupted while awaiting terminal result in test observer
*/
@Test
public void existingSerializedModelUpdateIncomingUpdateWithoutConditionMergesWithExistingMutation()
throws AmplifyException, InterruptedException {
// Arrange an existing update mutation
BlogOwner modelInSqlLite = BlogOwner.builder()
.name("Papa Tony")
.wea("Something")
.build();

BlogOwner initialUpdate = BlogOwner.builder()
.name("Tony Jr")
.id(modelInSqlLite.getId())
.build();

PendingMutation<SerializedModel> initialUpdatePendingMutation =
PendingMutation.update(SerializedModel.difference(initialUpdate, modelInSqlLite, schema), schema);
String existingUpdateId = initialUpdatePendingMutation.getMutationId().toString();
mutationOutbox.enqueue(initialUpdatePendingMutation).blockingAwait();

// Act: try to enqueue a new update mutation when there already is one
BlogOwner incomingUpdatedModel = BlogOwner.builder()
.name("Papa Tony")
.wea("something else")
.id(modelInSqlLite.getId())
.build();
PendingMutation<SerializedModel> incomingUpdate = PendingMutation.update(
SerializedModel.difference(incomingUpdatedModel, modelInSqlLite, schema),
schema);
String incomingUpdateId = incomingUpdate.getMutationId().toString();
TestObserver<Void> enqueueObserver = mutationOutbox.enqueue(incomingUpdate).test();

// Assert: OK. The new mutation is accepted
enqueueObserver.await(TIMEOUT_MS, TimeUnit.MILLISECONDS);
enqueueObserver.assertComplete();

// Assert: the existing mutation has been removed
assertRecordCountForMutationId(existingUpdateId, 0);

// And the new one has been added to the queue
assertRecordCountForMutationId(incomingUpdateId, 0);

List<PersistentRecord> pendingMutationsFromStorage = getAllPendingMutationRecordFromStorage();
for (PersistentRecord record : pendingMutationsFromStorage) {
if (!record.getContainedModelId().equals(incomingUpdate.getMutatedItem().getId())) {
pendingMutationsFromStorage.remove(record);
}
}
// Ensure the new one is in storage.
PendingMutation<SerializedModel> storedMutation =
converter.fromRecord(pendingMutationsFromStorage.get(0));
// This is the name from the second model, not the first!!
assertEquals(initialUpdate.getName(),
storedMutation.getMutatedItem().getSerializedData().get("name"));
// wea got merged from existing model!!
assertEquals(incomingUpdatedModel.getWea(),
storedMutation.getMutatedItem().getSerializedData().get("wea"));
}

/**
* When there is an existing creation mutation, and an update comes in,
* the exiting creation should be updated with the contents of the incoming
Expand Down Expand Up @@ -1032,4 +1097,8 @@ private void assertRecordCountForMutationId(String mutationId, int expectedCount
private List<PersistentRecord> getPendingMutationRecordFromStorage(String mutationId) throws DataStoreException {
return storage.query(PersistentRecord.class, Where.id(mutationId));
}

private List<PersistentRecord> getAllPendingMutationRecordFromStorage() throws DataStoreException {
return storage.query(PersistentRecord.class, Where.matchesAll());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ public static <T extends Model> SerializedModel difference(T updated, T original
.build();
}

/**
* Merge the serialized data from existing to incoming model.
* @param incoming the incoming Model to which serialized data fields will be added.
* @param existing the original Model to compare against.
* @param modelSchema ModelSchema for the Models between compared.
* @return a SerializedModel, containing the values from the incoming Model and existing Model.
*/
public static SerializedModel merge(SerializedModel incoming, SerializedModel existing, ModelSchema modelSchema) {
Map<String, Object> mergedSerializedData = new HashMap<>(incoming.serializedData);
for (String key : existing.getSerializedData().keySet()) {
if (!mergedSerializedData.containsKey(key)) {
mergedSerializedData.put(key, existing.getSerializedData().get(key));
}
}
return SerializedModel.builder()
.serializedData(mergedSerializedData)
.modelSchema(modelSchema)
.build();
}

/**
* Return a builder of {@link SerializedModel}.
* @return A serialized model builder
Expand Down

0 comments on commit f2bc5ac

Please sign in to comment.