telemetry: slog sender with block chunking #10710

Open
wants to merge 13 commits into base: master
30 changes: 16 additions & 14 deletions packages/internal/src/node/fs-stream.js
Member:
I'm not sure what motivates the changes here, but if we're gonna change this, I'm wondering if it might not be better to use the "new" filehandle.createWriteStream API that exists since v16.11.

Also, depending on the reasoning for no longer auto-closing the handle, we likely want to use the new flush option.

@@ -47,14 +47,12 @@ const noPath = /** @type {import('fs').PathLike} */ (
/** @typedef {NonNullable<Awaited<ReturnType<typeof makeFsStreamWriter>>>} FsStreamWriter */
/** @param {string | undefined | null} filePath */
export const makeFsStreamWriter = async filePath => {
if (!filePath) {
return undefined;
}
if (!filePath) return undefined;

const handle = await (filePath !== '-' ? open(filePath, 'a') : undefined);
const handle = await (filePath === '-' ? undefined : open(filePath, 'a'));

const stream = handle
? createWriteStream(noPath, { fd: handle.fd })
? createWriteStream(noPath, { autoClose: false, fd: handle.fd })
Member:
Why set `autoClose: false`? AFAIK we don't sufficiently monitor stream error/finish to correctly call close on it.
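A hedged sketch of the kind of monitoring this comment alludes to (`monitorStream` and `openMonitored` are hypothetical helpers, not part of this PR): with `autoClose: false`, nothing releases the fd when the stream errors or finishes, so the caller has to wire that up itself.

```javascript
// Hypothetical helpers: with autoClose: false, the file descriptor is not
// released on stream 'error' or 'finish', so watch those events explicitly.
import { open } from 'fs/promises';
import { createWriteStream } from 'fs';

const monitorStream = (stream, handle) => {
  let closed = false;
  const closeOnce = () => {
    if (closed) return;
    closed = true;
    handle.close().catch(err => console.error('close failed', err));
  };
  stream.on('error', closeOnce);
  stream.on('finish', closeOnce);
};

// Usage sketch: create the stream from an explicit handle, then monitor it.
const openMonitored = async filePath => {
  const handle = await open(filePath, 'a');
  // path is ignored when fd is provided
  const stream = createWriteStream('', { fd: handle.fd, autoClose: false });
  monitorStream(stream, handle);
  return { stream, handle };
};
```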

: process.stdout;
await fsStreamReady(stream);

@@ -95,23 +93,27 @@ export const makeFsStreamWriter = async filePath => {

const flush = async () => {
await flushed;
await handle?.sync().catch(err => {
if (err.code === 'EINVAL') {
return;
}
throw err;
});
if (handle)
await handle.sync().catch(err => {
if (err.code === 'EINVAL') {
return;
}
throw err;
});
};

const close = async () => {
// TODO: Consider creating a single Error here to use a write rejection
closed = true;
await flush();
// @ts-expect-error calling a possibly missing method
stream.close?.();

if (handle) {
await new Promise(resolve => stream.end(() => resolve(null)));
Member:
Switching from close to end with autoClose: false will result in the stream not getting destroyed. Is that intended? I suppose we explicitly close the handle below.

await handle.close();
}
};

stream.on('error', err => updateFlushed(Promise.reject(err)));

return harden({ write, flush, close });
return harden({ close, filePath, flush, write });
};
135 changes: 135 additions & 0 deletions packages/telemetry/src/block-slog.js
@@ -0,0 +1,135 @@
/* eslint-env node */

import { makeFsStreamWriter } from '@agoric/internal/src/node/fs-stream.js';
import {
makeContextualSlogProcessor,
SLOG_TYPES,
} from './context-aware-slog.js';
import getContextFilePersistenceUtils, {
DEFAULT_CONTEXT_FILE,
} from './context-aware-slog-persistent-util.js';
import { serializeSlogObj } from './serialize-slog-obj.js';

/**
* @typedef {import('./context-aware-slog.js').Slog} Slog
* @typedef {ReturnType<ReturnType<typeof makeContextualSlogProcessor>>} ContextSlog
*/

/**
* @param {import('./index.js').MakeSlogSenderOptions} options
*/
export const makeSlogSender = async options => {
const { CHAIN_ID, CONTEXTUAL_BLOCK_SLOGS } = options.env || {};
if (!(options.stateDir || CONTEXTUAL_BLOCK_SLOGS))
Member:
If both are needed, this condition isn't correct

Suggested change:
- if (!(options.stateDir || CONTEXTUAL_BLOCK_SLOGS))
+ if (!options.stateDir || !CONTEXTUAL_BLOCK_SLOGS)

Contributor Author:
No, at least one of them is needed.
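The original guard is indeed the "neither is present" check: by De Morgan's law, `!(a || b)` is `!a && !b`, whereas the suggested `!a || !b` would bail unless both are present. A quick sketch (function and variable names are for illustration only):

```javascript
// De Morgan's law: !(a || b) === (!a && !b)
const bailIfNeitherPresent = (stateDir, blockSlogs) => !(stateDir || blockSlogs);
const bailUnlessBothPresent = (stateDir, blockSlogs) => !stateDir || !blockSlogs;

console.log(bailIfNeitherPresent(undefined, undefined)); // true: bail, nothing configured
console.log(bailIfNeitherPresent('/state', undefined)); // false: proceed with one present
console.log(bailUnlessBothPresent('/state', undefined)); // true: would wrongly bail
```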

return console.error(
'Ignoring invocation of slogger "block-slog" without the presence of "stateDir" or "CONTEXTUAL_BLOCK_SLOGS"',
);

const persistenceUtils = getContextFilePersistenceUtils(
process.env.SLOG_CONTEXT_FILE_PATH ||
`${options.stateDir}/${DEFAULT_CONTEXT_FILE}`,
);

const contextualSlogProcessor = makeContextualSlogProcessor(
{ 'chain-id': CHAIN_ID },
persistenceUtils,
);
Member:
Why are we using a contextual slog? I'm not sure we care about contextualizing, as the goal of this tool is mostly for archiving, not as much for querying. Most tools we have currently work against original slogs events.

/**
* @type {ReturnType<typeof createFileStream> | null}
*/
let createFileStreamPromise = null;
/**
* @type {Awaited<ReturnType<typeof makeFsStreamWriter>> | null}
*/
let currentStream = null;

/**
* Immediately frees the `currentStream` assignment and lazily closes the open file stream
*/
const closeStream = () => {
if (currentStream) {
const streamClosePromise = currentStream.close();
currentStream = null;
return streamClosePromise;
} else console.error('No stream to close');
};

/**
* @param {string} fileName
*/
const createFileStream = async fileName => {
if (currentStream)
throw Error(`Stream already open on file ${currentStream.filePath}`);

const filePath = `${options.stateDir || CONTEXTUAL_BLOCK_SLOGS}/slogfile_${fileName}.jsonl`;
currentStream = await makeFsStreamWriter(filePath);

if (!currentStream)
throw Error(`Couldn't create a write stream on file "${filePath}"`);
};

/**
* @param {import('./context-aware-slog.js').Slog} slog
*/
const slogSender = async slog => {
Member:
Slog senders cannot be async. Please use an internal queue if an async implementation is actually needed (and make sure that forceFlush respects this queue). The flight recorder has a trivial example.

I'm also not a fan of all the console.error logging on errors. I think we need to let errors go back up to the result promise, and for flush to be able to report an aggregation of all errors. An error happening for one event should not prevent us from attempting to write another event.
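The pattern described here can be sketched roughly as follows (`makeQueuedSender` is a hypothetical illustration, not the flight recorder's actual code): the sender returns synchronously, writes are chained on an internal promise queue, per-event errors are collected rather than allowed to block later events, and `forceFlush` awaits the queue and reports an aggregate.

```javascript
// Hypothetical sketch of a synchronous sender over an internal queue.
const makeQueuedSender = writeSlog => {
  let tail = Promise.resolve();
  const errors = [];
  const sender = slog => {
    // Chain each write so events stay ordered; a failure for one event is
    // recorded without preventing the next event from being attempted.
    tail = tail.then(() => writeSlog(slog)).catch(err => errors.push(err));
  };
  sender.forceFlush = async () => {
    await tail; // respect the queue
    if (errors.length) throw new AggregateError(errors, 'slog write failures');
  };
  return sender;
};
```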

await new Promise(resolve => resolve(null));

const { blockHeight, type: slogType } = slog;

switch (slogType) {
case SLOG_TYPES.KERNEL.INIT.START: {
createFileStreamPromise = createFileStream(
`init_${new Date().getTime()}`,
);
break;
}
case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.START: {
createFileStreamPromise = createFileStream(
`bootstrap_${new Date().getTime()}`,
);
break;
}
case SLOG_TYPES.COSMIC_SWINGSET.BEGIN_BLOCK: {
createFileStreamPromise = createFileStream(
`block_${blockHeight}_${new Date().getTime()}`,
);
break;
}
default: {
break;
}
}

const contextualSlog = contextualSlogProcessor(slog);

if (createFileStreamPromise) await createFileStreamPromise;
createFileStreamPromise = null;

writeSlogToStream(contextualSlog)?.catch(console.error);

switch (slogType) {
case SLOG_TYPES.KERNEL.INIT.FINISH:
case SLOG_TYPES.COSMIC_SWINGSET.BOOTSTRAP_BLOCK.FINISH:
case SLOG_TYPES.COSMIC_SWINGSET.AFTER_COMMIT_STATS: {
closeStream()?.catch(console.error);
break;
}
default: {
break;
}
}
};

/**
* @param {ReturnType<contextualSlogProcessor>} slog
*/
const writeSlogToStream = slog =>
!currentStream
? console.error(`No stream available for slog type "${slog.body.type}"`) // eslint-disable-next-line prefer-template
: currentStream.write(serializeSlogObj(slog) + '\n');

return Object.assign(slogSender, {
forceFlush: () => currentStream?.flush(),
shutdown: closeStream,
});
};
17 changes: 14 additions & 3 deletions packages/telemetry/src/context-aware-slog-file.js
@@ -2,6 +2,9 @@

import { makeFsStreamWriter } from '@agoric/internal/src/node/fs-stream.js';
import { makeContextualSlogProcessor } from './context-aware-slog.js';
import getContextFilePersistenceUtils, {
DEFAULT_CONTEXT_FILE,
} from './context-aware-slog-persistent-util.js';
import { serializeSlogObj } from './serialize-slog-obj.js';

/**
@@ -21,9 +24,17 @@ export const makeSlogSender = async options => {
`Couldn't create a write stream on file "${CONTEXTUAL_SLOGFILE}"`,
);

const contextualSlogProcessor = makeContextualSlogProcessor({
'chain-id': CHAIN_ID,
});
const persistenceUtils = getContextFilePersistenceUtils(
process.env.SLOG_CONTEXT_FILE_PATH ||
`${options.stateDir}/${DEFAULT_CONTEXT_FILE}`,
);

const contextualSlogProcessor = makeContextualSlogProcessor(
{
'chain-id': CHAIN_ID,
},
persistenceUtils,
Member:
Were we not passing the persistence utils for the file version? I totally missed that. Let's extract this fix in a separate PR.

);

/**
* @param {import('./context-aware-slog.js').Slog} slog
39 changes: 39 additions & 0 deletions packages/telemetry/src/context-aware-slog-persistent-util.js
@@ -0,0 +1,39 @@
import { readFileSync, writeFileSync } from 'fs';
import { serializeSlogObj } from './serialize-slog-obj.js';

export const DEFAULT_CONTEXT_FILE = 'slog-context.json';
const FILE_ENCODING = 'utf8';

/**
* @param {string} filePath
*/
const getContextFilePersistenceUtils = filePath => {
console.warn(`Using file ${filePath} for slogger context`);

return {
/**
* @param {import('./context-aware-slog.js').Context} context
*/
persistContext: context => {
try {
writeFileSync(filePath, serializeSlogObj(context), FILE_ENCODING);
} catch (err) {
console.error('Error writing context to file: ', err);
}
},

/**
* @returns {import('./context-aware-slog.js').Context | null}
*/
restoreContext: () => {
try {
return JSON.parse(readFileSync(filePath, FILE_ENCODING));
} catch (parseErr) {
console.error('Error reading context from file: ', parseErr);
return null;
}
},
};
};

export default getContextFilePersistenceUtils;
6 changes: 3 additions & 3 deletions packages/telemetry/src/context-aware-slog.js
@@ -49,7 +49,7 @@
* }} Slog
*/

const SLOG_TYPES = {
export const SLOG_TYPES = {
CLIST: 'clist',
CONSOLE: 'console',
COSMIC_SWINGSET: {
@@ -60,8 +60,8 @@ const SLOG_TYPES = {
START: 'cosmic-swingset-bootstrap-block-start',
},
COMMIT: {
FINISH: 'cosmic-swingset-commit-finish',
START: 'cosmic-swingset-commit-start',
FINISH: 'cosmic-swingset-commit-block-finish',
START: 'cosmic-swingset-commit-block-start',
},
END_BLOCK: {
FINISH: 'cosmic-swingset-end-block-finish',
40 changes: 4 additions & 36 deletions packages/telemetry/src/otel-context-aware-slog.js
@@ -1,51 +1,19 @@
/* eslint-env node */

import { logs, SeverityNumber } from '@opentelemetry/api-logs';
import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http';
import { Resource } from '@opentelemetry/resources';
import {
LoggerProvider,
SimpleLogRecordProcessor,
} from '@opentelemetry/sdk-logs';
import { readFileSync, writeFileSync } from 'fs';
import { makeContextualSlogProcessor } from './context-aware-slog.js';
import getContextFilePersistenceUtils, {
DEFAULT_CONTEXT_FILE,
} from './context-aware-slog-persistent-util.js';
import { getResourceAttributes } from './index.js';
import { serializeSlogObj } from './serialize-slog-obj.js';

const DEFAULT_CONTEXT_FILE = 'slog-context.json';
const FILE_ENCODING = 'utf8';

/**
* @param {string} filePath
*/
export const getContextFilePersistenceUtils = filePath => {
console.warn(`Using file ${filePath} for slogger context`);

return {
/**
* @param {import('./context-aware-slog.js').Context} context
*/
persistContext: context => {
try {
writeFileSync(filePath, serializeSlogObj(context), FILE_ENCODING);
} catch (err) {
console.error('Error writing context to file: ', err);
}
},

/**
* @returns {import('./context-aware-slog.js').Context | null}
*/
restoreContext: () => {
try {
return JSON.parse(readFileSync(filePath, FILE_ENCODING));
} catch (parseErr) {
console.error('Error reading context from file: ', parseErr);
return null;
}
},
};
};

/**
* @param {import('./index.js').MakeSlogSenderOptions} options
*/