Skip to content

Commit

Permalink
#28, Removing redundancy between transaction time and transaction ID
Browse files Browse the repository at this point in the history
  • Loading branch information
gsvarovsky committed Nov 25, 2020
1 parent 1ddb4ea commit b65c411
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 82 deletions.
10 changes: 5 additions & 5 deletions src/engine/MeldEncoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,18 @@ export class MeldEncoding {
newDelta = async (delta: Omit<MeldDelta, 'encoded'>): Promise<MeldDelta> => {
const [del, ins] = await Promise.all([delta.delete, delta.insert]
.map(triples => this.jsonFromTriples(triples).then(EncodedDelta.encode)));
return { ...delta, encoded: [0, delta.tid, del, ins] };
return { ...delta, encoded: [1, del, ins] };
}

asDelta = async (delta: EncodedDelta): Promise<MeldDelta> => {
const [ver, tid, del, ins] = delta;
if (ver !== 0)
const [ver, del, ins] = delta;
if (ver !== 1)
throw new Error(`Encoded delta version ${ver} not supported`);
const jsons = await Promise.all([del, ins].map(EncodedDelta.decode));
const [delTriples, insTriples] = await Promise.all(jsons.map(this.triplesFromJson));
return ({
tid, insert: insTriples, delete: delTriples, encoded: delta,
toString: () => `${tid}: ${JSON.stringify(jsons)}`
insert: insTriples, delete: delTriples, encoded: delta,
toString: () => `${JSON.stringify(jsons)}`
});
}

Expand Down
4 changes: 2 additions & 2 deletions src/engine/dataset/DatasetEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ export class DatasetEngine extends AbstractMeld implements CloneEngine, MeldLoca
}
});
// The applys will enqueue in order on the dataset's transaction lock
await Promise.all(applys.map(async ([msg, arrivalTime, localTime]) => {
const cxUpdate = await this.dataset.apply(msg, arrivalTime, localTime);
await Promise.all(applys.map(async ([msg, localTime, cxnTime]) => {
const cxUpdate = await this.dataset.apply(msg, localTime, cxnTime);
if (cxUpdate != null)
this.nextUpdate(cxUpdate);
msg.delivered.resolve();
Expand Down
46 changes: 20 additions & 26 deletions src/engine/dataset/SuSetDataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import { MeldEncoding, unreify, toDomainQuad, reifyTriplesTids } from '../MeldEn
import { Observable, from, Subject as Source, EMPTY } from 'rxjs';
import { bufferCount, mergeMap, reduce, map, filter, takeWhile, expand } from 'rxjs/operators';
import { flatten, Future, tapComplete, getIdLogger, check } from '../util';
import { generate as uuid } from 'short-uuid';
import { Logger } from 'loglevel';
import { MeldError } from '../MeldError';
import { LocalLock } from '../local';
import { SUSET_CONTEXT, qsName, tripleId } from './SuSetGraph';
import { SUSET_CONTEXT, qsName, tripleId, txnId } from './SuSetGraph';
import { SuSetJournalDataset, SuSetJournalEntry } from './SuSetJournal';
import { MeldConfig, Read } from '../..';
import { QuadMap, TripleMap, Triple } from '../quads';
Expand Down Expand Up @@ -183,7 +182,6 @@ export class SuSetDataset extends JrqlGraph {
@SuSetDataset.checkNotClosed.async
async transact(prepare: () => Promise<[TreeClock, PatchQuads]>): Promise<DeltaMessage | null> {
return this.dataset.transact<DeltaMessage | null>({
id: uuid(), // New transaction ID
prepare: async txc => {
const [time, patch] = await prepare();
if (patch.isEmpty)
Expand All @@ -195,11 +193,11 @@ export class SuSetDataset extends JrqlGraph {

txc.sw.next('find-tids');
const deletedTriplesTids = await this.findTriplesTids(patch.oldQuads);
const delta = await this.txnDelta(txc.id, patch.newQuads, asTriplesTids(deletedTriplesTids));
const delta = await this.txnDelta(patch.newQuads, asTriplesTids(deletedTriplesTids));

// Include tid changes in final patch
txc.sw.next('new-tids');
const tidPatch = await this.txnTidPatch(txc.id, patch.newQuads, deletedTriplesTids);
const tidPatch = await this.txnTidPatch(txnId(time), patch.newQuads, deletedTriplesTids);

// Include journaling in final patch
txc.sw.next('journal');
Expand Down Expand Up @@ -230,55 +228,57 @@ export class SuSetDataset extends JrqlGraph {
return tidPatch;
}

private txnDelta(tid: string, insert: Quad[], deletedTriplesTids: TripleMap<UUID[]>) {
private txnDelta(insert: Quad[], deletedTriplesTids: TripleMap<UUID[]>) {
return this.meldEncoding.newDelta({
tid, insert,
insert,
// Delta has reifications of old quads, which we infer from found triple tids
delete: reifyTriplesTids(deletedTriplesTids)
});
}

@SuSetDataset.checkNotClosed.async
async apply(msg: DeltaMessage, arrivalTime: TreeClock, localTime: TreeClock): Promise<DeltaMessage | null> {
async apply(msg: DeltaMessage, localTime: TreeClock, cxnTime: TreeClock): Promise<DeltaMessage | null> {
return this.dataset.transact<DeltaMessage | null>({
id: msg.data[1],
prepare: async txc => {
// Check we haven't seen this transaction before
txc.sw.next('find-tids');
this.log.debug(`Applying tid: ${txc.id} @ ${arrivalTime}`);
this.log.debug(`Applying delta: ${msg.time} @ ${localTime}`);

txc.sw.next('unreify');
const delta = await this.meldEncoding.asDelta(msg.data);
const patch = new PatchQuads([], delta.insert.map(toDomainQuad));
const tidPatch = await this.processSuDeletions(delta.delete, patch);

txc.sw.next('apply-cx'); // "cx" = constraint
let { cxn, update } = await this.constrainUpdate(patch, arrivalTime, delta.tid);
const tid = txnId(msg.time);
let update = patch.isEmpty ? null : await this.asUpdate(localTime, patch);
const cxn = update == null ? null :
await this.applyConstraint({ update, patch, tid }, cxnTime);
// After applying the constraint, patch new quads might have changed
tidPatch.append(await this.newTriplesTid(patch.newQuads, delta.tid));
tidPatch.append(await this.newTriplesTid(patch.newQuads, tid));

// Done determining the applied delta patch. At this point we could
// have an empty patch, but we still need to complete the journal
// entry for it.
txc.sw.next('journal');
const journal = await this.journalData.journal(), tail = await journal.tail();
const journaling = tail.builder(
journal, { delta, localTime: arrivalTime, remoteTime: msg.time });
journal, { delta, localTime, remoteTime: msg.time });

// If the constraint has done anything, we need to merge its work
if (cxn != null) {
tidPatch.append(cxn.tidPatch);
patch.append(cxn.patch);
// Re-create the update with the constraint resolution included
update = patch.isEmpty ? null : await this.asUpdate(localTime, patch)
update = patch.isEmpty ? null : await this.asUpdate(cxnTime, patch)
// Also create a journal entry for the constraint "transaction"
journaling.next({ delta: cxn.delta, localTime });
journaling.next({ delta: cxn.delta, localTime: cxnTime });
}
return {
patch: this.transactionPatch(patch, tidPatch),
kvps: journaling.commit,
// FIXME: If this delta message exceeds max size, what to do?
return: cxn != null ? this.deltaMessage(tail, localTime, cxn.delta) : null,
return: cxn != null ? this.deltaMessage(tail, cxnTime, cxn.delta) : null,
after: () => update && this.updateSource.next(update)
};
}
Expand All @@ -300,19 +300,14 @@ export class SuSetDataset extends JrqlGraph {
}, Promise.resolve(new PatchQuads()));
}

private async constrainUpdate(patch: PatchQuads, arrivalTime: TreeClock, tid: string) {
const update = patch.isEmpty ? null : await this.asUpdate(arrivalTime, patch);
const cxn = update == null ? null : await this.applyConstraint({ patch, update, tid });
return { cxn, update };
}

/**
* Caution: mutates to.patch
* @param to transaction details to apply the patch to
* @param localTime local clock time
*/
private async applyConstraint(
to: { patch: PatchQuads, update: MeldUpdate, tid: string }) {
to: { update: MeldUpdate, patch: PatchQuads, tid: string },
cxnTime: TreeClock) {
const patch = new PatchQuads();
const mutableUpdate: MutableMeldUpdate = {
...to.update,
Expand All @@ -322,7 +317,6 @@ export class SuSetDataset extends JrqlGraph {
}
await this.constraint.apply(this.state, mutableUpdate);
if (!patch.isEmpty) {
const tid = uuid();
// Triples that were inserted in the applied transaction may have been
// deleted by the constraint - these need to be removed from the applied
// transaction patch but still published in the constraint delta
Expand All @@ -335,8 +329,8 @@ export class SuSetDataset extends JrqlGraph {
patch.remove('oldQuads', quad => deletedExistingTidQuads.get(quad) == null);
return {
patch,
delta: await this.txnDelta(tid, patch.newQuads, deletedTriplesTids),
tidPatch: await this.txnTidPatch(tid, patch.newQuads, deletedExistingTidQuads)
delta: await this.txnDelta(patch.newQuads, deletedTriplesTids),
tidPatch: await this.txnTidPatch(txnId(cxnTime), patch.newQuads, deletedExistingTidQuads)
};
}
return null;
Expand Down
26 changes: 10 additions & 16 deletions src/engine/dataset/SuSetGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,16 @@ import { NamedNode } from 'rdf-js';
import { Iri } from 'jsonld/jsonld-spec';
import { Triple, tripleKey } from '../quads';
import { createHash } from 'crypto';
import { TreeClock } from '../clocks';
import { MsgPack } from '../util';

/**
* Context for SU-Set Dataset code to manipulate control content.
*/
export const SUSET_CONTEXT: Context = {
qs: 'http://qs.m-ld.org/',
tid: 'qs:#tid', // Property of journal entry AND triple hash
body: 'qs:#body', // Journal and journal entry body
thash: 'qs:thash/', // Namespace for triple hashes
tail: { '@id': 'qs:#tail', '@type': '@id' }, // Property of the journal
lastDelivered: { '@id': 'qs:#lastDelivered', '@type': '@id' }, // Property of the journal
entry: 'qs:journal/entry/', // Namespace for journal entries
hash: 'qs:#hash', // Property of a journal entry
delta: 'qs:#delta', // Property of a journal entry
remote: 'qs:#remote', // Property of a journal entry
time: 'qs:#time', // Property of journal AND a journal entry
ticks: 'qs:#ticks', // Property of a journal entry
next: { '@id': 'qs:#next', '@type': '@id' } // Property of a journal entry
tid: 'qs:#tid', // Property of triple hash
thash: 'qs:thash/' // Namespace for triple hashes
}

export function qsName(name: string): NamedNode {
Expand All @@ -32,12 +24,14 @@ export function toPrefixedId(prefix: string, ...path: string[]): Iri {
return `${prefix}:${path.map(encodeURIComponent).join('/')}`;
}

export function fromPrefixedId(prefix: string, id: Iri): string[] {
return id.match(`^${prefix}:(.+)`)?.[1]?.split('/').map(decodeURIComponent) ?? [];
}

export function tripleId(triple: Triple): string {
const hash = createHash('sha1'); // Fastest
tripleKey(triple).forEach(key => hash.update(key));
return toPrefixedId('thash', hash.digest('base64'));
}

export function txnId(time: TreeClock): string {
return createHash('sha1')
.update(MsgPack.encode(time.toJson()))
.digest('base64');
}
5 changes: 2 additions & 3 deletions src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ export interface Meld {
}

export interface MeldDelta extends Object {
readonly tid: UUID;
readonly insert: Triple[];
readonly delete: Triple[];
/**
Expand All @@ -92,12 +91,12 @@ export interface MeldDelta extends Object {
}

/**
* A tuple containing version, tid, delete and insert components of a
* A tuple containing encoding version, delete and insert components of a
* {@link MeldDelta}. The delete and insert components are UTF-8 encoded JSON-LD
* strings, which may be GZIP compressed into a Buffer if bigger than a
* threshold. Intended to be efficiently serialised with MessagePack.
*/
export type EncodedDelta = [0, string, string | Buffer, string | Buffer];
export type EncodedDelta = [1, string | Buffer, string | Buffer];

export namespace EncodedDelta {
export async function encode(json: any): Promise<Buffer | string> {
Expand Down
13 changes: 6 additions & 7 deletions test/DatasetEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { comesAlive } from '../src/engine/AbstractMeld';
import { first, take, toArray, map, observeOn, count } from 'rxjs/operators';
import { TreeClock } from '../src/engine/clocks';
import { DeltaMessage, MeldRemotes, Snapshot } from '../src/engine';
import { uuid } from 'short-uuid';
import { MeldConfig, Subject, Describe, Update } from '../src';
import MemDown from 'memdown';
import { AbstractLevelDOWN } from 'abstract-leveldown';
Expand Down Expand Up @@ -149,7 +148,7 @@ describe('Dataset engine', () => {
'http://test.m-ld.org/#name': 'Fred'
} as Subject);
remoteUpdates.next(new DeltaMessage(remoteTime.ticks, remoteTime.ticked(),
[0, uuid(), '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));
[1, '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));
// Note extra tick for constraint application in remote update
await expect(updates).resolves.toEqual([1, 3]);
});
Expand All @@ -160,7 +159,7 @@ describe('Dataset engine', () => {
test('answers rev-up from next new clone after apply', async () => {
const updated = clone.dataUpdates.pipe(take(1)).toPromise();
remoteUpdates.next(new DeltaMessage(remoteTime.ticks, remoteTime.ticked(),
[0, uuid(), '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));
[1, '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));
await updated;
const thirdTime = await clone.newClock();
await expect(clone.revupFrom(thirdTime)).resolves.toBeDefined();
Expand Down Expand Up @@ -215,7 +214,7 @@ describe('Dataset engine', () => {
await clone.initialise();
const updates = clone.dataUpdates.pipe(count()).toPromise();
remoteUpdates.next(new DeltaMessage(collabClock.ticks, collabClock.ticked(),
[0, uuid(), '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));
[1, '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));
// Also enqueue a no-op write, which we can wait for - relying on queue ordering
await clone.write({ '@insert': [] } as Update);
await clone.close(); // Will complete the updates
Expand Down Expand Up @@ -316,7 +315,7 @@ describe('Dataset engine', () => {
});
// Provide a rev-up that pre-dates the local siloed update
revUps.next(new DeltaMessage(remoteTime.ticks, remoteTime.ticked(),
[0, uuid(), '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));
[1, '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));
revUps.complete();
// Check that the updates are not out of order
await expect(observedTicks).resolves.toEqual([1, 2]);
Expand All @@ -333,7 +332,7 @@ describe('Dataset engine', () => {

// Push a delta claiming a missed tick
remoteUpdates.next(new DeltaMessage(remoteTime.ticks + 1, remoteTime.ticked().ticked(),
[0, uuid(), '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));
[1, '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']));

await expect(clone.status.becomes({ outdated: true })).resolves.toBeDefined();
await expect(clone.status.becomes({ outdated: false })).resolves.toBeDefined();
Expand All @@ -350,7 +349,7 @@ describe('Dataset engine', () => {

const updates = clone.dataUpdates.pipe(toArray()).toPromise();
const delta = new DeltaMessage(remoteTime.ticks, remoteTime.ticked(),
[0, uuid(), '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']);
[1, '{}', '{"@id":"http://test.m-ld.org/wilma","http://test.m-ld.org/#name":"Wilma"}']);
// Push a delta
remoteUpdates.next(delta);
// Push the same delta again
Expand Down
Loading

0 comments on commit b65c411

Please sign in to comment.