From 0f3aec52931476dd5693762763444a6e407b947a Mon Sep 17 00:00:00 2001 From: Alexey Orlenko Date: Mon, 3 Feb 2025 13:49:44 +0100 Subject: [PATCH] test: run query engine tests with query compiler Closes: https://linear.app/prisma-company/issue/ORM-555/make-prisma-engines-tests-run-with-query-compiler --- .prettierrc.toml | 3 + libs/driver-adapters/executor/package.json | 4 +- libs/driver-adapters/executor/src/testd-qc.ts | 358 ++++++++++++++++++ libs/driver-adapters/executor/src/testd-qe.ts | 4 +- .../executor/src/types/jsonRpc.ts | 80 +++- libs/driver-adapters/executor/src/utils.ts | 4 + libs/driver-adapters/pnpm-workspace.yaml | 1 + shell.nix | 13 +- 8 files changed, 439 insertions(+), 28 deletions(-) create mode 100644 .prettierrc.toml create mode 100644 libs/driver-adapters/executor/src/testd-qc.ts diff --git a/.prettierrc.toml b/.prettierrc.toml new file mode 100644 index 000000000000..addd6d363c2e --- /dev/null +++ b/.prettierrc.toml @@ -0,0 +1,3 @@ +tabWidth = 2 +semi = false +singleQuote = true diff --git a/libs/driver-adapters/executor/package.json b/libs/driver-adapters/executor/package.json index 334f5dbc1c88..f8f9e2f18300 100644 --- a/libs/driver-adapters/executor/package.json +++ b/libs/driver-adapters/executor/package.json @@ -9,8 +9,9 @@ "description": "", "private": true, "scripts": { - "build": "tsup ./src/testd-qe.ts ./src/demo-se.ts ./src/bench.ts --format esm --dts", + "build": "tsup ./src/testd-qe.ts ./src/testd-qc.ts ./src/demo-se.ts ./src/bench.ts --format esm --dts", "test:qe": "node --import tsx ./src/testd-qe.ts", + "test:qc": "node --import tsx ./src/testd-qc.ts", "demo:se": "node --import tsx ./src/demo-se.ts", "demo:qc": "node --import tsx ./src/demo-qc.ts", "clean:d1": "rm -rf ../../connector-test-kit-rs/query-engine-tests/.wrangler" @@ -27,6 +28,7 @@ "@prisma/adapter-pg": "workspace:*", "@prisma/adapter-planetscale": "workspace:*", "@prisma/bundled-js-drivers": "workspace:*", + "@prisma/client-engine-runtime": "workspace:*", "@prisma/driver-adapter-utils": "workspace:*", "mitata": "0.1.11", "query-engine-wasm-baseline": "npm:@prisma/query-engine-wasm@0.0.19", diff --git a/libs/driver-adapters/executor/src/testd-qc.ts b/libs/driver-adapters/executor/src/testd-qc.ts new file mode 100644 index 000000000000..e5588a896c6e --- /dev/null +++ b/libs/driver-adapters/executor/src/testd-qc.ts @@ -0,0 +1,358 @@ +import * as readline from 'node:readline' +import * as S from '@effect/schema/Schema' +import { + bindAdapter, + ConnectionInfo, + ErrorCapturingDriverAdapter, + Queryable, +} from '@prisma/driver-adapter-utils' +import { + QueryInterpreter, + TransactionManager, + IsolationLevel, +} from '@prisma/client-engine-runtime' + +import type { DriverAdaptersManager } from './driver-adapters-manager' +import { Env, jsonRpc } from './types' +import * as qc from './query-compiler' +import { assertNever, debug, err } from './utils' +import { setupDriverAdaptersManager } from './setup' +import { SchemaId, JsonProtocolQuery } from './types/jsonRpc' + +async function main(): Promise { + const env = S.decodeUnknownSync(Env)(process.env) + console.log('[env]', env) + + const iface = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: false, + }) + + iface.on('line', async (line) => { + try { + const request = S.decodeSync(jsonRpc.RequestFromString)(line) + debug(`Got a request: ${line}`) + + try { + const response = await handleRequest(request, env) + respondOk(request.id, response) + } catch (err) { + debug('[nodejs] Error from request handler: ', err) + respondErr(request.id, { + code: 1, + message: err.stack ?? err.toString(), + }) + } + } catch (err) { + debug('Received non-json line: ', line) + console.error(err) + } + }) +} + +const state: Record< + SchemaId, + { + compiler: qc.QueryCompiler + driverAdapterManager: DriverAdaptersManager + adapter: ErrorCapturingDriverAdapter + transactionManager: TransactionManager + logs: string[] + } +> = {} + +async function handleRequest( + { method, params }: jsonRpc.Request, + env: Env, +): Promise { + if (method !== 'initializeSchema') { + if (state[params.schemaId] === undefined) { + throw new Error( + `Schema with id ${params.schemaId} is not initialized. Please call 'initializeSchema' first.`, + ) + } + } + + switch (method) { + case 'initializeSchema': { + debug('Got `initializeSchema', params) + + const { url, schema, schemaId, migrationScript } = params + + const driverAdapterManager = await setupDriverAdaptersManager( + env, + migrationScript, + ) + + const { compiler, adapter } = await initQc({ + url, + driverAdapterManager, + schema, + }) + + const transactionManager = new TransactionManager({ + driverAdapter: adapter, + clientVersion: '0.0.0', + }) + + state[schemaId] = { + compiler, + driverAdapterManager, + adapter, + transactionManager, + logs: [], + } + + if (adapter && adapter.getConnectionInfo) { + const maxBindValuesResult = adapter + .getConnectionInfo() + .map((info) => info.maxBindValues) + if (maxBindValuesResult.ok) { + return { maxBindValues: maxBindValuesResult.value } + } + } + + return { maxBindValues: null } + } + + case 'query': { + debug('Got `query`', params) + const { query, schemaId, txId } = params + const { compiler, adapter, transactionManager, logs } = state[schemaId] + + const executeQuery = async ( + queryable: Queryable, + query: JsonProtocolQuery, + ) => { + // TODO: this shouldn't be async anymore, should it? + const queryPlanString = await compiler.compile(JSON.stringify(query)) + const queryPlan = JSON.parse(queryPlanString) + + const interpreter = new QueryInterpreter({ + queryable, + placeholderValues: {}, + onQuery: (event) => { + logs.push(JSON.stringify(event)) + }, + }) + + return interpreter.run(queryPlan) + } + + const executeIndependentBatch = async ( + queries: readonly JsonProtocolQuery[], + ) => Promise.all(queries.map((query) => executeQuery(adapter, query))) + + const executeTransactionalBatch = async ( + queries: readonly JsonProtocolQuery[], + isolationLevel?: IsolationLevel, + ) => { + const txInfo = await transactionManager.startTransaction({ + isolationLevel, + }) + const queryable = transactionManager.getTransaction( + txInfo, + 'batch transaction query', + ) + + try { + const results: unknown[] = [] + + for (const query of queries) { + const result = await executeQuery(queryable, query) + results.push(result) + } + + await transactionManager.commitTransaction(txInfo.id) + + return results + } catch (err) { + await transactionManager + .rollbackTransaction(txInfo.id) + .catch(console.error) + throw err + } + } + + if ('batch' in query) { + const { batch, transaction } = query + + const results = transaction + ? await executeTransactionalBatch( + batch, + parseIsolationLevel(transaction.isolationLevel), + ) + : await executeIndependentBatch(batch) + + debug('🟢 Batch query results: ', results) + + return { + batchResult: batch.map((query, index) => ({ + data: { [query.action]: results[index] }, + })), + } + } else { + const queryable = txId + ? transactionManager.getTransaction( + { id: txId, payload: undefined }, + query.action, + ) + : adapter + + if (!queryable) { + throw new Error( + `No transaction with id ${txId} found. Please call 'startTx' first.`, + ) + } + + const result = await executeQuery(queryable, query) + + debug('🟢 Query result: ', result) + + return { data: { [query.action]: result } } + } + } + + case 'startTx': { + debug('Got `startTx`', params) + + const { schemaId, options } = params + const { transactionManager } = state[schemaId] + + return await transactionManager.startTransaction({ + maxWait: options.max_wait, + timeout: options.timeout, + isolationLevel: parseIsolationLevel(options.isolation_level), + }) + } + + case 'commitTx': { + debug('Got `commitTx`', params) + + const { schemaId, txId } = params + const { transactionManager } = state[schemaId] + + return await transactionManager.commitTransaction(txId) + } + + case 'rollbackTx': { + debug('Got `rollbackTx`', params) + + const { schemaId, txId } = params + const { transactionManager } = state[schemaId] + + return await transactionManager.rollbackTransaction(txId) + } + + case 'teardown': { + debug('Got `teardown`', params) + + const { schemaId } = params + const { driverAdapterManager } = state[schemaId] + + await driverAdapterManager.teardown() + delete state[schemaId] + + return {} + } + + case 'getLogs': { + const { schemaId } = params + return state[schemaId].logs + } + + default: { + assertNever(method, `Unknown method: \`${method}\``) + } + } +} + +function respondErr(requestId: number, error: jsonRpc.RpcError) { + const msg: jsonRpc.ErrResponse = { + jsonrpc: '2.0', + id: requestId, + error, + } + console.log(JSON.stringify(msg)) +} + +function respondOk(requestId: number, payload: unknown) { + const msg: jsonRpc.OkResponse = { + jsonrpc: '2.0', + id: requestId, + result: payload, + } + console.log(JSON.stringify(msg)) +} + +type InitQueryCompilerParams = { + driverAdapterManager: DriverAdaptersManager + url: string + schema: string +} + +async function initQc({ + driverAdapterManager, + url, + schema, +}: InitQueryCompilerParams) { + const adapter = await driverAdapterManager.connect({ url }) + const errorCapturingAdapter = bindAdapter(adapter) + + let connectionInfo: ConnectionInfo = {} + if (errorCapturingAdapter.getConnectionInfo) { + const result = errorCapturingAdapter.getConnectionInfo() + if (!result.ok) { + throw result.error + } + connectionInfo = result.value + } + + const compiler = await qc.initQueryCompiler({ + datamodel: schema, + flavour: adapter.provider, + connectionInfo, + }) + + return { + compiler, + adapter: errorCapturingAdapter, + } +} + +function parseIsolationLevel( + level: string | null | undefined, +): IsolationLevel | undefined { + switch (level) { + case null: + case undefined: + return undefined + + case 'ReadCommitted': + case 'Read Committed': + return IsolationLevel.ReadCommitted + + case 'ReadUncommitted': + case 'Read Uncommitted': + return IsolationLevel.ReadUncommitted + + case 'RepeatableRead': + case 'Repeatable Read': + return IsolationLevel.RepeatableRead + + case 'Serializable': + return IsolationLevel.Serializable + + case 'Snapshot': + return IsolationLevel.Snapshot + + default: + // We don't validate the isolation level on the RPC schema level because some tests + // rely on sending invalid isolation levels to test error handling, and those invalid + // levels must be forwarded to the query engine as-is in `testd-qe.ts`. + throw new Error(`Unknown isolation level: ${level}`) + } +} + +main().catch(err) diff --git a/libs/driver-adapters/executor/src/testd-qe.ts b/libs/driver-adapters/executor/src/testd-qe.ts index 3095495b558a..4444f85666a1 100644 --- a/libs/driver-adapters/executor/src/testd-qe.ts +++ b/libs/driver-adapters/executor/src/testd-qe.ts @@ -7,7 +7,7 @@ import { Env, jsonRpc } from './types' import * as qe from './query-engine' import { nextRequestId } from './requestId' import { createRNEngineConnector } from './rn' -import { debug, err } from './utils' +import { assertNever, debug, err } from './utils' import { setupDriverAdaptersManager } from './setup' import { SchemaId } from './types/jsonRpc' @@ -179,7 +179,7 @@ async function handleRequest( return state[schemaId].logs } default: { - throw new Error(`Unknown method: \`${method}\``) + assertNever(method, `Unknown method: \`${method}\``) } } } diff --git a/libs/driver-adapters/executor/src/types/jsonRpc.ts b/libs/driver-adapters/executor/src/types/jsonRpc.ts index 194150211d84..6c2380772269 100644 --- a/libs/driver-adapters/executor/src/types/jsonRpc.ts +++ b/libs/driver-adapters/executor/src/types/jsonRpc.ts @@ -1,6 +1,8 @@ import * as S from '@effect/schema/Schema' -const SchemaId = S.number.pipe(S.int(), S.nonNegative()).pipe(S.brand('SchemaId')) +const SchemaId = S.number + .pipe(S.int(), S.nonNegative()) + .pipe(S.brand('SchemaId')) export type SchemaId = S.Schema.Type const InitializeSchemaParams = S.struct({ @@ -9,16 +11,58 @@ const InitializeSchemaParams = S.struct({ url: S.string, migrationScript: S.optional(S.string), }) -export type InitializeSchemaParams = S.Schema.Type +export type InitializeSchemaParams = S.Schema.Type< + typeof InitializeSchemaParams +> const InitializeSchema = S.struct({ method: S.literal('initializeSchema'), params: InitializeSchemaParams, }) +const JsonProtocolQuery = S.struct({ + modelName: S.optional(S.nullable(S.string)), + action: S.union( + S.literal('findUnique'), + S.literal('findUniqueOrThrow'), + S.literal('findFirst'), + S.literal('findFirstOrThrow'), + S.literal('findMany'), + S.literal('createOne'), + S.literal('createMany'), + S.literal('createManyAndReturn'), + S.literal('updateOne'), + S.literal('updateMany'), + S.literal('updateManyAndReturn'), + S.literal('deleteOne'), + S.literal('deleteMany'), + S.literal('upsertOne'), + S.literal('aggregate'), + S.literal('groupBy'), + S.literal('executeRaw'), + S.literal('queryRaw'), + S.literal('runCommandRaw'), + S.literal('findRaw'), + S.literal('aggregateRaw'), + ), + query: S.record(S.string, S.unknown), +}) +export type JsonProtocolQuery = S.Schema.Type + +const JsonProtocolBatchQuery = S.struct({ + batch: S.array(JsonProtocolQuery), + transaction: S.optional( + S.nullable( + S.struct({ + isolationLevel: S.optional(S.nullable(S.string)), + }), + ), + ), +}) + const QueryParams = S.struct({ schemaId: SchemaId, - query: S.record(S.string, S.unknown), + query: S.union(JsonProtocolQuery, JsonProtocolBatchQuery), txId: S.nullable(S.string), }) export type QueryParams = S.Schema.Type @@ -28,9 +72,15 @@ const Query = S.struct({ params: QueryParams, }) +const TxOptions = S.struct({ + max_wait: S.number.pipe(S.int(), S.nonNegative()), + timeout: S.number.pipe(S.int(), S.nonNegative()), + isolation_level: S.optional(S.nullable(S.string)), +}) + const StartTxParams = S.struct({ schemaId: SchemaId, - options: S.unknown, + options: TxOptions, }) export type StartTxParams = S.Schema.Type @@ -110,21 +160,21 @@ export type RequestFromString = S.Schema.Type export type Response = OkResponse | ErrResponse export interface OkResponse { - jsonrpc: '2.0' - result: unknown - error?: never - id: number + jsonrpc: '2.0' + result: unknown + error?: never + id: number } export interface ErrResponse { - jsonrpc: '2.0' - error: RpcError - result?: never - id: number + jsonrpc: '2.0' + error: RpcError + result?: never + id: number } export interface RpcError { - code: number - message: string - data?: unknown + code: number + message: string + data?: unknown } diff --git a/libs/driver-adapters/executor/src/utils.ts b/libs/driver-adapters/executor/src/utils.ts index ecd1ca6ac40d..d80a37e705bd 100644 --- a/libs/driver-adapters/executor/src/utils.ts +++ b/libs/driver-adapters/executor/src/utils.ts @@ -63,3 +63,7 @@ export const debug = (() => { // error logger export const err = (...args: any[]) => console.error('[nodejs] ERROR:', ...args) + +export function assertNever(_: never, message: string): never { + throw new Error(message) +} diff --git a/libs/driver-adapters/pnpm-workspace.yaml b/libs/driver-adapters/pnpm-workspace.yaml index c12624bc6a17..1e5c43156d25 100644 --- a/libs/driver-adapters/pnpm-workspace.yaml +++ b/libs/driver-adapters/pnpm-workspace.yaml @@ -5,6 +5,7 @@ packages: - '../../../prisma/packages/adapter-planetscale' - '../../../prisma/packages/adapter-pg' - '../../../prisma/packages/bundled-js-drivers' + - '../../../prisma/packages/client-engine-runtime' - '../../../prisma/packages/debug' - '../../../prisma/packages/driver-adapter-utils' - './executor' diff --git a/shell.nix b/shell.nix index ac1931e79a3b..2200a7b293c5 100644 --- a/shell.nix +++ b/shell.nix @@ -2,14 +2,6 @@ pkgs ? import { }, }: -let - wasm-bindgen-cli = pkgs.wasm-bindgen-cli.override { - version = "0.2.93"; - hash = "sha256-DDdu5mM3gneraM85pAepBXWn3TMofarVR4NbjMdz3r0="; - cargoHash = "sha256-birrg+XABBHHKJxfTKAMSlmTVYLmnmqMDfRnmG6g/YQ="; - }; - -in pkgs.mkShell { packages = with pkgs; [ binaryen @@ -20,11 +12,12 @@ pkgs.mkShell { graphviz jq llvmPackages_latest.bintools - nodejs_22 + nodejs + nodePackages.prettier pnpm_9 rustup wabt - wasm-bindgen-cli + wasm-bindgen-cli_0_2_93 wasm-pack ];