Skip to content

Commit

Permalink
test: run query engine tests with query compiler
Browse files Browse the repository at this point in the history
  • Loading branch information
aqrln committed Feb 3, 2025
1 parent ea09296 commit 0f3aec5
Show file tree
Hide file tree
Showing 8 changed files with 439 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .prettierrc.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
tabWidth = 2
semi = false
singleQuote = true
4 changes: 3 additions & 1 deletion libs/driver-adapters/executor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
"description": "",
"private": true,
"scripts": {
"build": "tsup ./src/testd-qe.ts ./src/demo-se.ts ./src/bench.ts --format esm --dts",
"build": "tsup ./src/testd-qe.ts ./src/testd-qc.ts ./src/demo-se.ts ./src/bench.ts --format esm --dts",
"test:qe": "node --import tsx ./src/testd-qe.ts",
"test:qc": "node --import tsx ./src/testd-qc.ts",
"demo:se": "node --import tsx ./src/demo-se.ts",
"demo:qc": "node --import tsx ./src/demo-qc.ts",
"clean:d1": "rm -rf ../../connector-test-kit-rs/query-engine-tests/.wrangler"
Expand All @@ -27,6 +28,7 @@
"@prisma/adapter-pg": "workspace:*",
"@prisma/adapter-planetscale": "workspace:*",
"@prisma/bundled-js-drivers": "workspace:*",
"@prisma/client-engine-runtime": "workspace:*",
"@prisma/driver-adapter-utils": "workspace:*",
"mitata": "0.1.11",
"query-engine-wasm-baseline": "npm:@prisma/[email protected]",
Expand Down
358 changes: 358 additions & 0 deletions libs/driver-adapters/executor/src/testd-qc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,358 @@
import * as readline from 'node:readline'
import * as S from '@effect/schema/Schema'
import {
bindAdapter,
ConnectionInfo,
ErrorCapturingDriverAdapter,
Queryable,
} from '@prisma/driver-adapter-utils'
import {
QueryInterpreter,
TransactionManager,
IsolationLevel,
} from '@prisma/client-engine-runtime'

import type { DriverAdaptersManager } from './driver-adapters-manager'
import { Env, jsonRpc } from './types'
import * as qc from './query-compiler'
import { assertNever, debug, err } from './utils'
import { setupDriverAdaptersManager } from './setup'
import { SchemaId, JsonProtocolQuery } from './types/jsonRpc'

async function main(): Promise<void> {
const env = S.decodeUnknownSync(Env)(process.env)
console.log('[env]', env)

const iface = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
})

iface.on('line', async (line) => {
try {
const request = S.decodeSync(jsonRpc.RequestFromString)(line)
debug(`Got a request: ${line}`)

try {
const response = await handleRequest(request, env)
respondOk(request.id, response)
} catch (err) {
debug('[nodejs] Error from request handler: ', err)
respondErr(request.id, {
code: 1,
message: err.stack ?? err.toString(),
})
}
} catch (err) {
debug('Received non-json line: ', line)
console.error(err)
}
})
}

const state: Record<
SchemaId,
{
compiler: qc.QueryCompiler
driverAdapterManager: DriverAdaptersManager
adapter: ErrorCapturingDriverAdapter
transactionManager: TransactionManager
logs: string[]
}
> = {}

