Skip to content

Commit

Permalink
move to multi-meta
Browse files Browse the repository at this point in the history
  • Loading branch information
jchris committed Sep 11, 2024
1 parent 3c74a81 commit fbafd68
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 12 deletions.
17 changes: 9 additions & 8 deletions src/blockstore/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import type {
DbMetaEventBlock,
CarClockLink,
} from "./types.js";
import { Falsy, StoreType, SuperThis, throwFalsy } from "../types.js";
import { Falsy, StoreType, SuperThis, throwFalsy, CRDTEntry } from "../types.js";
import { Gateway } from "./gateway.js";
import { ensureLogger, isNotFoundError } from "../utils.js";
import { carLogIncludesGroup } from "./loader.js";
Expand Down Expand Up @@ -176,8 +176,7 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore {
return event as EventBlock<{ dbMeta: Uint8Array }>;
}

async decodeMetaBlock(bytes: Uint8Array): Promise<{ eventCid: CarClockLink; dbMeta: DbMeta; parents: string[] }> {
const crdtEntry = JSON.parse(this.sthis.txt.decode(bytes)) as { data: string; parents: string[]; cid: string };
async decodeMetaBlock(crdtEntry: CRDTEntry): Promise<{ eventCid: CarClockLink; dbMeta: DbMeta; parents: string[] }> {
const eventBytes = decodeFromBase64(crdtEntry.data);
const eventBlock = await this.decodeEventBlock(eventBytes);
return {
Expand All @@ -187,17 +186,19 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore {
};
}

async handleByteHeads(byteHeads: Uint8Array[]) {
async handleByteHeads(byteHeads: Uint8Array) {
const crdtEntries = JSON.parse(this.sthis.txt.decode(byteHeads)) as CRDTEntry[];
try {
const dbMetas = await Promise.all(byteHeads.map((bytes) => this.decodeMetaBlock(bytes)));
const dbMetas = await Promise.all(crdtEntries.map((entry) => this.decodeMetaBlock(entry)));
return dbMetas;
} catch (e) {
throw this.logger.Error().Err(e).Msg("parseHeader").AsError();
}
}

async handleEventByteHead(byteHead: Uint8Array) {
const { eventCid, dbMeta, parents } = await this.decodeMetaBlock(byteHead);
const crdtEntry = JSON.parse(this.sthis.txt.decode(byteHead)) as CRDTEntry;
const { eventCid, dbMeta, parents } = await this.decodeMetaBlock(crdtEntry);
this.loader?.taskManager?.handleEvent(eventCid, parents, dbMeta);
}

Expand All @@ -215,7 +216,7 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore {
}
throw this.logger.Error().Url(url.Ok()).Result("bytes:", bytes).Msg("gateway get").AsError();
}
const dbMetas = await this.handleByteHeads([bytes.Ok()]);
const dbMetas = await this.handleByteHeads(bytes.Ok());
await this.loader?.handleDbMetasFromStore(dbMetas.map((m) => m.dbMeta)); // the old one didn't await
const cids = dbMetas.map((m) => m.eventCid);
const uniqueParentsMap = new Map([...this.parents, ...cids].map((p) => [p.toString(), p]));
Expand All @@ -230,7 +231,7 @@ export class MetaStoreImpl extends BaseStoreImpl implements MetaStore {
data: base64String,
parents: this.parents.map((p) => p.toString()),
};
return this.sthis.txt.encode(JSON.stringify(crdtEntry));
return this.sthis.txt.encode(JSON.stringify([crdtEntry]));
}

async save(meta: DbMeta, branch?: string): Promise<Result<void>> {
Expand Down
2 changes: 1 addition & 1 deletion src/blockstore/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ export interface MetaStore extends BaseStore {
load(branch?: string): Promise<DbMeta[] | Falsy>;
// branch is defaulted to "main"
save(meta: DbMeta, branch?: string): Promise<Result<void>>;
handleByteHeads(byteHeads: Uint8Array[], branch?: string): Promise<{ eventCid: CarClockLink; dbMeta: DbMeta }[]>;
handleByteHeads(byteHeads: Uint8Array, branch?: string): Promise<{ eventCid: CarClockLink; dbMeta: DbMeta }[]>;
}

export interface DataSaveOpts {
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,9 @@ export interface DocResponse {
export type UpdateListenerFn<T extends DocTypes> = (docs: DocWithId<T>[]) => Promise<void> | void;
export type NoUpdateListenerFn = () => Promise<void> | void;
export type ListenerFn<T extends DocTypes> = UpdateListenerFn<T> | NoUpdateListenerFn;

export interface CRDTEntry {
data: string;
parents: string[];
cid: string;
}
6 changes: 3 additions & 3 deletions tests/blockstore/store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ describe("MetaStore", function () {
};
await store.save(h);
const file = await raw.get(store.url(), "main");
const [blockMeta] = await store.handleByteHeads([file]);
const [blockMeta] = await store.handleByteHeads(file);
const decodedHeader = blockMeta.dbMeta;
expect(decodedHeader).toBeTruthy();
expect(decodedHeader.cars).toBeTruthy();
Expand Down Expand Up @@ -156,10 +156,10 @@ describe("MetaStore with a saved header", function () {
const bytes = await raw.get(store.url(), "main");
const data = decoder.decode(bytes);
expect(data).toMatch(/parents/);
const header = JSON.parse(data);
const header = JSON.parse(data)[0];
expect(header).toBeDefined();
expect(header.parents).toBeDefined();
const [blockMeta] = await store.handleByteHeads([bytes]);
const [blockMeta] = await store.handleByteHeads(bytes);
const decodedHeader = blockMeta.dbMeta;
expect(decodedHeader).toBeDefined();
expect(decodedHeader.cars).toBeDefined();
Expand Down

0 comments on commit fbafd68

Please sign in to comment.