Skip to content

Commit

Permalink
✨ unify transaction creation and close db connection in same call for…
Browse files Browse the repository at this point in the history
… scripts (#3360)

This PR adds a new parameter for the transaction creation functions that closes the database connection. This is useful for scripts which usually want to run stuff in one transaction and would then like to have the DB connection closes. For the api router the option is not used of course as it is a long running process.
  • Loading branch information
danyx23 authored Mar 26, 2024
2 parents 53f4424 + cc52aec commit 8501105
Show file tree
Hide file tree
Showing 23 changed files with 233 additions and 186 deletions.
4 changes: 1 addition & 3 deletions adminSiteServer/exportGitData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ const main = async () => {
commitOnly: true,
})
}
})

await db.closeTypeOrmAndKnexConnections()
}, db.TransactionCloseMode.Close)
}

void main()
7 changes: 4 additions & 3 deletions baker/algolia/indexChartsToAlgolia.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ const indexChartsToAlgolia = async () => {

const index = client.initIndex(getIndexName(SearchIndexName.Charts))

const records = await db.knexReadonlyTransaction(getChartsRecords)
const records = await db.knexReadonlyTransaction(
getChartsRecords,
db.TransactionCloseMode.Close
)
await index.replaceAllObjects(records)

await db.closeTypeOrmAndKnexConnections()
}

