Skip to content

Commit

Permalink
feat: mvp working incremental backups
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp committed Jan 10, 2025
1 parent 95b97f6 commit d27d1ca
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 93 deletions.
20 changes: 6 additions & 14 deletions @xen-orchestra/backups/RemoteAdapter.mjs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { asyncEach } from '@vates/async-each'
import { asyncMap, asyncMapSettled } from '@xen-orchestra/async-map'
import { writeVhdFileToRemote } from '@xen-orchestra/disk-transform/src/to/VhdRemote.mts'
import { compose } from '@vates/compose'
import { createLogger } from '@xen-orchestra/log'
import { createVhdDirectoryFromStream, openVhd, VhdAbstract, VhdDirectory, VhdSynthetic } from 'vhd-lib'
import { openVhd, VhdAbstract, VhdDirectory, VhdSynthetic } from 'vhd-lib'
import { decorateMethodsWith } from '@vates/decorate-with'
import { deduped } from '@vates/disposable/deduped.js'
import { dirname, join, resolve } from 'node:path'
Expand Down Expand Up @@ -681,22 +682,13 @@ export class RemoteAdapter {
return path
}

async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency } = {}) {
async writeVhd(path, disk, { checksum = true, validator = noop, writeBlockConcurrency } = {}) {
const handler = this._handler
const dataPath = this.useVhdDirectory() ? `${dirname(path)}/data/${uuidv4()}.vhd` : path
await writeVhdFileToRemote(handler, dataPath, disk, { writeBlockConcurrency })
await validator(dataPath)
if (this.useVhdDirectory()) {
const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
const size = await createVhdDirectoryFromStream(handler, dataPath, input, {
concurrency: writeBlockConcurrency,
compression: this.#getCompressionType(),
async validator() {
await input.task
return validator.apply(this, arguments)
},
})
await VhdAbstract.createAlias(handler, path, dataPath)
return size
} else {
return this.outputStream(path, input, { checksum, validator })
}
}

Expand Down
25 changes: 8 additions & 17 deletions @xen-orchestra/backups/_incrementalVm.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { Task } from './Task.mjs'
import pick from 'lodash/pick.js'
import { BASE_DELTA_VDI, COPY_OF, VM_UUID } from './_otherConfig.mjs'

import { VhdRemote } from '@xen-orchestra/disk-transform/src/from/XapiVhdExport.mts'

const ensureArray = value => (value === undefined ? [] : Array.isArray(value) ? value : [value])

export async function exportIncrementalVm(
Expand All @@ -19,7 +21,7 @@ export async function exportIncrementalVm(
) {
// refs of VM's VDIs → base's VDIs.

const streams = {}
const disks = {}
const vdis = {}
const vbds = {}
await cancelableMap(cancelToken, vm.$VBDs, async (cancelToken, vbd) => {
Expand Down Expand Up @@ -53,23 +55,12 @@ export async function exportIncrementalVm(
$SR$uuid: vdi.$SR.uuid,
}
try {
streams[`${vdiRef}.vhd`] = await vdi.$exportContent({
baseRef: baseVdi?.$ref,
cancelToken,
format: 'vhd',
nbdConcurrency,
preferNbd,
})
disks[`${vdiRef}.vhd`] = new VhdRemote({ vdi })
} catch (err) {
if (err.code === 'VDI_CANT_DO_DELTA') {
// fall back to a base
Task.info(`Can't do delta, will try to get a full stream`, { vdi })
streams[`${vdiRef}.vhd`] = await vdi.$exportContent({
cancelToken,
format: 'vhd',
nbdConcurrency,
preferNbd,
})
disks[`${vdiRef}.vhd`] = new VhdRemote({ vdi })
// only warn if the fallback succeeds
Task.warning(`Can't do delta with this vdi, transfer will be a full`, {
vdi,
Expand All @@ -87,7 +78,7 @@ export async function exportIncrementalVm(
...suspendVdi,
$SR$uuid: suspendVdi.$SR.uuid,
}
streams[`${vdiRef}.vhd`] = await suspendVdi.$exportContent({
disks[`${vdiRef}.vhd`] = await suspendVdi.$exportContent({
cancelToken,
format: 'vhd',
})
Expand Down Expand Up @@ -127,10 +118,10 @@ export async function exportIncrementalVm(
},
vtpms,
},
'streams',
'disks',
{
configurable: true,
value: streams,
value: disks,
writable: true,
}
)
Expand Down
33 changes: 5 additions & 28 deletions @xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
import { asyncEach } from '@vates/async-each'
import { createLogger } from '@xen-orchestra/log'
import { pipeline } from 'node:stream'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import keyBy from 'lodash/keyBy.js'
import mapValues from 'lodash/mapValues.js'
import vhdStreamValidator from 'vhd-lib/vhdStreamValidator.js'

import { AbstractXapi } from './_AbstractXapi.mjs'
import { exportIncrementalVm } from '../../_incrementalVm.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { IncrementalXapiWriter } from '../_writers/IncrementalXapiWriter.mjs'
import { Task } from '../../Task.mjs'
import { watchStreamSize } from '../../_watchStreamSize.mjs'
import {
DATETIME,
DELTA_CHAIN_LENGTH,
Expand All @@ -22,7 +14,6 @@ import {
} from '../../_otherConfig.mjs'

const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup')
const noop = Function.prototype

export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends AbstractXapi {
_getWriters() {
Expand All @@ -45,33 +36,19 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
nbdConcurrency: this._settings.nbdConcurrency,
preferNbd: this._settings.preferNbd,
})
// since NBD is network based, if one disk uses NBD, all the disks use it,
// except the suspended VDI
if (Object.values(deltaExport.streams).some(({ _nbd }) => _nbd)) {
Task.info('Transfer data using NBD')
}

const isVhdDifferencing = {}
// since isVhdDifferencingDisk is reading and unshifting data in stream
// it should be done BEFORE any other stream transform
await asyncEach(Object.entries(deltaExport.streams), async ([key, stream]) => {
isVhdDifferencing[key] = await isVhdDifferencingDisk(stream)
})
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))

if (this._settings.validateVhdStreams) {
deltaExport.streams = mapValues(deltaExport.streams, stream => pipeline(stream, vhdStreamValidator, noop))
}
deltaExport.streams = mapValues(deltaExport.streams, this._throttleStream)

// @todo : reimplement fork, throttle, validation, isVhdDifferencingDisk, NBD use and size computation
// @todo : wrap the generation and transfer in a Disposable.factory to handle error cases and resource cleanup
const timestamp = Date.now()

await this._callWriters(
writer =>
writer.transfer({
deltaExport: forkDeltaExport(deltaExport),
deltaExport,
isVhdDifferencing,
sizeContainers,
sizeContainers: {},
timestamp,
vm,
vmSnapshot: exportedVm,
Expand Down Expand Up @@ -102,7 +79,7 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
await markExportSuccessfull(this._xapi, exportedVm.$ref)
}

const size = Object.values(sizeContainers).reduce((sum, { size }) => sum + size, 0)
const size = Object.values({}).reduce((sum, { size }) => sum + size, 0)
const end = Date.now()
const duration = end - timestamp
debug('transfer complete', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import mapValues from 'lodash/mapValues.js'
import { asyncEach } from '@vates/async-each'
import { asyncMap } from '@xen-orchestra/async-map'
import { chainVhd, openVhd } from 'vhd-lib'

import { createLogger } from '@xen-orchestra/log'
import { decorateClass } from '@vates/decorate-with'
import { defer } from 'golike-defer'
Expand Down Expand Up @@ -203,8 +204,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
let metadataContent = await this._isAlreadyTransferred(timestamp)
if (metadataContent !== undefined) {
// skip backup while being vigilant not to stall the forked stream
Task.info('This backup has already been transfered')
Object.values(deltaExport.streams).forEach(stream => stream.destroy())
/** @todo destroy fork */
return { size: 0 }
}

Expand All @@ -224,32 +224,28 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
vtpms: deltaExport.vtpms,
}

const { size } = await Task.run({ name: 'transfer' }, async () => {
let transferSize = 0
await Task.run({ name: 'transfer' }, async () => {
await asyncEach(
Object.keys(deltaExport.vdis),
async id => {
const path = `${this._vmBackupDir}/${vhds[id]}`
// don't write it as transferSize += await async function
// since i += await asyncFun lead to race condition
// as explained : https://eslint.org/docs/latest/rules/require-atomic-updates
const transferSizeOneDisk = await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {

await adapter.writeVhd(path, deltaExport.disks[`${id}.vhd`], {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,
validator: tmpPath => checkVhd(handler, tmpPath),
writeBlockConcurrency: this._config.writeBlockConcurrency,
})
transferSize += transferSizeOneDisk
},
{
concurrency: settings.diskPerVmConcurrency,
}
)

return { size: transferSize }
return { size: 0 }
})
metadataContent.size = size
metadataContent.size = 0 /* todo */
this._metadataFileName = await adapter.writeVmBackupMetadata(vm.uuid, metadataContent)

// TODO: run cleanup?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ export class AbstractIncrementalWriter extends AbstractWriter {
return await this._transfer({ deltaExport, ...other })
} finally {
// ensure all streams are properly closed
for (const stream of Object.values(deltaExport.streams)) {
/* for (const stream of Object.values(deltaExport.streams)) {
stream.destroy()
}
} */
}
}
}
1 change: 1 addition & 0 deletions @xen-orchestra/backups/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@vates/nbd-client": "^3.1.2",
"@vates/parse-duration": "^0.1.1",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/disk-transform": "*",
"@xen-orchestra/fs": "^4.3.0",
"@xen-orchestra/log": "^0.7.1",
"@xen-orchestra/template": "^0.1.0",
Expand Down
2 changes: 1 addition & 1 deletion @xen-orchestra/disk-transform/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@xen-orchestra/disk-transform",
"version": "0.0.0",
"main": "index.js",
"main": "src/index.mts",
"license": "MIT",
"private": true,
"type": "module",
Expand Down
16 changes: 10 additions & 6 deletions @xen-orchestra/disk-transform/src/from/XapiVhdExport.mts
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@ class VhdStreamGenerator extends DiskBlockGenerator {
this.#stream = stream
}
async #read(length: number): Promise<Buffer> {
assert.strictEqual(this.#busy, false, "Can't read/skip multiple block in parallel")
if (this.#busy) {
throw new Error("Can't read/skip multiple block in parallel")
}
this.#busy = true
const data = (await readChunkStrict(this.#stream, length)) as Buffer
this.#streamOffset += length
this.#busy = false
return data
}
async #skip(length: number): Promise<void> {
assert.strictEqual(this.#busy, false, "Can't read/skip multiple block in parallel")
if (this.#busy) {
throw new Error("Can't read/skip multiple block in parallel")
}
this.#busy = true
await skipStrict(this.#stream, length)
this.#streamOffset += length
Expand Down Expand Up @@ -75,10 +79,10 @@ class VhdStreamGenerator extends DiskBlockGenerator {

type XoVdi = {
uuid: Uuid
$exportContent: () => Promise<Readable>
$exportContent: (params: object) => Promise<Readable>
name_label: string
name_description: string
size: number
virtual_size: number
}
export class VhdRemote extends PortableDifferencingDisk {
#vdi: XoVdi
Expand All @@ -92,12 +96,12 @@ export class VhdRemote extends PortableDifferencingDisk {
id: vdi.uuid,
label: vdi.name_label,
description: vdi.name_label,
virtualSize: vdi.size,
virtualSize: vdi.virtual_size,
}
}

async getBlockIterator(): Promise<Disposable<DiskBlockGenerator>> {
const stream = await this.#vdi.$exportContent()
const stream = await this.#vdi.$exportContent({ preferNbd: false, format: 'vhd' })
const generator = new VhdStreamGenerator(stream)
return {
value: generator,
Expand Down
46 changes: 34 additions & 12 deletions @xen-orchestra/disk-transform/src/to/VhdRemote.mts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ import { createFooter, createHeader } from 'vhd-lib/_createFooterHeader.js'
import { unpackFooter, unpackHeader } from 'vhd-lib/Vhd/_utils.js'
import { DISK_TYPES, FOOTER_SIZE } from 'vhd-lib/_constants.js'
import _computeGeometryForSize from 'vhd-lib/_computeGeometryForSize.js'
import { asyncEach } from '@vates/async-each'

async function writeVhdToRemote(targetVhd: Vhd, disk: PortableDifferencingDisk): Promise<void> {
async function writeVhdToRemote(
targetVhd: Vhd,
disk: PortableDifferencingDisk,
{ writeBlockConcurrency = 0 } = {}
): Promise<void> {
return Disposable.use(disk.getBlockIterator(), async (blockIterator: DiskBlockGenerator): Promise<void> => {
// @todo : handle differencing disk parent

Expand All @@ -28,25 +33,42 @@ async function writeVhdToRemote(targetVhd: Vhd, disk: PortableDifferencingDisk):
targetVhd.header = unpackHeader(createHeader(Math.ceil(metada.virtualSize / (2 * 1024 * 1024))))
const bitmap = Buffer.alloc(255, 512)

for await (const block of blockIterator) {
await targetVhd.writeEntireBlock({
id: block.index,
bitmap,
data: block.data,
buffer: Buffer.concat([bitmap, block.data]),
})
}
await asyncEach(
blockIterator,
async block => {
console.log('start', block.index)
await targetVhd.writeEntireBlock({
id: block.index,
bitmap,
data: block.data,
buffer: Buffer.concat([bitmap, block.data]),
})
console.log('end', block.index)
process.stdout.write('.')
},
{
concurrency: writeBlockConcurrency,
}
)
await targetVhd.writeFooter()
await targetVhd.writeHeader()
await targetVhd.writeBlockAllocationTable()
})
}

export async function writeVhdFileToRemote(handler: FileAccessor, path: string, disk: PortableDifferencingDisk) {
return Disposable.use(VhdFile.create(handler, path), async (vhd: Vhd) => {
export async function writeVhdFileToRemote(
handler: FileAccessor,
path: string,
disk: PortableDifferencingDisk,
{ writeBlockConcurrency = 0, vhdClass = VhdFile } = {}
) {
console.log('will write ', { disk, path })

await Disposable.use(vhdClass.create(handler, path), async (vhd: Vhd) => {
// @todo : precompute target bat to ensure we can write the block without updating the bat at each block
return writeVhdToRemote(vhd, disk)
return writeVhdToRemote(vhd, disk, { writeBlockConcurrency: 1 })
})
console.log('written')
}

// @todo: vhddirectory
2 changes: 1 addition & 1 deletion @xen-orchestra/xapi/vdi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class Vdi {
baseParentType = await this.getField('VDI', baseRef, 'type')
} else {
// instantiate a full CBT
changedBlocks = Buffer.alloc(Math.ceil(size / 8), 255)
changedBlocks = Buffer.alloc(Math.ceil(size / 8 / 64 / 1024), 255)
}
}

Expand Down
Loading

0 comments on commit d27d1ca

Please sign in to comment.