Skip to content

Commit

Permalink
feat(kysely): reorg kysely exports, add more docs
Browse files Browse the repository at this point in the history
  • Loading branch information
djMax committed Jan 6, 2024
1 parent be681c1 commit f9809a8
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 123 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,17 @@ await synchronizeTable({
},
}, {})
```

## Kysely

We have custom implentnations for Kysely that make things even easier. To copy a fact table from a Kysely DB (including nice autocomplete):

```typescript
import { copyTable } from '@sesamecare-oss/to-clickhouse/kysely';

await copyTable(db, ch, {}, {
from: 'address_types',
to: 'identity__address_types',
pk: 'address_type_id',
});
```
10 changes: 5 additions & 5 deletions __tests__/kysely.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { afterEach, beforeEach, describe, expect, test } from 'vitest';
import { ClickHouseClient } from '@clickhouse/client';
import { Kysely } from 'kysely';

import { kysely } from '../src/dbs/kysely';
import { copyTable, syncTable } from '../src/dbs/kysely';

import { createChDb, createPgDb } from './db.fixtures';
import { kyselyDb } from './kysely.fixtures';
Expand All @@ -28,7 +28,7 @@ describe('simple kysely interface', () => {
});

test('should sync to clickhouse', async () => {
const detail = await kysely.copyTable(db, ch, {}, {
const detail = await copyTable(db, ch, {}, {
from: 'address_types',
to: 'identity__address_types',
pk: 'address_type_id',
Expand All @@ -41,7 +41,7 @@ describe('simple kysely interface', () => {
pk: 'individual_id',
delaySeconds: 0,
} as const;
const ind = await kysely.syncTable(db, ch, {}, indSpec);
const ind = await syncTable(db, ch, {}, indSpec);
expect(ind.rows, 'Table should copy 3 rows').toBe(3);

await new Promise((resolve) => setTimeout(resolve, 250));
Expand All @@ -50,7 +50,7 @@ describe('simple kysely interface', () => {
.where('individual_id', '=', '1')
.execute();

const upd = await kysely.syncTable(db, ch, ind.bookmark, indSpec);
const upd = await syncTable(db, ch, ind.bookmark, indSpec);
expect(upd.rows, 'Table should still copy 3 rows because updated_at >= should match previous rows too').toBe(3);

await new Promise((resolve) => setTimeout(resolve, 250));
Expand All @@ -59,7 +59,7 @@ describe('simple kysely interface', () => {
.where('individual_id', '=', '2')
.execute();

const upd2 = await kysely.syncTable(db, ch, upd.bookmark, indSpec);
const upd2 = await syncTable(db, ch, upd.bookmark, indSpec);
expect(upd2.rows, 'Copy 2 rows after second update').toBe(2);
});
});
235 changes: 117 additions & 118 deletions src/dbs/kysely.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,129 +12,128 @@ interface HasUpdatedAt {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type TSComplex = any;

export const kysely = {
/**
* Given a table with an updated_at column, get the latest rows ordered by updated_at and the PK,
* to get a stable load. Note that it is important to set a delay that is long enough for all reasonable
* transaction durations, otherwise rows could sneak in that were updated before the last sync. I think
* for most use cases 1 minute would be more than enough.
*/
async syncTable<
Schema,
T extends keyof Schema & string,
PK extends AnyColumn<Schema, T>,
PKT extends string | number = Schema[T][PK] extends string | number ? Schema[T][PK] : never,
>(
db: Kysely<Schema>,
ch: ClickHouseClient,
bookmark: Bookmark<PKT>,
spec: {
from: T;
to: string;
pk: PK;
delaySeconds: number;
},
) {
// Type assertion: Ensure that Schema[T] extends HasUpdatedAt
// eslint-disable-next-line @typescript-eslint/no-unused-vars
type AssertSchema = Schema[T] extends HasUpdatedAt ? Schema[T] : never;
/**
* Given a table with an updated_at column, get the latest rows ordered by updated_at and the PK,
* to get a stable load. Note that it is important to set a delay that is long enough for all reasonable
* transaction durations, otherwise rows could sneak in that were updated before the last sync. I think
* for most use cases 1 minute would be more than enough.
*/
export async function syncTable<
Schema,
T extends keyof Schema & string,
PK extends AnyColumn<Schema, T>,
PKT extends string | number = Schema[T][PK] extends string | number ? Schema[T][PK] : never,
>(
db: Kysely<Schema>,
ch: ClickHouseClient,
bookmark: Bookmark<PKT>,
spec: {
from: T;
to: string;
pk: PK;
delaySeconds: number;
},
) {
// Type assertion: Ensure that Schema[T] extends HasUpdatedAt
// eslint-disable-next-line @typescript-eslint/no-unused-vars
type AssertSchema = Schema[T] extends HasUpdatedAt ? Schema[T] : never;

const baseQuery = db
.selectFrom(spec.from)
.selectAll();
const baseQuery = db
.selectFrom(spec.from)
.selectAll();

return synchronizeTable({
getRows(bookmark, limit) {
type TableWhere = Parameters<typeof baseQuery['where']>;
const pkColumn = spec.pk as unknown as TableWhere[0];
const udColumn = 'updated_at' as unknown as TableWhere[0];
let completeQuery = baseQuery
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.where(udColumn, '<', sql<any>`NOW() - INTERVAL \'1 SECOND\' * ${spec.delaySeconds}`)
return synchronizeTable({
getRows(bookmark, limit) {
type TableWhere = Parameters<typeof baseQuery['where']>;
const pkColumn = spec.pk as unknown as TableWhere[0];
const udColumn = 'updated_at' as unknown as TableWhere[0];
let completeQuery = baseQuery
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.where(udColumn, '<', sql<any>`NOW() - INTERVAL \'1 SECOND\' * ${spec.delaySeconds}`)
// Too complicated to figure out how to get this type to be accurate. But it is.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if (bookmark?.rowTimestamp && bookmark?.rowId) {
completeQuery = completeQuery.where((eb) => eb.or([
eb(udColumn, '>=', bookmark.rowTimestamp as TSComplex),
eb.and([
eb(udColumn, '=', bookmark.rowTimestamp as TSComplex),
eb(pkColumn, '>', bookmark.rowId as TSComplex),
])
]));
} else if (bookmark?.rowTimestamp) {
completeQuery = completeQuery.where(udColumn, '>=', bookmark.rowTimestamp as TSComplex);
}
return completeQuery
.orderBy(udColumn, 'asc')
.orderBy(pkColumn, 'asc')
.limit(limit)
.stream();
},
getBookmark(row) {
return {
// Too complicated to figure out how to get this type to be accurate. But it is.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if (bookmark?.rowTimestamp && bookmark?.rowId) {
completeQuery = completeQuery.where((eb) => eb.or([
eb(udColumn, '>=', bookmark.rowTimestamp as TSComplex),
eb.and([
eb(udColumn, '=', bookmark.rowTimestamp as TSComplex),
eb(pkColumn, '>', bookmark.rowId as TSComplex),
])
]));
} else if (bookmark?.rowTimestamp) {
completeQuery = completeQuery.where(udColumn, '>=', bookmark.rowTimestamp as TSComplex);
}
return completeQuery
.orderBy(udColumn, 'asc')
.orderBy(pkColumn, 'asc')
.limit(limit)
.stream();
},
getBookmark(row) {
return {
// Too complicated to figure out how to get this type to be accurate. But it is.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
rowId: (row as any)[spec.pk],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
rowTimestamp: (row as any).updated_at,
};
},
clickhouse: ch,
tableName: spec.to,
}, bookmark);
},
/**
* Copy the contents of a table that is "forward only" - that is, the id column is enough
* to get a stable load. If you do not pass a bookmark, then we will copy all the rows
* in PK order. If you do pass a bookmark, we will copy all the rows with a PK greater
* than the provided rowId. This allows you to handle both tables that do not update existing content
* and tables that do.
*/
async copyTable<
Schema,
T extends keyof Schema & string,
PK extends AnyColumn<Schema, T>,
PKT extends string | number = Schema[T][PK] extends string | number ? Schema[T][PK] : never,
>(
db: Kysely<Schema>,
ch: ClickHouseClient,
bookmark: Bookmark<PKT>,
spec: {
from: T;
to: string;
pk: PK;
rowId: (row as any)[spec.pk],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
rowTimestamp: (row as any).updated_at,
};
},
) {
const baseQuery = db.selectFrom(spec.from).selectAll();
clickhouse: ch,
tableName: spec.to,
}, bookmark);
}