process.on("unhandledRejection", (e) => {
Expand Down
7 changes: 4 additions & 3 deletions baker/algolia/indexExplorersToAlgolia.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,11 @@ const indexExplorersToAlgolia = async () => {
try {
const index = client.initIndex(getIndexName(SearchIndexName.Explorers))

const records = await db.knexReadonlyTransaction(getExplorerRecords)
const records = await db.knexReadonlyTransaction(
getExplorerRecords,
db.TransactionCloseMode.Close
)
await index.replaceAllObjects(records)

await db.closeTypeOrmAndKnexConnections()
} catch (e) {
console.log("Error indexing explorers to Algolia: ", e)
}
Expand Down
6 changes: 4 additions & 2 deletions baker/algolia/indexToAlgolia.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,14 @@ const indexToAlgolia = async () => {
}
const index = client.initIndex(getIndexName(SearchIndexName.Pages))

const records = await db.knexReadonlyTransaction(getPagesRecords)
const records = await db.knexReadonlyTransaction(
getPagesRecords,
db.TransactionCloseMode.Close
)

await index.replaceAllObjects(records)

await wpdb.singleton.end()
await db.closeTypeOrmAndKnexConnections()
}

process.on("unhandledRejection", (e) => {
Expand Down
5 changes: 3 additions & 2 deletions baker/bakeGdocPost.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ void yargs(hideBin(process.argv))
async ({ slug }) => {
const baker = new SiteBaker(BAKED_SITE_DIR, BAKED_BASE_URL)

await db.knexReadonlyTransaction((trx) =>
baker.bakeGDocPosts(trx, [slug])
await db.knexReadonlyTransaction(
(trx) => baker.bakeGDocPosts(trx, [slug]),
db.TransactionCloseMode.Close
)
process.exit(0)
}
Expand Down
5 changes: 3 additions & 2 deletions baker/bakeGdocPosts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ void yargs(hideBin(process.argv))
async ({ slugs }) => {
const baker = new SiteBaker(BAKED_SITE_DIR, BAKED_BASE_URL)

await db.knexReadonlyTransaction((trx) =>
baker.bakeGDocPosts(trx, slugs)
await db.knexReadonlyTransaction(
(trx) => baker.bakeGDocPosts(trx, slugs),
db.TransactionCloseMode.Close
)
process.exit(0)
}
Expand Down
6 changes: 3 additions & 3 deletions baker/batchTagWithGpt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ if (require.main === module) {
},
async (argv) => {
try {
await db.knexReadonlyTransaction((trx) =>
batchTagChartsWithGpt(trx, argv)
await db.knexReadonlyTransaction(
(trx) => batchTagChartsWithGpt(trx, argv),
db.TransactionCloseMode.Close
)
} finally {
await db.closeTypeOrmAndKnexConnections()
}
}
)
Expand Down
5 changes: 4 additions & 1 deletion baker/buildLocalBake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ const bakeDomainToFolder = async (
await fs.mkdirp(dir)
const baker = new SiteBaker(dir, baseUrl, bakeSteps)
console.log(`Baking site locally with baseUrl '${baseUrl}' to dir '${dir}'`)
await db.knexReadonlyTransaction((trx) => baker.bakeAll(trx))
await db.knexReadonlyTransaction(
(trx) => baker.bakeAll(trx),
db.TransactionCloseMode.Close
)
}

void yargs(hideBin(process.argv))
Expand Down
5 changes: 3 additions & 2 deletions baker/postUpdatedHook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ const main = async (
) => {
console.log(email, name, postId)
try {
const slug = db.knexReadWriteTransaction((trx) =>
syncPostToGrapher(trx, postId)
const slug = db.knexReadWriteTransaction(
(trx) => syncPostToGrapher(trx, postId),
db.TransactionCloseMode.Close
)

if (BAKE_ON_CHANGE)
Expand Down
6 changes: 4 additions & 2 deletions baker/recalcLatestCountryData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import * as db from "../db/db.js"
import { denormalizeLatestCountryData } from "../baker/countryProfiles.js"

const main = async () => {
await db.knexReadWriteTransaction(denormalizeLatestCountryData)
await db.closeTypeOrmAndKnexConnections()
await db.knexReadWriteTransaction(
denormalizeLatestCountryData,
db.TransactionCloseMode.Close
)
}

if (require.main === module) void main()
12 changes: 7 additions & 5 deletions baker/runBakeGraphers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import * as db from "../db/db.js"
*/

const main = async (folder: string) => {
return db.knexReadonlyTransaction((trx) =>
bakeAllChangedGrapherPagesVariablesPngSvgAndDeleteRemovedGraphers(
folder,
trx
)
return db.knexReadonlyTransaction(
(trx) =>
bakeAllChangedGrapherPagesVariablesPngSvgAndDeleteRemovedGraphers(
folder,
trx
),
db.TransactionCloseMode.Close
)
}

Expand Down
5 changes: 4 additions & 1 deletion baker/startDeployQueueServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ const main = async () => {
setTimeout(deployIfQueueIsNotEmpty, 10 * 1000)
})

await db.knexReadonlyTransaction(deployIfQueueIsNotEmpty)
await db.knexReadonlyTransaction(
deployIfQueueIsNotEmpty,
db.TransactionCloseMode.Close
)
}

void main()
6 changes: 4 additions & 2 deletions baker/syncRedirectsToGrapher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ export const syncRedirectsToGrapher = async (

const main = async (): Promise<void> => {
try {
await db.knexReadWriteTransaction((trx) => syncRedirectsToGrapher(trx))
await db.knexReadWriteTransaction(
(trx) => syncRedirectsToGrapher(trx),
db.TransactionCloseMode.Close
)
} finally {
await wpdb.singleton.end()
await db.closeTypeOrmAndKnexConnections()
}
}

Expand Down
4 changes: 1 addition & 3 deletions db/analyzeWpPosts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ const analyze = async (): Promise<void> => {
for (const [tag, count] of sortedTagCount) {
console.log(`${tag}: ${count}`)
}
})

await db.closeTypeOrmAndKnexConnections()
}, db.TransactionCloseMode.Close)
}

void analyze()
44 changes: 35 additions & 9 deletions db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ export const mysqlFirst = async (

export const closeTypeOrmAndKnexConnections = async (): Promise<void> => {
if (typeormDataSource) await typeormDataSource.destroy()
if (_knexInstance) await _knexInstance.destroy()
if (_knexInstance) {
await _knexInstance.destroy()
_knexInstance = undefined
}
}

let _knexInstance: Knex
let _knexInstance: Knex | undefined = undefined

export const knexInstance = (): Knex<any, any[]> => {
if (_knexInstance) return _knexInstance
Expand Down Expand Up @@ -119,23 +122,46 @@ export type KnexReadWriteTransaction = Knex.Transaction<any, any[]> & {
readonly [__write_capability]: "write"
}

export enum TransactionCloseMode {
Close,
KeepOpen,
}

async function knexTransaction<T, KT>(
transactionFn: (trx: KT) => Promise<T>,
closeConnection: TransactionCloseMode,
readonly: boolean,
knex: Knex<any, any[]>
): Promise<T> {
try {
const options = readonly ? { readOnly: true } : {}
const result = await knex.transaction(
async (trx) => transactionFn(trx as KT),
options
)
return result
} finally {
if (closeConnection === TransactionCloseMode.Close) {
await knex.destroy()
if (knex === _knexInstance) _knexInstance = undefined
}
}
}

export async function knexReadonlyTransaction<T>(
transactionFn: (trx: KnexReadonlyTransaction) => Promise<T>,
closeConnection: TransactionCloseMode = TransactionCloseMode.KeepOpen,
knex: Knex<any, any[]> = knexInstance()
): Promise<T> {
return knex.transaction(
async (trx) => transactionFn(trx as KnexReadonlyTransaction),
{ readOnly: true }
)
return knexTransaction(transactionFn, closeConnection, true, knex)
}

export async function knexReadWriteTransaction<T>(
transactionFn: (trx: KnexReadWriteTransaction) => Promise<T>,
closeConnection: TransactionCloseMode = TransactionCloseMode.KeepOpen,
knex: Knex<any, any[]> = knexInstance()
): Promise<T> {
return knex.transaction(async (trx) =>
transactionFn(trx as KnexReadWriteTransaction)
)
return knexTransaction(transactionFn, closeConnection, false, knex)
}
export const knexRaw = async <TRow = unknown>(
knex: Knex<any, any[]>,
Expand Down
5 changes: 2 additions & 3 deletions db/migrateWpPostsToArchieMl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ const migrate = async (trx: db.KnexReadWriteTransaction): Promise<void> => {
await db.knexRaw(trx, insertQuery, [
JSON.stringify(archieMlFieldContent, null, 2),
JSON.stringify(archieMlStatsContent, null, 2),
markdown,
markdown ?? null,
post.id,
])
console.log("inserted", post.id)
Expand Down Expand Up @@ -315,8 +315,7 @@ const migrate = async (trx: db.KnexReadWriteTransaction): Promise<void> => {
}

async function runMigrate(): Promise<void> {
await db.knexReadWriteTransaction(migrate)
await db.closeTypeOrmAndKnexConnections()
await db.knexReadWriteTransaction(migrate, db.TransactionCloseMode.Close)
}

void runMigrate()
10 changes: 3 additions & 7 deletions db/model/Gdoc/GdocBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -740,13 +740,9 @@ export class GdocBase implements OwidGdocBaseInterface {
[]
)

const { chartIdsBySlug, publishedExplorersBySlug } =
await db.knexReadonlyTransaction(async (trx) => {
const chartIdsBySlug = await mapSlugsToIds(trx)
const publishedExplorersBySlug =
await db.getPublishedExplorersBySlug(trx)
return { chartIdsBySlug, publishedExplorersBySlug }
})
const chartIdsBySlug = await mapSlugsToIds(knex)
const publishedExplorersBySlug =
await db.getPublishedExplorersBySlug(knex)

const linkErrors: OwidGdocErrorMessage[] = this.links.reduce(
(errors: OwidGdocErrorMessage[], link): OwidGdocErrorMessage[] => {
Expand Down
6 changes: 4 additions & 2 deletions db/refreshPageviewsFromDatasette.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ async function downloadAndInsertCSV(

const main = async (): Promise<void> => {
try {
await db.knexReadWriteTransaction((trx) => downloadAndInsertCSV(trx))
await db.knexReadWriteTransaction(
(trx) => downloadAndInsertCSV(trx),
db.TransactionCloseMode.Close
)
} catch (e) {
console.error(e)
} finally {
await db.closeTypeOrmAndKnexConnections()
}
}

Expand Down
6 changes: 4 additions & 2 deletions db/syncPostsToGrapher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,12 @@ const syncPostsToGrapher = async (

const main = async (): Promise<void> => {
try {
await db.knexReadWriteTransaction((trx) => syncPostsToGrapher(trx))
await db.knexReadWriteTransaction(
(trx) => syncPostsToGrapher(trx),
db.TransactionCloseMode.Close
)
} finally {
await wpdb.singleton.end()
await db.closeTypeOrmAndKnexConnections()
}
}

Expand Down
Loading

0 comments on commit 8501105

Please sign in to comment.