Skip to content

Commit

Permalink
add queryId to CompiledQuery to allow async communication between…
Browse files Browse the repository at this point in the history
… more components. (#176)
  • Loading branch information
igalklebanov authored Jan 26, 2025
1 parent 5af5f80 commit 3b753c7
Show file tree
Hide file tree
Showing 28 changed files with 740 additions and 458 deletions.
1 change: 0 additions & 1 deletion src/dialect/mssql/mssql-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ class MssqlRequest<O> {
(event: 'completed' | 'chunkReady' | 'error', error?: unknown) => void
>
readonly #tedious: Tedious
#error: Error | any[] | undefined
#rowCount: number | undefined

constructor(props: MssqlRequestProps<O>) {
Expand Down
38 changes: 21 additions & 17 deletions src/dialect/mysql/mysql-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { parseSavepointCommand } from '../../parser/savepoint-parser.js'
import { CompiledQuery } from '../../query-compiler/compiled-query.js'
import { QueryCompiler } from '../../query-compiler/query-compiler.js'
import { isFunction, isObject, freeze } from '../../util/object-utils.js'
import { createQueryId } from '../../util/query-id.js'
import { extendStackTrace } from '../../util/stack-trace-utils.js'
import {
MysqlDialectConfig,
Expand Down Expand Up @@ -98,7 +99,10 @@ export class MysqlDriver implements Driver {
compileQuery: QueryCompiler['compileQuery'],
): Promise<void> {
await connection.executeQuery(
compileQuery(parseSavepointCommand('savepoint', savepointName)),
compileQuery(
parseSavepointCommand('savepoint', savepointName),
createQueryId(),
),
)
}

Expand All @@ -108,7 +112,10 @@ export class MysqlDriver implements Driver {
compileQuery: QueryCompiler['compileQuery'],
): Promise<void> {
await connection.executeQuery(
compileQuery(parseSavepointCommand('rollback to', savepointName)),
compileQuery(
parseSavepointCommand('rollback to', savepointName),
createQueryId(),
),
)
}

Expand All @@ -118,7 +125,10 @@ export class MysqlDriver implements Driver {
compileQuery: QueryCompiler['compileQuery'],
): Promise<void> {
await connection.executeQuery(
compileQuery(parseSavepointCommand('release savepoint', savepointName)),
compileQuery(
parseSavepointCommand('release savepoint', savepointName),
createQueryId(),
),
)
}

Expand Down Expand Up @@ -157,27 +167,21 @@ class MysqlConnection implements DatabaseConnection {
if (isOkPacket(result)) {
const { insertId, affectedRows, changedRows } = result

const numAffectedRows =
affectedRows !== undefined && affectedRows !== null
? BigInt(affectedRows)
: undefined

const numChangedRows =
changedRows !== undefined && changedRows !== null
? BigInt(changedRows)
: undefined

return {
insertId:
insertId !== undefined &&
insertId !== null &&
insertId.toString() !== '0'
? BigInt(insertId)
: undefined,
// TODO: remove.
numUpdatedOrDeletedRows: numAffectedRows,
numAffectedRows,
numChangedRows,
numAffectedRows:
affectedRows !== undefined && affectedRows !== null
? BigInt(affectedRows)
: undefined,
numChangedRows:
changedRows !== undefined && changedRows !== null
? BigInt(changedRows)
: undefined,
rows: [],
}
} else if (Array.isArray(result)) {
Expand Down
55 changes: 26 additions & 29 deletions src/dialect/postgres/postgres-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ import {
QueryResult,
} from '../../driver/database-connection.js'
import { Driver, TransactionSettings } from '../../driver/driver.js'
import { IdentifierNode } from '../../operation-node/identifier-node.js'
import { RawNode } from '../../operation-node/raw-node.js'
import { parseSavepointCommand } from '../../parser/savepoint-parser.js'
import { CompiledQuery } from '../../query-compiler/compiled-query.js'
import {
QueryCompiler,
RootOperationNode,
} from '../../query-compiler/query-compiler.js'
import { QueryCompiler } from '../../query-compiler/query-compiler.js'
import { isFunction, freeze } from '../../util/object-utils.js'
import { createQueryId } from '../../util/query-id.js'
import { extendStackTrace } from '../../util/stack-trace-utils.js'
import {
PostgresCursorConstructor,
Expand Down Expand Up @@ -91,7 +87,10 @@ export class PostgresDriver implements Driver {
compileQuery: QueryCompiler['compileQuery'],
): Promise<void> {
await connection.executeQuery(
compileQuery(parseSavepointCommand('savepoint', savepointName)),
compileQuery(
parseSavepointCommand('savepoint', savepointName),
createQueryId(),
),
)
}

Expand All @@ -101,7 +100,10 @@ export class PostgresDriver implements Driver {
compileQuery: QueryCompiler['compileQuery'],
): Promise<void> {
await connection.executeQuery(
compileQuery(parseSavepointCommand('rollback to', savepointName)),
compileQuery(
parseSavepointCommand('rollback to', savepointName),
createQueryId(),
),
)
}

Expand All @@ -111,7 +113,10 @@ export class PostgresDriver implements Driver {
compileQuery: QueryCompiler['compileQuery'],
): Promise<void> {
await connection.executeQuery(
compileQuery(parseSavepointCommand('release', savepointName)),
compileQuery(
parseSavepointCommand('release', savepointName),
createQueryId(),
),
)
}

Expand Down Expand Up @@ -143,28 +148,20 @@ class PostgresConnection implements DatabaseConnection {

async executeQuery<O>(compiledQuery: CompiledQuery): Promise<QueryResult<O>> {
try {
const result = await this.#client.query<O>(compiledQuery.sql, [
...compiledQuery.parameters,
])

if (
result.command === 'INSERT' ||
result.command === 'UPDATE' ||
result.command === 'DELETE' ||
result.command === 'MERGE'
) {
const numAffectedRows = BigInt(result.rowCount)

return {
// TODO: remove.
numUpdatedOrDeletedRows: numAffectedRows,
numAffectedRows,
rows: result.rows ?? [],
}
}
const { command, rowCount, rows } = await this.#client.query<O>(
compiledQuery.sql,
[...compiledQuery.parameters],
)

return {
rows: result.rows ?? [],
numAffectedRows:
command === 'INSERT' ||
command === 'UPDATE' ||
command === 'DELETE' ||
command === 'MERGE'
? BigInt(rowCount)
: undefined,
rows: rows ?? [],
}
} catch (err) {
throw extendStackTrace(err, new Error())
Expand Down
42 changes: 24 additions & 18 deletions src/dialect/sqlite/sqlite-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { parseSavepointCommand } from '../../parser/savepoint-parser.js'
import { CompiledQuery } from '../../query-compiler/compiled-query.js'
import { QueryCompiler } from '../../query-compiler/query-compiler.js'
import { freeze, isFunction } from '../../util/object-utils.js'
import { createQueryId } from '../../util/query-id.js'
import { SqliteDatabase, SqliteDialectConfig } from './sqlite-dialect-config.js'

export class SqliteDriver implements Driver {
Expand Down Expand Up @@ -58,7 +59,10 @@ export class SqliteDriver implements Driver {
compileQuery: QueryCompiler['compileQuery'],
): Promise<void> {
await connection.executeQuery(
compileQuery(parseSavepointCommand('savepoint', savepointName)),
compileQuery(
parseSavepointCommand('savepoint', savepointName),
createQueryId(),
),
)
}

Expand All @@ -68,7 +72,10 @@ export class SqliteDriver implements Driver {
compileQuery: QueryCompiler['compileQuery'],
): Promise<void> {
await connection.executeQuery(
compileQuery(parseSavepointCommand('rollback to', savepointName)),
compileQuery(
parseSavepointCommand('rollback to', savepointName),
createQueryId(),
),
)
}

Expand All @@ -78,7 +85,10 @@ export class SqliteDriver implements Driver {
compileQuery: QueryCompiler['compileQuery'],
): Promise<void> {
await connection.executeQuery(
compileQuery(parseSavepointCommand('release', savepointName)),
compileQuery(
parseSavepointCommand('release', savepointName),
createQueryId(),
),
)
}

Expand Down Expand Up @@ -106,23 +116,19 @@ class SqliteConnection implements DatabaseConnection {
return Promise.resolve({
rows: stmt.all(parameters) as O[],
})
} else {
const { changes, lastInsertRowid } = stmt.run(parameters)
}

const numAffectedRows =
changes !== undefined && changes !== null ? BigInt(changes) : undefined
const { changes, lastInsertRowid } = stmt.run(parameters)

return Promise.resolve({
// TODO: remove.
numUpdatedOrDeletedRows: numAffectedRows,
numAffectedRows,
insertId:
lastInsertRowid !== undefined && lastInsertRowid !== null
? BigInt(lastInsertRowid)
: undefined,
rows: [],
})
}
return Promise.resolve({
numAffectedRows:
changes !== undefined && changes !== null ? BigInt(changes) : undefined,
insertId:
lastInsertRowid !== undefined && lastInsertRowid !== null
? BigInt(lastInsertRowid)
: undefined,
rows: [],
})
}

async *streamQuery<R>(
Expand Down
6 changes: 0 additions & 6 deletions src/driver/database-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ export interface DatabaseConnection {
}

export interface QueryResult<O> {
/**
* @deprecated use {@link QueryResult.numAffectedRows} instead.
*/
// TODO: remove.
readonly numUpdatedOrDeletedRows?: bigint

/**
* This is defined for insert, update, delete and merge queries and contains
* the number of rows the query inserted/updated/deleted.
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ export {
} from './util/type-utils.js'
export * from './util/infer-result.js'
export { logOnce } from './util/log-once.js'
export { createQueryId, QueryId } from './util/query-id.js'

export {
SelectExpression,
Expand Down
Loading

0 comments on commit 3b753c7

Please sign in to comment.