/**
* Copy the contents of a table that is "forward only" - that is, the id column is enough
* to get a stable load. If you do not pass a bookmark, then we will copy all the rows
* in PK order. If you do pass a bookmark, we will copy all the rows with a PK greater
* than the provided rowId. This allows you to handle both tables that do not update existing content
* and tables that do.
*/
export async function copyTable<
Schema,
T extends keyof Schema & string,
PK extends AnyColumn<Schema, T>,
PKT extends string | number = Schema[T][PK] extends string | number ? Schema[T][PK] : never,
>(
db: Kysely<Schema>,
ch: ClickHouseClient,
bookmark: Bookmark<PKT>,
spec: {
from: T;
to: string;
pk: PK;
},
) {
const baseQuery = db.selectFrom(spec.from).selectAll();

return synchronizeTable({
getRows(bookmark, limit) {
type TableWhere = Parameters<typeof baseQuery['where']>;
let completeQuery = baseQuery;
const pkColumn = spec.pk as unknown as TableWhere[0];
if (bookmark?.rowId) {
completeQuery = completeQuery.where(
pkColumn,
'>',
// Too complicated to figure out how to get this type to be accurate. But it is.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
bookmark.rowId as any,
);
}
return completeQuery
.orderBy(pkColumn, 'asc')
.limit(limit)
.stream();
},
getBookmark(row) {
return {
return synchronizeTable({
getRows(bookmark, limit) {
type TableWhere = Parameters<typeof baseQuery['where']>;
let completeQuery = baseQuery;
const pkColumn = spec.pk as unknown as TableWhere[0];
if (bookmark?.rowId) {
completeQuery = completeQuery.where(
pkColumn,
'>',
// Too complicated to figure out how to get this type to be accurate. But it is.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
rowId: (row as any)[spec.pk],
};
},
clickhouse: ch,
tableName: spec.to,
}, bookmark);
},
};
bookmark.rowId as any,
);
}
return completeQuery
.orderBy(pkColumn, 'asc')
.limit(limit)
.stream();
},
getBookmark(row) {
return {
// Too complicated to figure out how to get this type to be accurate. But it is.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
rowId: (row as any)[spec.pk],
};
},
clickhouse: ch,
tableName: spec.to,
}, bookmark);
}

0 comments on commit f9809a8

Please sign in to comment.