async function handleRequest(
{ method, params }: jsonRpc.Request,
env: Env,
): Promise<unknown> {
if (method !== 'initializeSchema') {
if (state[params.schemaId] === undefined) {
throw new Error(
`Schema with id ${params.schemaId} is not initialized. Please call 'initializeSchema' first.`,
)
}
}

switch (method) {
case 'initializeSchema': {
debug('Got `initializeSchema', params)

const { url, schema, schemaId, migrationScript } = params

const driverAdapterManager = await setupDriverAdaptersManager(
env,
migrationScript,
)

const { compiler, adapter } = await initQc({
url,
driverAdapterManager,
schema,
})

const transactionManager = new TransactionManager({
driverAdapter: adapter,
clientVersion: '0.0.0',
})

state[schemaId] = {
compiler,
driverAdapterManager,
adapter,
transactionManager,
logs: [],
}

if (adapter && adapter.getConnectionInfo) {
const maxBindValuesResult = adapter
.getConnectionInfo()
.map((info) => info.maxBindValues)
if (maxBindValuesResult.ok) {
return { maxBindValues: maxBindValuesResult.value }
}
}

return { maxBindValues: null }
}

case 'query': {
debug('Got `query`', params)
const { query, schemaId, txId } = params
const { compiler, adapter, transactionManager, logs } = state[schemaId]

const executeQuery = async (
queryable: Queryable,
query: JsonProtocolQuery,
) => {
// TODO: this shouldn't be async anymore, should it?
const queryPlanString = await compiler.compile(JSON.stringify(query))
const queryPlan = JSON.parse(queryPlanString)

const interpreter = new QueryInterpreter({
queryable,
placeholderValues: {},
onQuery: (event) => {
logs.push(JSON.stringify(event))
},
})

return interpreter.run(queryPlan)
}

const executeIndependentBatch = async (
queries: readonly JsonProtocolQuery[],
) => Promise.all(queries.map((query) => executeQuery(adapter, query)))

const executeTransactionalBatch = async (
queries: readonly JsonProtocolQuery[],
isolationLevel?: IsolationLevel,
) => {
const txInfo = await transactionManager.startTransaction({
isolationLevel,
})
const queryable = transactionManager.getTransaction(
txInfo,
'batch transaction query',
)

try {
const results: unknown[] = []

for (const query of queries) {
const result = await executeQuery(queryable, query)
results.push(result)
}

await transactionManager.commitTransaction(txInfo.id)

return results
} catch (err) {
await transactionManager
.rollbackTransaction(txInfo.id)
.catch(console.error)
throw err
}
}

if ('batch' in query) {
const { batch, transaction } = query

const results = transaction
? await executeTransactionalBatch(
batch,
parseIsolationLevel(transaction.isolationLevel),
)
: await executeIndependentBatch(batch)

debug('🟢 Batch query results: ', results)

return {
batchResult: batch.map((query, index) => ({
data: { [query.action]: results[index] },
})),
}
} else {
const queryable = txId
? transactionManager.getTransaction(
{ id: txId, payload: undefined },
query.action,
)
: adapter

if (!queryable) {
throw new Error(
`No transaction with id ${txId} found. Please call 'startTx' first.`,
)
}

const result = await executeQuery(queryable, query)

debug('🟢 Query result: ', result)

return { data: { [query.action]: result } }
}
}

case 'startTx': {
debug('Got `startTx`', params)

const { schemaId, options } = params
const { transactionManager } = state[schemaId]

return await transactionManager.startTransaction({
maxWait: options.max_wait,
timeout: options.timeout,
isolationLevel: parseIsolationLevel(options.isolation_level),
})
}

case 'commitTx': {
debug('Got `commitTx`', params)

const { schemaId, txId } = params
const { transactionManager } = state[schemaId]

return await transactionManager.commitTransaction(txId)
}

case 'rollbackTx': {
debug('Got `rollbackTx`', params)

const { schemaId, txId } = params
const { transactionManager } = state[schemaId]

return await transactionManager.rollbackTransaction(txId)
}

case 'teardown': {
debug('Got `teardown`', params)

const { schemaId } = params
const { driverAdapterManager } = state[schemaId]

await driverAdapterManager.teardown()
delete state[schemaId]

return {}
}

case 'getLogs': {
const { schemaId } = params
return state[schemaId].logs
}

default: {
assertNever(method, `Unknown method: \`${method}\``)
}
}
}

function respondErr(requestId: number, error: jsonRpc.RpcError) {
const msg: jsonRpc.ErrResponse = {
jsonrpc: '2.0',
id: requestId,
error,
}
console.log(JSON.stringify(msg))
}

function respondOk(requestId: number, payload: unknown) {
const msg: jsonRpc.OkResponse = {
jsonrpc: '2.0',
id: requestId,
result: payload,
}
console.log(JSON.stringify(msg))
}

type InitQueryCompilerParams = {
driverAdapterManager: DriverAdaptersManager
url: string
schema: string
}

async function initQc({
driverAdapterManager,
url,
schema,
}: InitQueryCompilerParams) {
const adapter = await driverAdapterManager.connect({ url })
const errorCapturingAdapter = bindAdapter(adapter)

let connectionInfo: ConnectionInfo = {}
if (errorCapturingAdapter.getConnectionInfo) {
const result = errorCapturingAdapter.getConnectionInfo()
if (!result.ok) {
throw result.error
}
connectionInfo = result.value
}

const compiler = await qc.initQueryCompiler({
datamodel: schema,
flavour: adapter.provider,
connectionInfo,
})

return {
compiler,
adapter: errorCapturingAdapter,
}
}

function parseIsolationLevel(
level: string | null | undefined,
): IsolationLevel | undefined {
switch (level) {
case null:
case undefined:
return undefined

case 'ReadCommitted':
case 'Read Committed':
return IsolationLevel.ReadCommitted

case 'ReadUncommitted':
case 'Read Uncommitted':
return IsolationLevel.ReadUncommitted

case 'RepeatableRead':
case 'Repeatable Read':
return IsolationLevel.RepeatableRead

case 'Serializable':
return IsolationLevel.Serializable

case 'Snapshot':
return IsolationLevel.Snapshot

default:
// We don't validate the isolation level on the RPC schema level because some tests
// rely on sending invalid isolation levels to test error handling, and those invalid
// levels must be forwarded to the query engine as-is in `testd-qe.ts`.
throw new Error(`Unknown isolation level: ${level}`)
}
}

main().catch(err)
4 changes: 2 additions & 2 deletions libs/driver-adapters/executor/src/testd-qe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Env, jsonRpc } from './types'
import * as qe from './query-engine'
import { nextRequestId } from './requestId'
import { createRNEngineConnector } from './rn'
import { debug, err } from './utils'
import { assertNever, debug, err } from './utils'
import { setupDriverAdaptersManager } from './setup'
import { SchemaId } from './types/jsonRpc'

Expand Down Expand Up @@ -179,7 +179,7 @@ async function handleRequest(
return state[schemaId].logs
}
default: {
throw new Error(`Unknown method: \`${method}\``)
assertNever(method, `Unknown method: \`${method}\``)
}
}
}
Expand Down
Loading

0 comments on commit 0f3aec5

Please sign in to comment.