diff --git a/README.md b/README.md index 2df6687..5dc07e2 100644 --- a/README.md +++ b/README.md @@ -43,3 +43,17 @@ await synchronizeTable({ }, }, {}) ``` + +## Kysely + +We have custom implementations for Kysely that make things even easier. To copy a fact table from a Kysely DB (including nice autocomplete): + +```typescript +import { copyTable } from '@sesamecare-oss/to-clickhouse/kysely'; + +await copyTable(db, ch, {}, { + from: 'address_types', + to: 'identity__address_types', + pk: 'address_type_id', +}); +``` \ No newline at end of file diff --git a/__tests__/kysely.spec.ts b/__tests__/kysely.spec.ts index 9de569b..49d6745 100644 --- a/__tests__/kysely.spec.ts +++ b/__tests__/kysely.spec.ts @@ -2,7 +2,7 @@ import { afterEach, beforeEach, describe, expect, test } from 'vitest'; import { ClickHouseClient } from '@clickhouse/client'; import { Kysely } from 'kysely'; -import { kysely } from '../src/dbs/kysely'; +import { copyTable, syncTable } from '../src/dbs/kysely'; import { createChDb, createPgDb } from './db.fixtures'; import { kyselyDb } from './kysely.fixtures'; @@ -28,7 +28,7 @@ describe('simple kysely interface', () => { }); test('should sync to clickhouse', async () => { - const detail = await kysely.copyTable(db, ch, {}, { + const detail = await copyTable(db, ch, {}, { from: 'address_types', to: 'identity__address_types', pk: 'address_type_id', @@ -41,7 +41,7 @@ describe('simple kysely interface', () => { pk: 'individual_id', delaySeconds: 0, } as const; - const ind = await kysely.syncTable(db, ch, {}, indSpec); + const ind = await syncTable(db, ch, {}, indSpec); expect(ind.rows, 'Table should copy 3 rows').toBe(3); await new Promise((resolve) => setTimeout(resolve, 250)); @@ -50,7 +50,7 @@ describe('simple kysely interface', () => { .where('individual_id', '=', '1') .execute(); - const upd = await kysely.syncTable(db, ch, ind.bookmark, indSpec); + const upd = await syncTable(db, ch, ind.bookmark, indSpec); expect(upd.rows, 'Table should still copy 3 
rows because updated_at >= should match previous rows too').toBe(3); await new Promise((resolve) => setTimeout(resolve, 250)); @@ -59,7 +59,7 @@ describe('simple kysely interface', () => { .where('individual_id', '=', '2') .execute(); - const upd2 = await kysely.syncTable(db, ch, upd.bookmark, indSpec); + const upd2 = await syncTable(db, ch, upd.bookmark, indSpec); expect(upd2.rows, 'Copy 2 rows after second update').toBe(2); }); }); diff --git a/src/dbs/kysely.ts b/src/dbs/kysely.ts index 1741f91..c339703 100644 --- a/src/dbs/kysely.ts +++ b/src/dbs/kysely.ts @@ -12,129 +12,128 @@ interface HasUpdatedAt { // eslint-disable-next-line @typescript-eslint/no-explicit-any type TSComplex = any; -export const kysely = { - /** - * Given a table with an updated_at column, get the latest rows ordered by updated_at and the PK, - * to get a stable load. Note that it is important to set a delay that is long enough for all reasonable - * transaction durations, otherwise rows could sneak in that were updated before the last sync. I think - * for most use cases 1 minute would be more than enough. - */ - async syncTable< - Schema, - T extends keyof Schema & string, - PK extends AnyColumn, - PKT extends string | number = Schema[T][PK] extends string | number ? Schema[T][PK] : never, - >( - db: Kysely, - ch: ClickHouseClient, - bookmark: Bookmark, - spec: { - from: T; - to: string; - pk: PK; - delaySeconds: number; - }, - ) { - // Type assertion: Ensure that Schema[T] extends HasUpdatedAt - // eslint-disable-next-line @typescript-eslint/no-unused-vars - type AssertSchema = Schema[T] extends HasUpdatedAt ? Schema[T] : never; +/** + * Given a table with an updated_at column, get the latest rows ordered by updated_at and the PK, + * to get a stable load. Note that it is important to set a delay that is long enough for all reasonable + * transaction durations, otherwise rows could sneak in that were updated before the last sync. 
I think + * for most use cases 1 minute would be more than enough. + */ +export async function syncTable< + Schema, + T extends keyof Schema & string, + PK extends AnyColumn, + PKT extends string | number = Schema[T][PK] extends string | number ? Schema[T][PK] : never, +>( + db: Kysely, + ch: ClickHouseClient, + bookmark: Bookmark, + spec: { + from: T; + to: string; + pk: PK; + delaySeconds: number; + }, +) { + // Type assertion: Ensure that Schema[T] extends HasUpdatedAt + // eslint-disable-next-line @typescript-eslint/no-unused-vars + type AssertSchema = Schema[T] extends HasUpdatedAt ? Schema[T] : never; - const baseQuery = db - .selectFrom(spec.from) - .selectAll(); + const baseQuery = db + .selectFrom(spec.from) + .selectAll(); - return synchronizeTable({ - getRows(bookmark, limit) { - type TableWhere = Parameters; - const pkColumn = spec.pk as unknown as TableWhere[0]; - const udColumn = 'updated_at' as unknown as TableWhere[0]; - let completeQuery = baseQuery - // eslint-disable-next-line @typescript-eslint/no-explicit-any - .where(udColumn, '<', sql`NOW() - INTERVAL \'1 SECOND\' * ${spec.delaySeconds}`) + return synchronizeTable({ + getRows(bookmark, limit) { + type TableWhere = Parameters; + const pkColumn = spec.pk as unknown as TableWhere[0]; + const udColumn = 'updated_at' as unknown as TableWhere[0]; + let completeQuery = baseQuery + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .where(udColumn, '<', sql`NOW() - INTERVAL \'1 SECOND\' * ${spec.delaySeconds}`) + // Too complicated to figure out how to get this type to be accurate. But it is. 
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any + if (bookmark?.rowTimestamp && bookmark?.rowId) { + completeQuery = completeQuery.where((eb) => eb.or([ + eb(udColumn, '>=', bookmark.rowTimestamp as TSComplex), + eb.and([ + eb(udColumn, '=', bookmark.rowTimestamp as TSComplex), + eb(pkColumn, '>', bookmark.rowId as TSComplex), + ]) + ])); + } else if (bookmark?.rowTimestamp) { + completeQuery = completeQuery.where(udColumn, '>=', bookmark.rowTimestamp as TSComplex); + } + return completeQuery + .orderBy(udColumn, 'asc') + .orderBy(pkColumn, 'asc') + .limit(limit) + .stream(); + }, + getBookmark(row) { + return { // Too complicated to figure out how to get this type to be accurate. But it is. // eslint-disable-next-line @typescript-eslint/no-explicit-any - if (bookmark?.rowTimestamp && bookmark?.rowId) { - completeQuery = completeQuery.where((eb) => eb.or([ - eb(udColumn, '>=', bookmark.rowTimestamp as TSComplex), - eb.and([ - eb(udColumn, '=', bookmark.rowTimestamp as TSComplex), - eb(pkColumn, '>', bookmark.rowId as TSComplex), - ]) - ])); - } else if (bookmark?.rowTimestamp) { - completeQuery = completeQuery.where(udColumn, '>=', bookmark.rowTimestamp as TSComplex); - } - return completeQuery - .orderBy(udColumn, 'asc') - .orderBy(pkColumn, 'asc') - .limit(limit) - .stream(); - }, - getBookmark(row) { - return { - // Too complicated to figure out how to get this type to be accurate. But it is. - // eslint-disable-next-line @typescript-eslint/no-explicit-any - rowId: (row as any)[spec.pk], - // eslint-disable-next-line @typescript-eslint/no-explicit-any - rowTimestamp: (row as any).updated_at, - }; - }, - clickhouse: ch, - tableName: spec.to, - }, bookmark); - }, - /** - * Copy the contents of a table that is "forward only" - that is, the id column is enough - * to get a stable load. If you do not pass a bookmark, then we will copy all the rows - * in PK order. 
If you do pass a bookmark, we will copy all the rows with a PK greater - * than the provided rowId. This allows you to handle both tables that do not update existing content - * and tables that do. - */ - async copyTable< - Schema, - T extends keyof Schema & string, - PK extends AnyColumn, - PKT extends string | number = Schema[T][PK] extends string | number ? Schema[T][PK] : never, - >( - db: Kysely, - ch: ClickHouseClient, - bookmark: Bookmark, - spec: { - from: T; - to: string; - pk: PK; + rowId: (row as any)[spec.pk], + // eslint-disable-next-line @typescript-eslint/no-explicit-any + rowTimestamp: (row as any).updated_at, + }; }, - ) { - const baseQuery = db.selectFrom(spec.from).selectAll(); + clickhouse: ch, + tableName: spec.to, + }, bookmark); +} + +/** + * Copy the contents of a table that is "forward only" - that is, the id column is enough + * to get a stable load. If you do not pass a bookmark, then we will copy all the rows + * in PK order. If you do pass a bookmark, we will copy all the rows with a PK greater + * than the provided rowId. This allows you to handle both tables that do not update existing content + * and tables that do. + */ +export async function copyTable< + Schema, + T extends keyof Schema & string, + PK extends AnyColumn, + PKT extends string | number = Schema[T][PK] extends string | number ? Schema[T][PK] : never, +>( + db: Kysely, + ch: ClickHouseClient, + bookmark: Bookmark, + spec: { + from: T; + to: string; + pk: PK; + }, +) { + const baseQuery = db.selectFrom(spec.from).selectAll(); - return synchronizeTable({ - getRows(bookmark, limit) { - type TableWhere = Parameters; - let completeQuery = baseQuery; - const pkColumn = spec.pk as unknown as TableWhere[0]; - if (bookmark?.rowId) { - completeQuery = completeQuery.where( - pkColumn, - '>', - // Too complicated to figure out how to get this type to be accurate. But it is. 
- // eslint-disable-next-line @typescript-eslint/no-explicit-any - bookmark.rowId as any, - ); - } - return completeQuery - .orderBy(pkColumn, 'asc') - .limit(limit) - .stream(); - }, - getBookmark(row) { - return { + return synchronizeTable({ + getRows(bookmark, limit) { + type TableWhere = Parameters; + let completeQuery = baseQuery; + const pkColumn = spec.pk as unknown as TableWhere[0]; + if (bookmark?.rowId) { + completeQuery = completeQuery.where( + pkColumn, + '>', // Too complicated to figure out how to get this type to be accurate. But it is. // eslint-disable-next-line @typescript-eslint/no-explicit-any - rowId: (row as any)[spec.pk], - }; - }, - clickhouse: ch, - tableName: spec.to, - }, bookmark); - }, -}; + bookmark.rowId as any, + ); + } + return completeQuery + .orderBy(pkColumn, 'asc') + .limit(limit) + .stream(); + }, + getBookmark(row) { + return { + // Too complicated to figure out how to get this type to be accurate. But it is. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + rowId: (row as any)[spec.pk], + }; + }, + clickhouse: ch, + tableName: spec.to, + }, bookmark); +}