Skip to content

Commit

Permalink
feat: allow nested transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnagydavid committed Nov 15, 2024
1 parent 2a90121 commit 388c816
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 16 deletions.
128 changes: 118 additions & 10 deletions src/commondao/common.dao.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -526,18 +526,126 @@ async function getEven(): Promise<TestItemBM[]> {
return await dao.query().filterEq('even', true).runQuery()
}

test('runInTransaction', async () => {
const items = createTestItemsBM(4)

await dao.runInTransaction(async tx => {
await tx.save(dao, items[0]!)
await tx.save(dao, items[1]!)
await tx.save(dao, items[3]!)
await tx.deleteById(dao, items[1]!.id)
describe('runInTransaction', () => {
test('should work', async () => {
const items = createTestItemsBM(4)

await dao.runInTransaction(async tx => {
await tx.save(dao, items[0]!)
await tx.save(dao, items[1]!)
await tx.save(dao, items[3]!)
await tx.deleteById(dao, items[1]!.id)
})

const items2 = await dao.query().runQuery()
expect(items2.map(i => i.id).sort()).toEqual(['id1', 'id4'])
})

const items2 = await dao.query().runQuery()
expect(items2.map(i => i.id).sort()).toEqual(['id1', 'id4'])
describe('with nested transactions', () => {
test('should work', async () => {
const dao2 = new CommonDao(daoCfg)
const items = createTestItemsBM(4)

await dao.runInTransaction(async tx => {
await tx.save(dao, items[0]!)
await tx.save(dao, items[1]!)
await dao2.runInTransaction(async tx2 => {
await tx2.save(dao, items[3]!)
await tx2.deleteById(dao, items[1]!.id)
})
})

const items2 = await dao.query().runQuery()
expect(items2.map(i => i.id).sort()).toEqual(['id1', 'id4'])
})

test('should rollback when the inner transaction fails', async () => {
const dao2 = new CommonDao(daoCfg)
const items = createTestItemsBM(4)

const transaction = dao.runInTransaction(async tx => {
await tx.save(dao, items[0]!)
await tx.save(dao, items[1]!)
await dao2.runInTransaction(async tx2 => {
await tx2.save(dao, items[3]!)
await tx2.deleteById(dao, items[1]!.id)
throw new Error('I should not have done that!')
})
})

await expect(transaction).rejects.toThrow('I should not have done that!')
const items2 = await dao.query().runQuery()
expect(items2.map(i => i.id).sort()).toEqual([])
})

test('should rollback when the outer transaction fails', async () => {
const dao2 = new CommonDao(daoCfg)
const items = createTestItemsBM(4)

const transaction = dao.runInTransaction(async tx => {
await tx.save(dao, items[0]!)
await tx.save(dao, items[1]!)
await dao2.runInTransaction(async tx2 => {
await tx2.save(dao, items[3]!)
await tx2.deleteById(dao, items[1]!.id)
})
throw new Error('I should not have done that!')
})

await expect(transaction).rejects.toThrow('I should not have done that!')
const items2 = await dao.query().runQuery()
expect(items2.map(i => i.id).sort()).toEqual([])
})

test('transactions of different DBs should not interfere', async () => {
const db2 = new InMemoryDB()
const daoCfg2 = { ...daoCfg, db: db2 }
const dao2 = new CommonDao(daoCfg2)
const items = createTestItemsBM(4)

await dao.runInTransaction(async tx => {
await tx.save(dao, items[0]!)
await tx.save(dao, items[1]!)
try {
await dao2.runInTransaction(async tx2 => {
await tx2.save(dao, items[3]!)
await tx2.deleteById(dao, items[1]!.id)
throw new Error('I should not have done that!')
})
} catch {
//
}
})

const itemsInDb1 = await dao.query().runQuery()
expect(itemsInDb1.map(i => i.id).sort()).toEqual(['id1', 'id2'])
const itemsInDb2 = await dao2.query().runQuery()
expect(itemsInDb2.map(i => i.id).sort()).toEqual([])
})

test('transactions of different DBs should not interfere', async () => {
const db2 = new InMemoryDB()
const daoCfg2 = { ...daoCfg, db: db2 }
const dao2 = new CommonDao(daoCfg2)
const items = createTestItemsBM(4)

const transaction = dao.runInTransaction(async tx => {
await tx.save(dao, items[0]!)
await tx.save(dao, items[1]!)
await dao2.runInTransaction(async tx2 => {
await tx2.save(dao, items[3]!)
await tx2.deleteById(dao, items[1]!.id)
})
throw new Error('I should not have done that!')
})

await expect(transaction).rejects.toThrow('I should not have done that!')
const itemsInDb1 = await dao.query().runQuery()
expect(itemsInDb1.map(i => i.id).sort()).toEqual([])
const itemsInDb2 = await dao2.query().runQuery()
expect(itemsInDb2.map(i => i.id).sort()).toEqual(['id4'])
})
})
})

test('should not be able to query by a non-indexed property', async () => {
Expand Down
28 changes: 22 additions & 6 deletions src/commondao/common.dao.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
transformNoOp,
writableVoid,
} from '@naturalcycles/nodejs-lib'
import { CommonDB } from '..'
import { DBLibError } from '../cnst'
import { CommonDBTransactionOptions, DBTransaction, RunQueryResult } from '../db.model'
import { DBQuery, RunnableDBQuery } from '../query/dbQuery'
Expand Down Expand Up @@ -75,6 +76,8 @@ const isCI = !!process.env['CI']
* TM = Transport model (optimized to be sent over the wire)
*/
export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, ID = BM['id']> {
private static transaction = new Map<CommonDB, CommonDaoTransaction | undefined>()

constructor(public cfg: CommonDaoCfg<BM, DBM, ID>) {
this.cfg = {
// Default is to NOT log in AppEngine and in CI,
Expand Down Expand Up @@ -1310,16 +1313,29 @@ export class CommonDao<BM extends BaseDBEntity, DBM extends BaseDBEntity = BM, I
): Promise<T> {
let r: T

await this.cfg.db.runInTransaction(async tx => {
const daoTx = new CommonDaoTransaction(tx, this.cfg.logger!)

const tx = CommonDao.transaction.get(this.cfg.db)
if (tx) {
try {
r = await fn(daoTx)
r = await fn(tx)
} catch (err) {
await daoTx.rollback() // graceful rollback that "never throws"
await tx.rollback() // graceful rollback that "never throws"
throw err
}
}, opt)
} else {
await this.cfg.db.runInTransaction(async tx => {
const daoTx = new CommonDaoTransaction(tx, this.cfg.logger!)

try {
CommonDao.transaction.set(this.cfg.db, daoTx)
r = await fn(daoTx)
} catch (err) {
await daoTx.rollback() // graceful rollback that "never throws"
throw err
} finally {
CommonDao.transaction.delete(this.cfg.db)
}
}, opt)
}

return r!
}
Expand Down

0 comments on commit 388c816

Please sign in to comment.