Skip to content

Commit

Permalink
feat(migrations): add Clickhouse db migration handling
Browse files Browse the repository at this point in the history
  • Loading branch information
djMax committed Jan 12, 2024
1 parent 9775aa6 commit abd86ae
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 116 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ await synchronizeTable({
}, {})
```

## Migrations

This module includes a "callable" version of [clickhouse-migrations](https://github.com/VVVi/clickhouse-migrations) which makes it easier
to apply migrations as part of a workflow.

## Kysely

We have custom implementations for Kysely that make things even easier. To copy a fact table from a Kysely DB (including nice autocomplete):
Expand All @@ -56,4 +61,4 @@ await copyTable(db, ch, {}, {
to: 'identity__address_types',
pk: 'address_type_id',
});
```
```
17 changes: 11 additions & 6 deletions __tests__/db.fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import path from 'path';
import { Pool } from 'pg';
import { createClient } from '@clickhouse/client';

import { applyMigrationsInDirectory } from '../src/migrations';

export function pgPool(db: string) {
return new Pool({
database: db,
Expand Down Expand Up @@ -40,15 +42,18 @@ export function chdb(db: string) {
}

export async function createChDb(db: string) {
// Create the clickhouse db and load the schema
const ch = chdb('default');
await ch.command({ query: `DROP DATABASE IF EXISTS ${db}` });
await ch.command({ query: `CREATE DATABASE ${db}` });
await ch.close();

const ch2 = chdb(db);
const cmds = fs.readFileSync(path.resolve(__dirname, 'db/clickhouse.sql'), 'utf8').split('---');
await cmds.reduce((prev, cmd) => prev.then(() => ch2.command({ query: cmd }).then(() => undefined)), Promise.resolve(undefined));
return ch2;
// Create the clickhouse db and load the schema
await applyMigrationsInDirectory({
host: process.env.CHHOST || 'http://localhost:8123',
username: process.env.CHUSERNAME || 'default',
password: process.env.CHPASSWORD || '',
database: db,
}, path.resolve(__dirname, 'migrations'));
return chdb(db);
}

if (require.main === module) {
Expand Down
10 changes: 9 additions & 1 deletion __tests__/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import path from 'path';

import { afterEach, beforeEach, describe, expect, test } from 'vitest';
import { ClickHouseClient } from '@clickhouse/client';
import { Kysely } from 'kysely';

import { synchronizeTable } from '../src/index';
import { getMigrationsInDirectory, getMigrationsToApply, synchronizeTable } from '../src/index';

import { DB } from './generated/database';
import { createChDb, createPgDb } from './db.fixtures';
Expand Down Expand Up @@ -77,4 +79,10 @@ describe('move tables from postgres to clickhouse', () => {
}, {});
expect(detail.rows).toBe(2);
});

test('migrations are recorded', async () => {
const migrations = getMigrationsInDirectory(path.resolve(__dirname, 'migrations'));
const toApply = await getMigrationsToApply(ch, migrations);
expect(toApply.length).toBe(0);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE TABLE identity__address_types (
created_at DateTime64
) ENGINE = ReplacingMergeTree(created_at)
ORDER BY address_type_id;
---

CREATE TABLE identity__individuals (
individual_id Int32,
individual_uuid String,
Expand Down
12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,29 @@
"coconfig": "@openapi-typescript-infra/coconfig"
},
"optionalDependencies": {
"kysely": "^0.27.1"
"kysely": "^0.27.2"
},
"devDependencies": {
"@openapi-typescript-infra/coconfig": "^4.3.0",
"@semantic-release/exec": "^6.0.3",
"@semantic-release/github": "^9.2.6",
"@types/node": "^20.10.6",
"@types/node": "^20.11.0",
"@types/pg": "^8.10.9",
"@types/pg-cursor": "^2.7.2",
"@typescript-eslint/eslint-plugin": "^6.17.0",
"@typescript-eslint/parser": "^6.17.0",
"@typescript-eslint/eslint-plugin": "^6.18.1",
"@typescript-eslint/parser": "^6.18.1",
"coconfig": "^1.4.1",
"eslint": "^8.56.0",
"eslint-config-prettier": "^9.1.0",
"eslint-import-resolver-typescript": "^3.6.1",
"eslint-plugin-import": "^2.29.1",
"kysely": "^0.27.1",
"kysely": "^0.27.2",
"kysely-codegen": "^0.11.0",
"pg": "^8.11.3",
"pg-cursor": "^2.10.3",
"ts-node": "^10.9.2",
"typescript": "^5.3.3",
"vitest": "^1.1.2"
"vitest": "^1.1.3"
},
"dependencies": {
"@clickhouse/client": "^0.2.7"
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './types';
export * from './stream-copy';
export * from './stream-copy';
export * from './migrations';
151 changes: 151 additions & 0 deletions src/migrations/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import fs from 'fs';
import { createHash } from 'crypto';
import { ClickHouseClient, createClient } from "@clickhouse/client";
import { sql_queries, sql_sets } from './sql-queries';
import { NodeClickHouseClientConfigOptions } from '@clickhouse/client/dist/client';

/**
 * Create a ClickHouse database if it does not already exist.
 *
 * @param clickhouse - client connected to any existing database (e.g. "default")
 * @param database - name of the database to create
 * @param options - optional; `engine` selects the database engine (default "Atomic")
 * @returns the promise from the underlying exec call
 */
export function createDatabase(clickhouse: ClickHouseClient, database: string, { engine = 'Atomic' }: { engine?: string } = {}) {
  return clickhouse.exec({
    query: `CREATE DATABASE IF NOT EXISTS ${database} ENGINE = ${engine}`,
    clickhouse_settings: {
      // Make sure the DDL fully completes before the promise resolves.
      wait_end_of_query: 1,
    },
  });
}

/**
 * Create the `_migrations` bookkeeping table if it does not already exist.
 * Each applied migration is recorded as a row holding its numeric version,
 * an md5 checksum of the file content, the filename, and the time applied.
 */
export async function initializeMigrationTable(clickhouse: ClickHouseClient) {
  const q = `CREATE TABLE IF NOT EXISTS _migrations (
    uid UUID DEFAULT generateUUIDv4(),
    version UInt32,
    checksum String,
    migration_name String,
    applied_at DateTime DEFAULT now()
  )
  ENGINE = MergeTree
  ORDER BY tuple(applied_at)`;

  return clickhouse.exec({
    query: q,
    clickhouse_settings: {
      // Ensure the DDL has fully completed before the promise resolves.
      wait_end_of_query: 1,
    },
  });
}

/** A migration file discovered on disk (not necessarily applied yet). */
interface Migration {
  // Numeric version parsed from the filename prefix, e.g. 2 for "2_add_x.sql".
  version: number;
  // The migration's filename, used for reporting and bookkeeping.
  filename: string;
  // The raw SQL content of the migration file.
  commands: string;
}

/** A row of the `_migrations` table describing an already-applied migration. */
interface CompletedMigration {
  version: number;
  // md5 hex digest of the migration file content at the time it was applied.
  checksum: string;
  migration_name: string;
}

/**
 * Determine which migrations still need to be applied.
 *
 * Reads the `_migrations` bookkeeping table, verifies that every recorded
 * migration still exists on disk with an unchanged checksum, and returns
 * the subset of `migrations` that has not yet been applied.
 *
 * @param clickhouse - client connected to the target database
 * @param migrations - all known migrations, typically from getMigrationsInDirectory
 * @returns migrations not yet applied, in input order
 * @throws if an applied migration is missing on disk or its file changed after apply
 */
export async function getMigrationsToApply(clickhouse: ClickHouseClient, migrations: Migration[]) {
  // Index already-applied migrations by version for O(1) lookup below.
  const alreadyAppliedMigrations = await clickhouse.query({
    query: `SELECT version, checksum, migration_name FROM _migrations ORDER BY version`,
    format: 'JSONEachRow',
  })
    .then((rz) => rz.json<CompletedMigration[]>())
    .then((rows) => rows.reduce((acc, row) => {
      acc[row.version] = row;
      return acc;
    }, {} as Record<number, CompletedMigration>));

  // Every migration recorded in the database must still exist on disk,
  // otherwise the source tree and database history no longer agree.
  Object.values(alreadyAppliedMigrations).forEach((migration) => {
    if (!migrations.find((m) => m.version === migration.version)) {
      throw new Error(`Migration ${migration.version} has been applied but no longer exists`);
    }
  });

  const migrationsToApply = [] as Migration[];

  for (const migration of migrations) {
    const applied = alreadyAppliedMigrations[migration.version];

    if (applied) {
      // Check that the migration file was not changed after apply.
      const checksum = createHash('md5').update(migration.commands).digest('hex');
      if (applied.checksum !== checksum) {
        throw new Error(`A migration file shouldn't be changed after apply. Please, restore content of the ${applied.migration_name} migration.`);
      }

      // Already applied and unchanged - nothing to do.
      continue;
    }
    migrationsToApply.push(migration);
  }

  return migrationsToApply;
}

