From 0192ce026b523c286d4513ad066a65966c29c988 Mon Sep 17 00:00:00 2001 From: usmanmani1122 Date: Tue, 17 Dec 2024 11:43:01 +0500 Subject: [PATCH 1/8] initial --- packages/internal/src/node/fs-stream.js | 34 ++-- packages/telemetry/src/block-slog.js | 148 ++++++++++++++++++ .../telemetry/src/context-aware-slog-file.js | 17 +- .../src/context-aware-slog-persistent-util.js | 39 +++++ .../telemetry/src/otel-context-aware-slog.js | 40 +---- 5 files changed, 224 insertions(+), 54 deletions(-) create mode 100644 packages/telemetry/src/block-slog.js create mode 100644 packages/telemetry/src/context-aware-slog-persistent-util.js diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index 5d3da5edafe..eff1defd33e 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -47,15 +47,15 @@ const noPath = /** @type {import('fs').PathLike} */ ( /** @typedef {NonNullable>>} FsStreamWriter */ /** @param {string | undefined | null} filePath */ export const makeFsStreamWriter = async filePath => { - if (!filePath) { - return undefined; - } + if (!filePath) return undefined; - const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined); + const isStdOutStream = filePath === '-'; - const stream = handle - ? createWriteStream(noPath, { fd: handle.fd }) - : process.stdout; + const handle = await (isStdOutStream ? undefined : open(filePath, 'a')); + + const stream = isStdOutStream + ? process.stdout + : createWriteStream(noPath, { autoClose: false, fd: handle.fd }); await fsStreamReady(stream); let flushed = Promise.resolve(); @@ -95,20 +95,24 @@ export const makeFsStreamWriter = async filePath => { const flush = async () => { await flushed; - await handle?.sync().catch(err => { - if (err.code === 'EINVAL') { - return; - } - throw err; - }); + if (!isStdOutStream) + await handle.sync().catch(err => { + if (err.code === 'EINVAL') { + return; + } + throw err; + }); }; const close = async () => { // TODO: Consider creating a single Error here to use a write rejection closed = true; await flush(); - // @ts-expect-error calling a possibly missing method - stream.close?.(); + + if (!isStdOutStream) { + await new Promise(resolve => stream.end(resolve)); + await handle?.close(); + } }; stream.on('error', err => updateFlushed(Promise.reject(err))); diff --git a/packages/telemetry/src/block-slog.js b/packages/telemetry/src/block-slog.js new file mode 100644 index 00000000000..3ecb21aeab4 --- /dev/null +++ b/packages/telemetry/src/block-slog.js @@ -0,0 +1,148 @@ +/* eslint-env node */ + +import { makeFsStreamWriter } from '@agoric/internal/src/node/fs-stream.js'; +import { makeContextualSlogProcessor } from './context-aware-slog.js'; +import getContextFilePersistenceUtils, { + DEFAULT_CONTEXT_FILE, +} from './context-aware-slog-persistent-util.js'; +import { serializeSlogObj } from './serialize-slog-obj.js'; + +/** + * @typedef {import('./context-aware-slog.js').Slog} Slog + * @typedef {ReturnType>} ContextSlog + */ + +const BLOCK_STREAM_HANDLERS_WINDOW = 5; +const CLEANUP_INTERVAL = 20000; +const DEFAULT_BLOCK_HEIGHT = -1; + +/** + * @param {import('./index.js').MakeSlogSenderOptions} options + */ +export const makeSlogSender = async options => { + const { CHAIN_ID } = options.env || {}; + if (!options.stateDir) + return console.error( + 'Ignoring invocation of slogger "block-slog" without the presence of "stateDir"', + ); + + let currentBlock = DEFAULT_BLOCK_HEIGHT; + /** + * @type {NodeJS.Timeout} + */ + let cleanupRef; + /** + * @type {{[key: string]: Awaited>}} + */ + const streamHandlers = {}; + + /** + * @type {{[key: string]: ReturnType}} + */ + const streamCreationPromises = {}; + + const persistenceUtils = getContextFilePersistenceUtils( + process.env.SLOG_CONTEXT_FILE_PATH || + `${options.stateDir}/${DEFAULT_CONTEXT_FILE}`, + ); + + /** + * @param {Awaited>} stream + */ + const closeFileStream = async stream => { + if (!stream) return console.error('Trying to close a null stream'); + + await stream.close(); + }; + + const contextualSlogProcessor = makeContextualSlogProcessor( + { 'chain-id': CHAIN_ID }, + persistenceUtils, + ); + + /** + * @param {ContextSlog['attributes']['block.height']} blockHeight + * @param {import('./index.js').MakeSlogSenderOptions['stateDir']} directory + * @param {ContextSlog['time']} time + */ + const createBlockStream = async (blockHeight, directory, time) => { + if (blockHeight === undefined) + throw Error('Block Height required for creating the write stream'); + + const fileName = `${directory}/${blockHeight}-${time}.json`; + const stream = await makeFsStreamWriter( + `${directory}/${blockHeight}-${time}.json`, + ); + + if (!stream) + throw Error(`Couldn't create a write stream on file "${fileName}"`); + + streamHandlers[String(blockHeight)] = stream; + }; + + const regularCleanup = async () => { + if (currentBlock !== DEFAULT_BLOCK_HEIGHT) + await Promise.all( + Object.keys(streamHandlers).map(async streamIdentifier => { + if ( + Number(streamIdentifier) < + currentBlock - BLOCK_STREAM_HANDLERS_WINDOW || + Number(streamIdentifier) > + currentBlock + BLOCK_STREAM_HANDLERS_WINDOW + ) { + await closeFileStream(streamHandlers[streamIdentifier]); + delete streamHandlers[streamIdentifier]; + delete streamCreationPromises[streamIdentifier]; + } + }), + ); + + cleanupRef = setTimeout(regularCleanup, CLEANUP_INTERVAL); + }; + + await regularCleanup(); + + /** + * @param {import('./context-aware-slog.js').Slog} slog + */ + const slogSender = async slog => { + const contextualSlog = contextualSlogProcessor(slog); + const blockHeight = contextualSlog.attributes['block.height']; + const blockHeightString = String(blockHeight); + + if (blockHeight !== undefined && currentBlock !== blockHeight) { + if (!(blockHeightString in streamHandlers)) { + if (!(blockHeightString in streamCreationPromises)) + streamCreationPromises[blockHeightString] = createBlockStream( + blockHeight, + options.stateDir, + contextualSlog.time, + ); + + await streamCreationPromises[blockHeightString]; + } + + currentBlock = blockHeight; + } + + if (currentBlock !== DEFAULT_BLOCK_HEIGHT) { + const stream = streamHandlers[String(currentBlock)]; + if (!stream) + throw Error(`Stream not found for block height ${currentBlock}`); + + stream.write(serializeSlogObj(contextualSlog) + '\n').catch(() => {}); + } + }; + + return Object.assign(slogSender, { + forceFlush: () => streamHandlers[String(currentBlock)]?.flush(), + shutdown: () => { + clearTimeout(cleanupRef); + return Promise.all( + Object.entries(streamHandlers).map(([, stream]) => + closeFileStream(stream), + ), + ); + }, + }); +}; diff --git a/packages/telemetry/src/context-aware-slog-file.js b/packages/telemetry/src/context-aware-slog-file.js index 003680fc2bd..59ca977451c 100644 --- a/packages/telemetry/src/context-aware-slog-file.js +++ b/packages/telemetry/src/context-aware-slog-file.js @@ -2,6 +2,9 @@ import { makeFsStreamWriter } from '@agoric/internal/src/node/fs-stream.js'; import { makeContextualSlogProcessor } from './context-aware-slog.js'; +import getContextFilePersistenceUtils, { + DEFAULT_CONTEXT_FILE, +} from './context-aware-slog-persistent-util.js'; import { serializeSlogObj } from './serialize-slog-obj.js'; /** @@ -21,9 +24,17 @@ export const makeSlogSender = async options => { `Couldn't create a write stream on file "${CONTEXTUAL_SLOGFILE}"`, ); - const contextualSlogProcessor = makeContextualSlogProcessor({ - 'chain-id': CHAIN_ID, - }); + const persistenceUtils = getContextFilePersistenceUtils( + process.env.SLOG_CONTEXT_FILE_PATH || + `${options.stateDir}/${DEFAULT_CONTEXT_FILE}`, + ); + + const contextualSlogProcessor = makeContextualSlogProcessor( + { + 'chain-id': CHAIN_ID, + }, + persistenceUtils, + ); /** * @param {import('./context-aware-slog.js').Slog} slog diff --git a/packages/telemetry/src/context-aware-slog-persistent-util.js b/packages/telemetry/src/context-aware-slog-persistent-util.js new file mode 100644 index 00000000000..a32e26a33ea --- /dev/null +++ b/packages/telemetry/src/context-aware-slog-persistent-util.js @@ -0,0 +1,39 @@ +import { readFileSync, writeFileSync } from 'fs'; +import { serializeSlogObj } from './serialize-slog-obj.js'; + +export const DEFAULT_CONTEXT_FILE = 'slog-context.json'; +const FILE_ENCODING = 'utf8'; + +/** + * @param {string} filePath + */ +const getContextFilePersistenceUtils = filePath => { + console.warn(`Using file ${filePath} for slogger context`); + + return { + /** + * @param {import('./context-aware-slog.js').Context} context + */ + persistContext: context => { + try { + writeFileSync(filePath, serializeSlogObj(context), FILE_ENCODING); + } catch (err) { + console.error('Error writing context to file: ', err); + } + }, + + /** + * @returns {import('./context-aware-slog.js').Context | null} + */ + restoreContext: () => { + try { + return JSON.parse(readFileSync(filePath, FILE_ENCODING)); + } catch (parseErr) { + console.error('Error reading context from file: ', parseErr); + return null; + } + }, + }; +}; + +export default getContextFilePersistenceUtils; diff --git a/packages/telemetry/src/otel-context-aware-slog.js b/packages/telemetry/src/otel-context-aware-slog.js index 271c13da787..9145d036437 100644 --- a/packages/telemetry/src/otel-context-aware-slog.js +++ b/packages/telemetry/src/otel-context-aware-slog.js @@ -1,4 +1,5 @@ /* eslint-env node */ + import { logs, SeverityNumber } from '@opentelemetry/api-logs'; import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http'; import { Resource } from '@opentelemetry/resources'; @@ -6,46 +7,13 @@ import { LoggerProvider, SimpleLogRecordProcessor, } from '@opentelemetry/sdk-logs'; -import { readFileSync, writeFileSync } from 'fs'; import { makeContextualSlogProcessor } from './context-aware-slog.js'; +import getContextFilePersistenceUtils, { + DEFAULT_CONTEXT_FILE, +} from './context-aware-slog-persistent-util.js'; import { getResourceAttributes } from './index.js'; import { serializeSlogObj } from './serialize-slog-obj.js'; -const DEFAULT_CONTEXT_FILE = 'slog-context.json'; -const FILE_ENCODING = 'utf8'; - -/** - * @param {string} filePath - */ -export const getContextFilePersistenceUtils = filePath => { - console.warn(`Using file ${filePath} for slogger context`); - - return { - /** - * @param {import('./context-aware-slog.js').Context} context - */ - persistContext: context => { - try { - writeFileSync(filePath, serializeSlogObj(context), FILE_ENCODING); - } catch (err) { - console.error('Error writing context to file: ', err); - } - }, - - /** - * @returns {import('./context-aware-slog.js').Context | null} - */ - restoreContext: () => { - try { - return JSON.parse(readFileSync(filePath, FILE_ENCODING)); - } catch (parseErr) { - console.error('Error reading context from file: ', parseErr); - return null; - } - }, - }; -}; - /** * @param {import('./index.js').MakeSlogSenderOptions} options */ From b996b3776b70da0b25d0e4f156b1cab26fe25848 Mon Sep 17 00:00:00 2001 From: usmanmani1122 Date: Tue, 21 Jan 2025 21:44:24 +0500 Subject: [PATCH 2/8] kernel init logs file --- packages/telemetry/src/block-slog.js | 158 ++++++++----------- packages/telemetry/src/context-aware-slog.js | 2 +- 2 files changed, 63 insertions(+), 97 deletions(-) diff --git a/packages/telemetry/src/block-slog.js b/packages/telemetry/src/block-slog.js index 3ecb21aeab4..7893649d1c3 100644 --- a/packages/telemetry/src/block-slog.js +++ b/packages/telemetry/src/block-slog.js @@ -1,7 +1,10 @@ /* eslint-env node */ import { makeFsStreamWriter } from '@agoric/internal/src/node/fs-stream.js'; -import { makeContextualSlogProcessor } from './context-aware-slog.js'; +import { + makeContextualSlogProcessor, + SLOG_TYPES, +} from './context-aware-slog.js'; import getContextFilePersistenceUtils, { DEFAULT_CONTEXT_FILE, } from './context-aware-slog-persistent-util.js'; @@ -12,137 +15,100 @@ import { serializeSlogObj } from './serialize-slog-obj.js'; * @typedef {ReturnType>} ContextSlog */ -const BLOCK_STREAM_HANDLERS_WINDOW = 5; -const CLEANUP_INTERVAL = 20000; -const DEFAULT_BLOCK_HEIGHT = -1; - /** * @param {import('./index.js').MakeSlogSenderOptions} options */ export const makeSlogSender = async options => { - const { CHAIN_ID } = options.env || {}; - if (!options.stateDir) + const { CHAIN_ID, CONTEXTUAL_BLOCK_SLOGS } = options.env || {}; + if (!(options.stateDir || CONTEXTUAL_BLOCK_SLOGS)) return console.error( - 'Ignoring invocation of slogger "block-slog" without the presence of "stateDir"', + 'Ignoring invocation of slogger "block-slog" without the presence of "stateDir" or "CONTEXTUAL_BLOCK_SLOGS"', ); - let currentBlock = DEFAULT_BLOCK_HEIGHT; - /** - * @type {NodeJS.Timeout} - */ - let cleanupRef; - /** - * @type {{[key: string]: Awaited>}} - */ - const streamHandlers = {}; - - /** - * @type {{[key: string]: ReturnType}} - */ - const streamCreationPromises = {}; - const persistenceUtils = getContextFilePersistenceUtils( process.env.SLOG_CONTEXT_FILE_PATH || `${options.stateDir}/${DEFAULT_CONTEXT_FILE}`, ); - /** - * @param {Awaited>} stream - */ - const closeFileStream = async stream => { - if (!stream) return console.error('Trying to close a null stream'); - - await stream.close(); - }; - const contextualSlogProcessor = makeContextualSlogProcessor( { 'chain-id': CHAIN_ID }, persistenceUtils, ); - /** - * @param {ContextSlog['attributes']['block.height']} blockHeight - * @param {import('./index.js').MakeSlogSenderOptions['stateDir']} directory - * @param {ContextSlog['time']} time + * @type {ReturnType | null} */ - const createBlockStream = async (blockHeight, directory, time) => { - if (blockHeight === undefined) - throw Error('Block Height required for creating the write stream'); - - const fileName = `${directory}/${blockHeight}-${time}.json`; - const stream = await makeFsStreamWriter( - `${directory}/${blockHeight}-${time}.json`, - ); + let createFileStreamPromise = null; + /** + * @type {Awaited> | null} + */ + let currentStream = null; - if (!stream) - throw Error(`Couldn't create a write stream on file "${fileName}"`); + const closeStream = async () => { + await new Promise(resolve => resolve(null)); - streamHandlers[String(blockHeight)] = stream; + if (currentStream) { + await currentStream.close(); + currentStream = null; + } else console.error('No stream to close'); }; - const regularCleanup = async () => { - if (currentBlock !== DEFAULT_BLOCK_HEIGHT) - await Promise.all( - Object.keys(streamHandlers).map(async streamIdentifier => { - if ( - Number(streamIdentifier) < - currentBlock - BLOCK_STREAM_HANDLERS_WINDOW || - Number(streamIdentifier) > - currentBlock + BLOCK_STREAM_HANDLERS_WINDOW - ) { - await closeFileStream(streamHandlers[streamIdentifier]); - delete streamHandlers[streamIdentifier]; - delete streamCreationPromises[streamIdentifier]; - } - }), - ); - - cleanupRef = setTimeout(regularCleanup, CLEANUP_INTERVAL); + /** + * @param {string} fileName + */ + const createFileStream = async fileName => { + const filePath = `${options.stateDir || CONTEXTUAL_BLOCK_SLOGS}/slogfile_${fileName}.jsonl`; + currentStream = await makeFsStreamWriter(filePath); + + if (!currentStream) + throw Error(`Couldn't create a write stream on file "${filePath}"`); }; - await regularCleanup(); + const ignore = () => {}; /** * @param {import('./context-aware-slog.js').Slog} slog */ const slogSender = async slog => { - const contextualSlog = contextualSlogProcessor(slog); - const blockHeight = contextualSlog.attributes['block.height']; - const blockHeightString = String(blockHeight); - - if (blockHeight !== undefined && currentBlock !== blockHeight) { - if (!(blockHeightString in streamHandlers)) { - if (!(blockHeightString in streamCreationPromises)) - streamCreationPromises[blockHeightString] = createBlockStream( - blockHeight, - options.stateDir, - contextualSlog.time, - ); - - await streamCreationPromises[blockHeightString]; - } + await new Promise(resolve => resolve(null)); - currentBlock = blockHeight; + const { type: slogType } = slog; + + switch (slogType) { + case SLOG_TYPES.KERNEL.INIT.START: { + createFileStreamPromise = createFileStream( + `init_${new Date().getTime()}`, + ); + break; + } + default: { + break; + } } - if (currentBlock !== DEFAULT_BLOCK_HEIGHT) { - const stream = streamHandlers[String(currentBlock)]; - if (!stream) - throw Error(`Stream not found for block height ${currentBlock}`); + const contextualSlog = contextualSlogProcessor(slog); + + if (createFileStreamPromise) await createFileStreamPromise; + createFileStreamPromise = null; + + if (currentStream) + currentStream // eslint-disable-next-line prefer-template + .write(serializeSlogObj(contextualSlog) + '\n') + .catch(ignore); + else console.error(`No stream found for logging slog "${slogType}"`); - stream.write(serializeSlogObj(contextualSlog) + '\n').catch(() => {}); + switch (slogType) { + case SLOG_TYPES.KERNEL.INIT.FINISH: { + await closeStream(); + break; + } + default: { + break; + } } }; return Object.assign(slogSender, { - forceFlush: () => streamHandlers[String(currentBlock)]?.flush(), - shutdown: () => { - clearTimeout(cleanupRef); - return Promise.all( - Object.entries(streamHandlers).map(([, stream]) => - closeFileStream(stream), - ), - ); - }, + forceFlush: () => currentStream?.flush(), + shutdown: closeStream, }); }; diff --git a/packages/telemetry/src/context-aware-slog.js b/packages/telemetry/src/context-aware-slog.js index ee17acf3dc0..0575862fa4b 100644 --- a/packages/telemetry/src/context-aware-slog.js +++ b/packages/telemetry/src/context-aware-slog.js @@ -49,7 +49,7 @@ * }} Slog */ -const SLOG_TYPES = { +export const SLOG_TYPES = { CLIST: 'clist', CONSOLE: 'console', COSMIC_SWINGSET: { From 50b3490c3ece677eda144cd69ad318d270b725f0 Mon Sep 17 00:00:00 2001 From: usmanmani1122 Date: Wed, 22 Jan 2025 13:16:18 +0500 Subject: [PATCH 3/8] bootstrap block --- packages/internal/src/node/fs-stream.js | 2 +- packages/telemetry/src/block-slog.js | 24 ++++++++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index eff1defd33e..aea2d53ccae 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -117,5 +117,5 @@ export const makeFsStreamWriter = async filePath => { stream.on('error', err => updateFlushed(Promise.reject(err))); - return harden({ write, flush, close }); + return harden({ close, filePath, flush, write }); }; diff --git a/packages/telemetry/src/block-slog.js b/packages/telemetry/src/block-slog.js index 7893649d1c3..5d4b129c440 100644 --- a/packages/telemetry/src/block-slog.js +++ b/packages/telemetry/src/block-slog.js @@ -43,12 +43,14 @@ export const makeSlogSender = async options => { */ let currentStream = null; - const closeStream = async () => { - await new Promise(resolve => resolve(null)); - + /** + * Immediately frees the `currentStream` assignment and lazily closes the open file stream + */ + const closeStream = () => { if (currentStream) { - await currentStream.close(); + const streamClosePromise = currentStream.close(); currentStream = null; + return streamClosePromise; } else console.error('No stream to close'); }; @@ -56,6 +58,9 @@ export const makeSlogSender = async options => { * @param {string} fileName */ const createFileStream = async fileName => { + if (currentStream) + throw Error(`Stream already open on file ${currentStream.filePath}`); + const filePath = `${options.stateDir || CONTEXTUAL_BLOCK_SLOGS}/slogfile_${fileName}.jsonl`; currentStream = await makeFsStreamWriter(filePath); @@ -80,6 +85,12 @@ export const makeSlogSender = async options => { ); break; } + case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.START: { + createFileStreamPromise = createFileStream( + `bootstrap_${new Date().getTime()}`, + ); + break; + } default: { break; } @@ -97,8 +108,9 @@ export const makeSlogSender = async options => { else console.error(`No stream found for logging slog "${slogType}"`); switch (slogType) { - case SLOG_TYPES.KERNEL.INIT.FINISH: { - await closeStream(); + case SLOG_TYPES.KERNEL.INIT.FINISH: + case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.FINISH: { + closeStream()?.catch(ignore); break; } default: { From d2f94a30756a1eda0bed7d9bd5bc0f3416f68ec6 Mon Sep 17 00:00:00 2001 From: usmanmani1122 Date: Wed, 22 Jan 2025 15:19:33 +0500 Subject: [PATCH 4/8] block slogs --- packages/telemetry/src/block-slog.js | 48 ++++++++++++++++---- packages/telemetry/src/context-aware-slog.js | 4 +- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/packages/telemetry/src/block-slog.js b/packages/telemetry/src/block-slog.js index 5d4b129c440..0d1b8f209b4 100644 --- a/packages/telemetry/src/block-slog.js +++ b/packages/telemetry/src/block-slog.js @@ -42,6 +42,10 @@ export const makeSlogSender = async options => { * @type {Awaited> | null} */ let currentStream = null; + /** + * @type {Awaited> | null} + */ + let lastBlockStream = null; /** * Immediately frees the `currentStream` assignment and lazily closes the open file stream @@ -68,15 +72,13 @@ export const makeSlogSender = async options => { throw Error(`Couldn't create a write stream on file "${filePath}"`); }; - const ignore = () => {}; - /** * @param {import('./context-aware-slog.js').Slog} slog */ const slogSender = async slog => { await new Promise(resolve => resolve(null)); - const { type: slogType } = slog; + const { blockHeight, type: slogType } = slog; switch (slogType) { case SLOG_TYPES.KERNEL.INIT.START: { @@ -91,6 +93,12 @@ export const makeSlogSender = async options => { ); break; } + case SLOG_TYPES.COSMIC_SWINGSET.BEGIN_BLOCK: { + createFileStreamPromise = createFileStream( + `block_${blockHeight}_${new Date().getTime()}`, + ); + break; + } default: { break; } @@ -101,16 +109,29 @@ export const makeSlogSender = async options => { if (createFileStreamPromise) await createFileStreamPromise; createFileStreamPromise = null; - if (currentStream) - currentStream // eslint-disable-next-line prefer-template - .write(serializeSlogObj(contextualSlog) + '\n') - .catch(ignore); - else console.error(`No stream found for logging slog "${slogType}"`); + writeSlogToStream( + contextualSlog, + slogType === SLOG_TYPES.COSMIC_SWINGSET.AFTER_COMMIT_STATS + ? lastBlockStream + : currentStream, + )?.catch(console.error); switch (slogType) { case SLOG_TYPES.KERNEL.INIT.FINISH: case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.FINISH: { - closeStream()?.catch(ignore); + closeStream()?.catch(console.error); + break; + } + case SLOG_TYPES.COSMIC_SWINGSET.COMMIT.FINISH: { + lastBlockStream = currentStream; + currentStream = null; + break; + } + case SLOG_TYPES.COSMIC_SWINGSET.AFTER_COMMIT_STATS: { + lastBlockStream + ?.close() + .catch(console.error) + .finally(() => (lastBlockStream = null)); break; } default: { @@ -119,6 +140,15 @@ export const makeSlogSender = async options => { } }; + /** + * @param {ReturnType} slog + * @param {Awaited> | null} stream + */ + const writeSlogToStream = (slog, stream) => + !stream + ? console.error(`No stream available for slog type "${slog.body.type}"`) // eslint-disable-next-line prefer-template + : stream.write(serializeSlogObj(slog) + '\n'); + return Object.assign(slogSender, { forceFlush: () => currentStream?.flush(), shutdown: closeStream, diff --git a/packages/telemetry/src/context-aware-slog.js b/packages/telemetry/src/context-aware-slog.js index 0575862fa4b..f15878e6d20 100644 --- a/packages/telemetry/src/context-aware-slog.js +++ b/packages/telemetry/src/context-aware-slog.js @@ -60,8 +60,8 @@ export const SLOG_TYPES = { START: 'cosmic-swingset-bootstrap-block-start', }, COMMIT: { - FINISH: 'cosmic-swingset-commit-finish', - START: 'cosmic-swingset-commit-start', + FINISH: 'cosmic-swingset-commit-block-finish', + START: 'cosmic-swingset-commit-block-start', }, END_BLOCK: { FINISH: 'cosmic-swingset-end-block-finish', From 0f645ee0609701579b1267e1d5cfa6528026be3e Mon Sep 17 00:00:00 2001 From: usmanmani1122 Date: Wed, 22 Jan 2025 18:14:59 +0500 Subject: [PATCH 5/8] lint --- packages/internal/src/node/fs-stream.js | 18 ++++++-------- packages/telemetry/src/block-slog.js | 33 +++++-------------------- 2 files changed, 14 insertions(+), 37 deletions(-) diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index aea2d53ccae..b2131489916 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -49,13 +49,11 @@ const noPath = /** @type {import('fs').PathLike} */ ( export const makeFsStreamWriter = async filePath => { if (!filePath) return undefined; - const isStdOutStream = filePath === '-'; + const handle = await (filePath === '-' ? undefined : open(filePath, 'a')); - const handle = await (isStdOutStream ? undefined : open(filePath, 'a')); - - const stream = isStdOutStream - ? process.stdout - : createWriteStream(noPath, { autoClose: false, fd: handle.fd }); + const stream = handle + ? createWriteStream(noPath, { autoClose: false, fd: handle.fd }) + : process.stdout; await fsStreamReady(stream); let flushed = Promise.resolve(); @@ -95,7 +93,7 @@ export const makeFsStreamWriter = async filePath => { const flush = async () => { await flushed; - if (!isStdOutStream) + if (handle) await handle.sync().catch(err => { if (err.code === 'EINVAL') { return; @@ -109,9 +107,9 @@ export const makeFsStreamWriter = async filePath => { closed = true; await flush(); - if (!isStdOutStream) { - await new Promise(resolve => stream.end(resolve)); - await handle?.close(); + if (handle) { + await new Promise(resolve => stream.end(() => resolve(null))); + await handle.close(); } }; diff --git a/packages/telemetry/src/block-slog.js b/packages/telemetry/src/block-slog.js index 0d1b8f209b4..5941d69fcce 100644 --- a/packages/telemetry/src/block-slog.js +++ b/packages/telemetry/src/block-slog.js @@ -42,10 +42,6 @@ export const makeSlogSender = async options => { * @type {Awaited> | null} */ let currentStream = null; - /** - * @type {Awaited> | null} - */ - let lastBlockStream = null; /** * Immediately frees the `currentStream` assignment and lazily closes the open file stream @@ -109,29 +105,13 @@ export const makeSlogSender = async options => { if (createFileStreamPromise) await createFileStreamPromise; createFileStreamPromise = null; - writeSlogToStream( - contextualSlog, - slogType === SLOG_TYPES.COSMIC_SWINGSET.AFTER_COMMIT_STATS - ? lastBlockStream - : currentStream, - )?.catch(console.error); + writeSlogToStream(contextualSlog)?.catch(console.error); switch (slogType) { case SLOG_TYPES.KERNEL.INIT.FINISH: - case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.FINISH: { - closeStream()?.catch(console.error); - break; - } - case SLOG_TYPES.COSMIC_SWINGSET.COMMIT.FINISH: { - lastBlockStream = currentStream; - currentStream = null; - break; - } + case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.FINISH: case SLOG_TYPES.COSMIC_SWINGSET.AFTER_COMMIT_STATS: { - lastBlockStream - ?.close() - .catch(console.error) - .finally(() => (lastBlockStream = null)); + closeStream()?.catch(console.error); break; } default: { @@ -142,12 +122,11 @@ export const makeSlogSender = async options => { /** * @param {ReturnType} slog - * @param {Awaited> | null} stream */ - const writeSlogToStream = (slog, stream) => - !stream + const writeSlogToStream = slog => + !currentStream ? console.error(`No stream available for slog type "${slog.body.type}"`) // eslint-disable-next-line prefer-template - : stream.write(serializeSlogObj(slog) + '\n'); + : currentStream.write(serializeSlogObj(slog) + '\n'); return Object.assign(slogSender, { forceFlush: () => currentStream?.flush(), From 26f17f96def5b935348e8240ac550dd7be9a370e Mon Sep 17 00:00:00 2001 From: usmanmani1122 Date: Mon, 10 Feb 2025 19:40:56 +0500 Subject: [PATCH 6/8] remove async slog sender + revert other changes --- packages/internal/src/node/fs-stream.js | 30 ++--- packages/telemetry/src/block-slog.js | 124 ++++++++++-------- .../telemetry/src/context-aware-slog-file.js | 17 +-- .../src/context-aware-slog-persistent-util.js | 39 ------ packages/telemetry/src/context-aware-slog.js | 6 +- .../telemetry/src/otel-context-aware-slog.js | 40 +++++- 6 files changed, 125 insertions(+), 131 deletions(-) delete mode 100644 packages/telemetry/src/context-aware-slog-persistent-util.js diff --git a/packages/internal/src/node/fs-stream.js b/packages/internal/src/node/fs-stream.js index b2131489916..5d3da5edafe 100644 --- a/packages/internal/src/node/fs-stream.js +++ b/packages/internal/src/node/fs-stream.js @@ -47,12 +47,14 @@ const noPath = /** @type {import('fs').PathLike} */ ( /** @typedef {NonNullable>>} FsStreamWriter */ /** @param {string | undefined | null} filePath */ export const makeFsStreamWriter = async filePath => { - if (!filePath) return undefined; + if (!filePath) { + return undefined; + } - const handle = await (filePath === '-' ? undefined : open(filePath, 'a')); + const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined); const stream = handle - ? createWriteStream(noPath, { autoClose: false, fd: handle.fd }) + ? createWriteStream(noPath, { fd: handle.fd }) : process.stdout; await fsStreamReady(stream); @@ -93,27 +95,23 @@ export const makeFsStreamWriter = async filePath => { const flush = async () => { await flushed; - if (handle) - await handle.sync().catch(err => { - if (err.code === 'EINVAL') { - return; - } - throw err; - }); + await handle?.sync().catch(err => { + if (err.code === 'EINVAL') { + return; + } + throw err; + }); }; const close = async () => { // TODO: Consider creating a single Error here to use a write rejection closed = true; await flush(); - - if (handle) { - await new Promise(resolve => stream.end(() => resolve(null))); - await handle.close(); - } + // @ts-expect-error calling a possibly missing method + stream.close?.(); }; stream.on('error', err => updateFlushed(Promise.reject(err))); - return harden({ close, filePath, flush, write }); + return harden({ write, flush, close }); }; diff --git a/packages/telemetry/src/block-slog.js b/packages/telemetry/src/block-slog.js index 5941d69fcce..2efa38f5233 100644 --- a/packages/telemetry/src/block-slog.js +++ b/packages/telemetry/src/block-slog.js @@ -1,97 +1,100 @@ /* eslint-env node */ -import { makeFsStreamWriter } from '@agoric/internal/src/node/fs-stream.js'; -import { - makeContextualSlogProcessor, - SLOG_TYPES, -} from './context-aware-slog.js'; -import getContextFilePersistenceUtils, { - DEFAULT_CONTEXT_FILE, -} from './context-aware-slog-persistent-util.js'; +import { open } from 'node:fs/promises'; +import { SLOG_TYPES } from './context-aware-slog.js'; import { serializeSlogObj } from './serialize-slog-obj.js'; /** * @typedef {import('./context-aware-slog.js').Slog} Slog - * @typedef {ReturnType>} ContextSlog */ /** * @param {import('./index.js').MakeSlogSenderOptions} options */ export const makeSlogSender = async options => { - const { CHAIN_ID, CONTEXTUAL_BLOCK_SLOGS } = options.env || {}; + const { CONTEXTUAL_BLOCK_SLOGS } = options.env || {}; if (!(options.stateDir || CONTEXTUAL_BLOCK_SLOGS)) return console.error( 'Ignoring invocation of slogger "block-slog" without the presence of "stateDir" or "CONTEXTUAL_BLOCK_SLOGS"', ); - const persistenceUtils = getContextFilePersistenceUtils( - process.env.SLOG_CONTEXT_FILE_PATH || - `${options.stateDir}/${DEFAULT_CONTEXT_FILE}`, - ); - - const contextualSlogProcessor = makeContextualSlogProcessor( - { 'chain-id': CHAIN_ID }, - persistenceUtils, - ); + let chainedPromises = Promise.resolve(); /** - * @type {ReturnType | null} + * @type {import('node:fs/promises').FileHandle | null} */ - let createFileStreamPromise = null; + let currentFileHandle = null; /** - * @type {Awaited> | null} + * @type {import('node:fs').WriteStream | null} */ let currentStream = null; /** - * Immediately frees the `currentStream` assignment and lazily closes the open file stream + * @param {Array<() => Promise>} promises */ - const closeStream = () => { - if (currentStream) { - const streamClosePromise = currentStream.close(); - currentStream = null; - return streamClosePromise; - } else console.error('No stream to close'); + const chainPromises = (...promises) => + // eslint-disable-next-line github/array-foreach + promises.forEach( + promise => (chainedPromises = chainedPromises.then(promise)), + ); + + const closeStream = async () => { + if (currentStream) + return new Promise(resolve => + currentStream?.close(err => { + if (err) console.error("Couldn't close stream due to error: ", err); + resolve(undefined); + }), + ) + .then(() => { + currentStream = null; + }) + .then(() => currentFileHandle?.close()) + .then(() => { + currentFileHandle = null; + }); + else { + console.error('No stream to close'); + return Promise.resolve(); + } }; /** * @param {string} fileName */ const createFileStream = async fileName => { - if (currentStream) - throw Error(`Stream already open on file ${currentStream.filePath}`); + if (currentStream) throw Error('Stream already open'); const filePath = `${options.stateDir || CONTEXTUAL_BLOCK_SLOGS}/slogfile_${fileName}.jsonl`; - currentStream = await makeFsStreamWriter(filePath); + currentFileHandle = await open(filePath, 'w'); + currentStream = currentFileHandle.createWriteStream({ + autoClose: true, + encoding: 'utf-8', + }); if (!currentStream) throw Error(`Couldn't create a write stream on file "${filePath}"`); }; /** - * @param {import('./context-aware-slog.js').Slog} slog + * @param {Slog} slog */ - const slogSender = async slog => { - await new Promise(resolve => resolve(null)); - + const slogSender = slog => { const { blockHeight, type: slogType } = slog; switch (slogType) { case SLOG_TYPES.KERNEL.INIT.START: { - createFileStreamPromise = createFileStream( - `init_${new Date().getTime()}`, - ); + chainPromises(() => createFileStream(`init_${new Date().getTime()}`)); break; } case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.START: { - createFileStreamPromise = createFileStream( - `bootstrap_${new Date().getTime()}`, + chainPromises(() => + createFileStream(`bootstrap_${new Date().getTime()}`), ); break; } case SLOG_TYPES.COSMIC_SWINGSET.BEGIN_BLOCK: { - createFileStreamPromise = createFileStream( - `block_${blockHeight}_${new Date().getTime()}`, + chainPromises(() => + createFileStream(`block_${blockHeight}_${new Date().getTime()}`), ); break; } @@ -100,18 +103,13 @@ export const makeSlogSender = async options => { } } - const contextualSlog = contextualSlogProcessor(slog); - - if (createFileStreamPromise) await createFileStreamPromise; - createFileStreamPromise = null; - - writeSlogToStream(contextualSlog)?.catch(console.error); + chainPromises(() => writeSlogToStream(slog)); switch (slogType) { case SLOG_TYPES.KERNEL.INIT.FINISH: case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.FINISH: case SLOG_TYPES.COSMIC_SWINGSET.AFTER_COMMIT_STATS: { - closeStream()?.catch(console.error); + chainPromises(closeStream); break; } default: { @@ -121,15 +119,31 @@ export const makeSlogSender = async options => { }; /** - * @param {ReturnType} slog + * @param {Slog} slog + * @returns {Promise} */ const writeSlogToStream = slog => - !currentStream - ? console.error(`No stream available for slog type "${slog.body.type}"`) // eslint-disable-next-line prefer-template - : currentStream.write(serializeSlogObj(slog) + '\n'); + new Promise(resolve => { + if (!currentStream) { + console.error(`No stream available for slog type "${slog.type}"`); + resolve(); + } else { + // eslint-disable-next-line prefer-template + const message = serializeSlogObj(slog) + '\n'; + + const wrote = currentStream.write(message); + if (!wrote) { + console.warn('Stream full, waiting for drain'); + currentStream.once('drain', () => { + currentStream?.write(message); + resolve(); + }); + } else resolve(); + } + }); return Object.assign(slogSender, { - forceFlush: () => currentStream?.flush(), + forceFlush: () => chainedPromises, shutdown: closeStream, }); }; diff --git a/packages/telemetry/src/context-aware-slog-file.js b/packages/telemetry/src/context-aware-slog-file.js index 59ca977451c..003680fc2bd 100644 --- a/packages/telemetry/src/context-aware-slog-file.js +++ b/packages/telemetry/src/context-aware-slog-file.js @@ -2,9 +2,6 @@ import { makeFsStreamWriter } from '@agoric/internal/src/node/fs-stream.js'; import { makeContextualSlogProcessor } from './context-aware-slog.js'; -import getContextFilePersistenceUtils, { - DEFAULT_CONTEXT_FILE, -} from './context-aware-slog-persistent-util.js'; import { serializeSlogObj } from './serialize-slog-obj.js'; /** @@ -24,17 +21,9 @@ export const makeSlogSender = async options => { `Couldn't create a write stream on file "${CONTEXTUAL_SLOGFILE}"`, ); - const persistenceUtils = getContextFilePersistenceUtils( - process.env.SLOG_CONTEXT_FILE_PATH || - `${options.stateDir}/${DEFAULT_CONTEXT_FILE}`, - ); - - const contextualSlogProcessor = makeContextualSlogProcessor( - { - 'chain-id': CHAIN_ID, - }, - persistenceUtils, - ); + const contextualSlogProcessor = makeContextualSlogProcessor({ + 'chain-id': CHAIN_ID, + }); /** * @param {import('./context-aware-slog.js').Slog} slog diff --git a/packages/telemetry/src/context-aware-slog-persistent-util.js b/packages/telemetry/src/context-aware-slog-persistent-util.js deleted file mode 100644 index a32e26a33ea..00000000000 --- a/packages/telemetry/src/context-aware-slog-persistent-util.js +++ /dev/null @@ -1,39 +0,0 @@ -import { readFileSync, writeFileSync } from 'fs'; -import { serializeSlogObj } from './serialize-slog-obj.js'; - -export const DEFAULT_CONTEXT_FILE = 'slog-context.json'; -const FILE_ENCODING = 'utf8'; - -/** - * @param {string} filePath - */ -const getContextFilePersistenceUtils = filePath => { - console.warn(`Using file ${filePath} for slogger context`); - - return { - /** - * @param {import('./context-aware-slog.js').Context} context - */ - persistContext: context => { - try { - writeFileSync(filePath, serializeSlogObj(context), FILE_ENCODING); - } catch (err) { - console.error('Error writing context to file: ', err); - } - }, - - /** - * @returns {import('./context-aware-slog.js').Context | null} - */ - restoreContext: () => { - try { - return JSON.parse(readFileSync(filePath, FILE_ENCODING)); - } catch (parseErr) { - console.error('Error reading context from file: ', parseErr); - return null; - } - }, - }; -}; - -export default getContextFilePersistenceUtils; diff --git a/packages/telemetry/src/context-aware-slog.js b/packages/telemetry/src/context-aware-slog.js index f15878e6d20..ee17acf3dc0 100644 --- a/packages/telemetry/src/context-aware-slog.js +++ b/packages/telemetry/src/context-aware-slog.js @@ -49,7 +49,7 @@ * }} Slog */ -export const SLOG_TYPES = { +const SLOG_TYPES = { CLIST: 'clist', CONSOLE: 'console', COSMIC_SWINGSET: { @@ -60,8 +60,8 @@ export const SLOG_TYPES = { START: 'cosmic-swingset-bootstrap-block-start', }, COMMIT: { - FINISH: 'cosmic-swingset-commit-block-finish', - START: 'cosmic-swingset-commit-block-start', + FINISH: 'cosmic-swingset-commit-finish', + START: 'cosmic-swingset-commit-start', }, END_BLOCK: { FINISH: 'cosmic-swingset-end-block-finish', diff --git a/packages/telemetry/src/otel-context-aware-slog.js b/packages/telemetry/src/otel-context-aware-slog.js index 9145d036437..271c13da787 100644 --- a/packages/telemetry/src/otel-context-aware-slog.js +++ b/packages/telemetry/src/otel-context-aware-slog.js @@ -1,5 +1,4 @@ /* eslint-env node */ - import { logs, SeverityNumber } from '@opentelemetry/api-logs'; import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http'; import { Resource } from '@opentelemetry/resources'; @@ -7,13 +6,46 @@ import { LoggerProvider, SimpleLogRecordProcessor, } from '@opentelemetry/sdk-logs'; +import { readFileSync, writeFileSync } from 'fs'; import { makeContextualSlogProcessor } from './context-aware-slog.js'; -import getContextFilePersistenceUtils, { - DEFAULT_CONTEXT_FILE, -} from './context-aware-slog-persistent-util.js'; import { getResourceAttributes } from './index.js'; import { serializeSlogObj } from './serialize-slog-obj.js'; +const DEFAULT_CONTEXT_FILE = 'slog-context.json'; +const FILE_ENCODING = 'utf8'; + +/** + * @param {string} filePath + */ +export const getContextFilePersistenceUtils = filePath => { + console.warn(`Using file ${filePath} for slogger context`); + + return { + /** + * @param {import('./context-aware-slog.js').Context} context + */ + persistContext: context => { + try { + writeFileSync(filePath, serializeSlogObj(context), FILE_ENCODING); + } catch (err) { + console.error('Error writing context to file: ', err); + } + }, + + /** + * @returns {import('./context-aware-slog.js').Context | null} + */ + restoreContext: () => { + try { + return JSON.parse(readFileSync(filePath, FILE_ENCODING)); + } catch (parseErr) { + console.error('Error reading context from file: ', parseErr); + return null; + } + }, + }; +}; + /** * @param {import('./index.js').MakeSlogSenderOptions} options */ From 867a4435c74a1dc7399266e5f75ef79d3e5df704 Mon Sep 17 00:00:00 2001 From: usmanmani1122 Date: Mon, 10 Feb 2025 19:48:15 +0500 Subject: [PATCH 7/8] oops --- packages/telemetry/src/context-aware-slog.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/telemetry/src/context-aware-slog.js b/packages/telemetry/src/context-aware-slog.js index ee17acf3dc0..0575862fa4b 100644 --- a/packages/telemetry/src/context-aware-slog.js +++ b/packages/telemetry/src/context-aware-slog.js @@ -49,7 +49,7 @@ * }} Slog */ -const SLOG_TYPES = { +export const SLOG_TYPES = { CLIST: 'clist', CONSOLE: 'console', COSMIC_SWINGSET: { From 1660e9bb126e12c890ba8ca0a432656d0f5295be Mon Sep 17 00:00:00 2001 From: usmanmani1122 Date: Tue, 11 Feb 2025 19:31:21 +0000 Subject: [PATCH 8/8] compression support --- packages/telemetry/src/block-slog.js | 104 ++++++++++--------- packages/telemetry/src/context-aware-slog.js | 8 +- 2 files changed, 60 insertions(+), 52 deletions(-) diff --git a/packages/telemetry/src/block-slog.js b/packages/telemetry/src/block-slog.js index 2efa38f5233..b23e8fbc26a 100644 --- a/packages/telemetry/src/block-slog.js +++ b/packages/telemetry/src/block-slog.js @@ -1,6 +1,7 @@ /* eslint-env node */ import { open } from 'node:fs/promises'; +import { createGzip } from 'node:zlib'; import { SLOG_TYPES } from './context-aware-slog.js'; import { serializeSlogObj } from './serialize-slog-obj.js'; @@ -26,53 +27,50 @@ export const makeSlogSender = async options => { /** * @type {import('node:fs').WriteStream | null} */ + let currentFileStream = null; + /** + * @type {import('node:zlib').Gzip | null} + */ let currentStream = null; /** * @param {Array<() => Promise>} promises */ - const chainPromises = (...promises) => - // eslint-disable-next-line github/array-foreach - promises.forEach( - promise => (chainedPromises = chainedPromises.then(promise)), - ); - - const closeStream = async () => { - if (currentStream) - return new Promise(resolve => - currentStream?.close(err => { - if (err) console.error("Couldn't close stream due to error: ", err); - resolve(undefined); - }), - ) - .then(() => { - currentStream = null; - }) - .then(() => currentFileHandle?.close()) - .then(() => { - currentFileHandle = null; - }); - else { - console.error('No stream to close'); - return Promise.resolve(); - } + const chainPromises = (...promises) => { + for (const promise of promises) + chainedPromises = chainedPromises.then(promise); }; + const closeStream = () => + currentStream + ? chainPromises( + () => + new Promise(resolve => + // @ts-expect-error + currentStream.end(() => currentStream.once('finish', resolve)), + ), + () => + // @ts-expect-error + new Promise(resolve => currentFileStream.once('finish', resolve)), + async () => { + currentStream = null; + currentFileStream = null; + currentFileHandle = null; + }, + ) + : console.error('No stream to close'); + /** * @param {string} fileName */ - const createFileStream = async fileName => { + const createStream = async fileName => { if (currentStream) throw Error('Stream already open'); - const filePath = `${options.stateDir || CONTEXTUAL_BLOCK_SLOGS}/slogfile_${fileName}.jsonl`; + const filePath = `${options.stateDir || CONTEXTUAL_BLOCK_SLOGS}/slogfile_${fileName}.gz`; currentFileHandle = await open(filePath, 'w'); - currentStream = currentFileHandle.createWriteStream({ - autoClose: true, - encoding: 'utf-8', - }); - - if (!currentStream) - throw Error(`Couldn't create a write stream on file "${filePath}"`); + currentFileStream = currentFileHandle.createWriteStream(); + currentStream = createGzip(); + currentStream.pipe(currentFileStream); }; /** @@ -83,18 +81,20 @@ export const makeSlogSender = async options => { switch (slogType) { case SLOG_TYPES.KERNEL.INIT.START: { - chainPromises(() => createFileStream(`init_${new Date().getTime()}`)); + chainPromises(() => createStream(`init_${new Date().getTime()}`)); break; } case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.START: { - chainPromises(() => - createFileStream(`bootstrap_${new Date().getTime()}`), - ); + chainPromises(() => createStream(`bootstrap_${new Date().getTime()}`)); + break; + } + case SLOG_TYPES.COSMIC_SWINGSET.UPGRADE.START: { + chainPromises(() => createStream(`upgrade_${new Date().getTime()}`)); break; } case SLOG_TYPES.COSMIC_SWINGSET.BEGIN_BLOCK: { chainPromises(() => - createFileStream(`block_${blockHeight}_${new Date().getTime()}`), + createStream(`block_${blockHeight}_${new Date().getTime()}`), ); break; } @@ -108,8 +108,9 @@ export const makeSlogSender = async options => { switch (slogType) { case SLOG_TYPES.KERNEL.INIT.FINISH: case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.FINISH: + case SLOG_TYPES.COSMIC_SWINGSET.UPGRADE.FINISH: case SLOG_TYPES.COSMIC_SWINGSET.AFTER_COMMIT_STATS: { - chainPromises(closeStream); + closeStream(); break; } default: { @@ -131,19 +132,22 @@ export const makeSlogSender = async options => { // eslint-disable-next-line prefer-template const message = serializeSlogObj(slog) + '\n'; - const wrote = currentStream.write(message); - if (!wrote) { - console.warn('Stream full, waiting for drain'); - currentStream.once('drain', () => { - currentStream?.write(message); - resolve(); - }); - } else resolve(); + if (!currentStream.write(message)) currentStream.once('drain', resolve); + else resolve(); } }); return Object.assign(slogSender, { - forceFlush: () => chainedPromises, - shutdown: closeStream, + forceFlush: () => + chainedPromises.then( + () => + /** @type {Promise} */ ( + new Promise(resolve => currentStream?.flush(resolve)) + ), + ), + shutdown: async () => { + closeStream(); + await chainedPromises; + }, }); }; diff --git a/packages/telemetry/src/context-aware-slog.js b/packages/telemetry/src/context-aware-slog.js index 0575862fa4b..1f4b1cef4b4 100644 --- a/packages/telemetry/src/context-aware-slog.js +++ b/packages/telemetry/src/context-aware-slog.js @@ -60,8 +60,8 @@ export const SLOG_TYPES = { START: 'cosmic-swingset-bootstrap-block-start', }, COMMIT: { - FINISH: 'cosmic-swingset-commit-finish', - START: 'cosmic-swingset-commit-start', + FINISH: 'cosmic-swingset-commit-block-finish', + START: 'cosmic-swingset-commit-block-start', }, END_BLOCK: { FINISH: 'cosmic-swingset-end-block-finish', @@ -72,6 +72,10 @@ export const SLOG_TYPES = { FINISH: 'cosmic-swingset-run-finish', START: 'cosmic-swingset-run-start', }, + UPGRADE: { + FINISH: 'cosmic-swingset-upgrade-finish', + START: 'cosmic-swingset-upgrade-start', + }, }, COSMIC_SWINGSET_TRIGGERS: { BRIDGE_INBOUND: 'cosmic-swingset-bridge-inbound',