/**
 * Apply the given migrations in order, recording each one in the
 * `_migrations` table so it is skipped on subsequent runs.
 *
 * Callers should pass the result of getMigrationsToApply; statements are
 * executed one at a time and are NOT transactional, so a failure mid-file
 * leaves earlier statements applied.
 *
 * @param clickhouse - client connected to the target database
 * @param migrations - migrations to execute, in order
 * @throws if any statement fails, or if the bookkeeping insert fails
 */
export async function applyMigrations(clickhouse: ClickHouseClient, migrations: Migration[]) {
  for (const migration of migrations) {
    const checksum = createHash('md5').update(migration.commands).digest('hex');

    // Extract the individual SQL statements and any SET settings from the file.
    const queries = sql_queries(migration.commands);
    const sets = sql_sets(migration.commands);

    for (const query of queries) {
      try {
        await clickhouse.exec({
          query,
          clickhouse_settings: sets,
        });
      } catch (e) {
        throw new Error(
          `the migration ${migration.filename} has an error. Please, fix it (be sure that already executed parts of the migration would not be run a second time) and re-run the migration script.
          ${(e as Error).message}`);
      }
    }

    // Record completion so this migration is skipped next time.
    try {
      await clickhouse.insert({
        table: '_migrations',
        values: [{ version: migration.version, checksum, migration_name: migration.filename }],
        format: 'JSONEachRow',
      });
    } catch (e: unknown) {
      throw new Error(`can't insert data into the table _migrations: ${(e as Error).message}`);
    }
  }
}

/**
 * Load all .sql migration files from a directory.
 *
 * Filenames are expected to start with a numeric version followed by an
 * underscore, e.g. `1_init.sql`. Files without a .sql extension are ignored.
 *
 * @param directory - directory containing migration files
 * @returns migrations sorted by ascending version
 */
export function getMigrationsInDirectory(directory: string): Migration[] {
  const migrations = [] as Migration[];

  fs.readdirSync(directory).forEach((filename) => {
    // Manage only .sql files.
    if (!filename.endsWith('.sql')) return;

    const version = Number(filename.split('_')[0]);
    // Read this migration's content (was `$(unknown)` — a broken interpolation
    // that read a non-existent path instead of the listed file).
    const commands = fs.readFileSync(`${directory}/${filename}`, 'utf8');

    migrations.push({
      version,
      filename,
      commands,
    });
  });

  return migrations.sort((a, b) => a.version - b.version);
}

/**
 * Convenience wrapper: ensure the database exists, ensure the `_migrations`
 * table exists, then apply any outstanding migrations found in `directory`.
 *
 * @param config - client config; `database` names the target database
 * @param directory - directory of versioned .sql migration files
 */
export async function applyMigrationsInDirectory(config: NodeClickHouseClientConfigOptions & { database: string }, directory: string) {
  // The target database may not exist yet, so connect without one to create it.
  const defaultDb = createClient({
    ...config,
    database: undefined,
  });
  try {
    await createDatabase(defaultDb, config.database, { engine: 'Atomic' });
  } finally {
    // Close the bootstrap client so we don't leak its sockets.
    await defaultDb.close();
  }

  const targetDb = createClient(config);
  try {
    await initializeMigrationTable(targetDb);
    const migrations = getMigrationsInDirectory(directory);
    const toApply = await getMigrationsToApply(targetDb, migrations);
    if (toApply.length > 0) {
      // Apply ONLY the outstanding migrations. Passing the full list here
      // would re-execute already-applied SQL and insert duplicate rows
      // into _migrations.
      await applyMigrations(targetDb, toApply);
    }
  } finally {
    await targetDb.close();
  }
}
53 changes: 53 additions & 0 deletions src/migrations/sql-queries.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Extract sql queries from migrations.
// Extract the executable SQL statements from a migration file's content.
// Comment lines and SET statements (handled separately by sql_sets) are
// stripped, whitespace is collapsed, and the remainder is split on ';'.
const sql_queries = (content: string): string[] => {
  const normalized = content
    .replace(/(--|#!|#\s).*(\n|\r\n|\r|$)/gm, '\n')
    .replace(/^\s*(SET\s).*(\n|\r\n|\r|$)/gm, '')
    .replace(/(\n|\r\n|\r)/gm, ' ')
    .replace(/\s+/g, ' ');

  const statements: string[] = [];
  for (const piece of normalized.split(';')) {
    const trimmed = piece.trim();
    if (trimmed.length !== 0) {
      statements.push(trimmed);
    }
  }

  return statements;
};

// Extract query settings from migrations.
// Extract per-query settings from a migration file's content.
// Only SET lines survive the filtering; the SET keyword and ALL whitespace
// are removed, leaving `key=value` pairs separated by semicolons.
const sql_sets = (content: string) => {
  const assignments = content
    .replace(/(--|#!|#\s).*(\n|\r\n|\r|$)/gm, '\n')
    .replace(/^\s*(?!SET\s).*(\n|\r\n|\r|$)/gm, '')
    .replace(/^\s*(SET\s)/gm, '')
    .replace(/(\n|\r\n|\r)/gm, ' ')
    .replace(/\s+/g, '')
    .split(';');

  const sets: { [key: string]: string } = {};
  for (const assignment of assignments) {
    const [key, value] = assignment.split('=');
    if (key) {
      sets[key] = value;
    }
  }

  return sets;
};

export { sql_queries, sql_sets };

// -- any
// SET allow_experimental_object_type = 1; --set option
// SET allow_experimental_object_new = 1;
// SELECT * FROM events

// sdfsfdsd
// -- asssss
// --asdf
// sdfsdf
// sdfsfs
// SET a=1
// asdf
// SET f=22

// sdf
Loading

0 comments on commit abd86ae

Please sign in to comment.