Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Monitoring follower #119

Merged
merged 14 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
shell: bash
- uses: actions/setup-go@v4
with:
go-version: '1.20'
go-version: '1.22'
- name: cache Go modules
uses: actions/cache@v3
id: go-cache
Expand Down
1 change: 1 addition & 0 deletions README-extended.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Currently the following options are available:
- `--stage.n.chain-only`: boolean flag specifying if the stage should only run the chain node and not start a client or loadgen. Defaults to `true` for first and last stage. Defaults to `false` for other stages, or if `--stage.n.loadgen.*` is specified.
- `--stage.n.save-storage`: boolean indicating if the storage of the chain node should be saved at the end of the stage. Defaults to `true` for non stage-only stages (where the loadgen runs), as well as for stage 0 (to capture local bootstrap).
- `--stage.n.duration`: the time in minutes for the stage duration. Defaults to the shared duration above for non chain-only stages, or 0 (immediate stop after start) otherwise. Use a negative value to run until interrupted.
- `--acceptance-integration-message-file`: start the chain in follower mode. This flag should hold the path to the file used for two way messaging between the validator node (e.g. an a3p chain running somewhere) and the follower node (this node). In this mode, this node will first use the state sync restoration, then it will catch up to the validator node and write a message `ready` to the message file and finally waits for the `stop` message at which point it exits.

### `start.sh` script

Expand Down
110 changes: 105 additions & 5 deletions runner/lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import { makeTimeSource } from './helpers/time.js';

/** @typedef {import('./helpers/async.js').Task} Task */

const FILE_ENCODING = 'utf-8';

const pipeline = promisify(pipelineCallback);
const finished = promisify(finishedCallback);

Expand Down Expand Up @@ -162,10 +164,19 @@ const makeInterrupterKit = ({ console }) => {
* @returns {Promise<import('./tasks/types.js').SDKBinaries>}
*/
const getSDKBinaries = async () => {
const srcHelpers = 'agoric/src/helpers.js';
const helpersSource = 'src/helpers.js';
const srcHelpers = `agoric/${helpersSource}`;
const libHelpers = 'agoric/lib/helpers.js';
try {
const cliHelpers = await import(srcHelpers).catch(() => import(libHelpers));
const cliHelpers = await import(srcHelpers)
.catch(() => import(libHelpers))
.catch((e) =>
process.env.SDK_SRC
? import(
`${process.env.SDK_SRC}/packages/agoric-cli/${helpersSource}`
)
: Promise.reject(e),
);
Comment on lines +171 to +179
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because SDK_SRC is option someone has to enable, I would expect it to take precedence.

also I would suggest refactoring to,

Suggested change
const cliHelpers = await import(srcHelpers)
.catch(() => import(libHelpers))
.catch((e) =>
process.env.SDK_SRC
? import(
`${process.env.SDK_SRC}/packages/agoric-cli/${helpersSource}`
)
: Promise.reject(e),
);
const cliHelpers = await getCliHelpers();

It's a shame we have to deep import instead of having real exports of that package, but even if we added good exports we'd still need backwards compatibility so it would only add another case to this code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the past I've thought of adding a command to the agoric-cli to return these path. However I don't think it would help in this case since agoric would not be on the PATH. I'm open to suggestion on how to find entrypoint paths in agoric-sdk

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because SDK_SRC is option someone has to enable, I would expect it to take precedence.

Yeah you're right. I originally implemented it as a fallback, but it should take precedence.

return cliHelpers.getSDKBinaries();
} catch (err) {
// Older SDKs were only at lib
Expand Down Expand Up @@ -218,9 +229,14 @@ const main = async (progName, rawArgs, powers) => {
'duplicate-arguments-array': false,
'flatten-duplicate-arrays': false,
'greedy-arrays': true,
'strip-dashed': true,
},
});

/** @type {string | undefined} */
const acceptanceIntegrationMessageFile =
argv.acceptanceIntegrationMessageFile;

const { getProcessInfo, getCPUTimeOffset } = makeProcfsHelper({ fs, spawn });
const { dirDiskUsage, makeFIFO } = makeFsHelper({
fs,
Expand Down Expand Up @@ -268,6 +284,49 @@ const main = async (progName, rawArgs, powers) => {
};
};

/**
* @param {ReturnType<typeof makeConsole>['console']} console
* @param {RegExp} [regex]
*/
const waitForMessageFromMessageFile = async (console, regex) => {
if (!acceptanceIntegrationMessageFile)
throw Error("acceptance-integration-message-file flag wasn't set");

console.log(
`Starting to wait for message of format "${regex}" from file "${acceptanceIntegrationMessageFile}"`,
);

for await (const { eventType } of fs.watch(
acceptanceIntegrationMessageFile,
))
if (eventType === 'change') {
const fileContent = (
await fs.readFile(acceptanceIntegrationMessageFile, FILE_ENCODING)
).trim();
if (regex && !regex.test(fileContent))
console.warn('Ignoring unsupported file content: ', fileContent);
else return fileContent;
Comment on lines +306 to +308
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: curly braces.

}

return undefined;
};

/**
* @param {ReturnType<typeof makeConsole>['console']} console
* @param {string} message
*/
const writeMessageToMessageFile = (console, message) => {
if (!acceptanceIntegrationMessageFile)
throw Error("acceptance-integration-message-file flag wasn't set");

console.log(
`Writing message "${message}" to file "${acceptanceIntegrationMessageFile}"`,
);
return fs.writeFile(acceptanceIntegrationMessageFile, message, {
encoding: FILE_ENCODING,
});
};

const { console: topConsole } = makeConsole();

const outputDir = resolvePath(argv.outputDir || `results/run-${Date.now()}`);
Expand Down Expand Up @@ -344,6 +403,14 @@ const main = async (progName, rawArgs, powers) => {
import.meta.url,
),
)
.catch((e) =>
process.env.SDK_SRC
? importMetaResolve(
`${process.env.SDK_SRC}/packages/vm-config/${identifier}`,
import.meta.url,
)
: Promise.reject(e),
)
.catch(() => importMetaResolve(identifier, import.meta.url))
.catch(() => {})),
]),
Expand Down Expand Up @@ -472,6 +539,7 @@ const main = async (progName, rawArgs, powers) => {
* @param {boolean | undefined} [config.loadgenWindDown]
* @param {boolean} config.withMonitor
* @param {string | void} config.chainStorageLocation
* @param {string} [config.acceptanceIntegrationMessageFile]
*/
const runStage = async (config) => {
const {
Expand All @@ -482,6 +550,7 @@ const main = async (progName, rawArgs, powers) => {
loadgenWindDown,
withMonitor,
chainStorageLocation,
acceptanceIntegrationMessageFile: messageFilePath,
} = config;
currentStageTimeSource = timeSource.shift();

Expand All @@ -501,6 +570,37 @@ const main = async (progName, rawArgs, powers) => {
stageConfig: config,
});

/**
* @type {Task}
*
* This task should always run in three stages
*
* The first stage will wait for the state sync restore
*
* The second stage will wait for this follower to catch
* up and then signal to the tests runner that the
* follower has caught up to the chain (this catching up will
* be covered by the `spawnChain` task) and exit
*
* The third stage will keep running the follower and wait
* for a stop signal from the test runner at which point it
* will stop the chain process and exit
*/
const spwanAcceptance = async (nextStep) => {
const { console: acceptanceConsole } = makeConsole(
'acceptance',
out,
err,
);

if (currentStage === 1)
await writeMessageToMessageFile(acceptanceConsole, 'ready');
else if (currentStage === 2)
await waitForMessageFromMessageFile(acceptanceConsole, /stop/);

await nextStep(Promise.resolve());
};

/** @type {Task} */
const spawnChain = async (nextStep) => {
stageConsole.log('Running chain');
Expand Down Expand Up @@ -914,9 +1014,8 @@ const main = async (progName, rawArgs, powers) => {
tasks.push(spawnChain);
}

if (!chainOnly) {
tasks.push(spawnClient, spawnLoadgen);
}
if (!chainOnly) tasks.push(spawnClient, spawnLoadgen);
else if (messageFilePath) tasks.push(spwanAcceptance);

if (tasks.length === 1) {
throw new Error('Nothing to do');
Expand Down Expand Up @@ -1162,6 +1261,7 @@ const main = async (progName, rawArgs, powers) => {
loadgenWindDown,
withMonitor,
chainStorageLocation,
acceptanceIntegrationMessageFile,
}),
async (...stageError) => {
const suffix = `-stage-${currentStage}`;
Expand Down
26 changes: 10 additions & 16 deletions runner/lib/tasks/testnet.js
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,12 @@ export const makeTasks = ({
const configPath = joinPath(chainStateDir, 'config', 'config.toml');

console.log('Patching config');
const config = await TOML.parse.async(
await fs.readFile(configPath, 'utf-8'),
const config = /** @type {import("./types.js").CometBFTConfig} */ (
await TOML.parse.async(await fs.readFile(configPath, 'utf-8'))
);
const configP2p = /** @type {import('@iarna/toml').JsonMap} */ (
config.p2p
);
configP2p.persistent_peers = peers.join(',');
configP2p.seeds = seeds.join(',');
configP2p.addr_book_strict = false;
delete config.log_level;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this line removed?

config.p2p.persistent_peers = peers.join(',');
config.p2p.seeds = seeds.join(',');
config.p2p.addr_book_strict = false;

if (!useStateSync) {
console.log('Fetching genesis');
Expand Down Expand Up @@ -286,15 +282,13 @@ export const makeTasks = ({
const stateSyncRpc =
rpcAddrs.length < 2 ? [rpcAddrs[0], rpcAddrs[0]] : rpcAddrs;

const configStatesync = /** @type {import('@iarna/toml').JsonMap} */ (
config.statesync
);
configStatesync.enable = true;
configStatesync.rpc_servers = stateSyncRpc
config.statesync.enable = true;
config.statesync.rpc_servers = stateSyncRpc
.map((rpcAddr) => rpcAddrWithScheme(rpcAddr))
.join(',');
configStatesync.trust_height = trustHeight;
configStatesync.trust_hash = trustHash;
config.statesync.trust_height = trustHeight;
config.statesync.trust_hash = trustHash;
config.statesync.trust_period = '17280h0m0s'; // 2 years
}

await fs.writeFile(configPath, TOML.stringify(config));
Expand Down
120 changes: 120 additions & 0 deletions runner/lib/tasks/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,123 @@ export interface OrchestratorTasks {
runClient(options: TaskSwingSetOptions): Promise<RunClientResult>;
runLoadgen(options: TaskBaseOptions): Promise<RunLoadgenResult>;
}

/* eslint-disable camelcase */
export type CometBFTConfig = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, where is this coming from?

proxy_app: string;
moniker: string;
fast_sync: boolean;
db_backend: string;
db_dir: string;
log_format: string;
genesis_file: string;
priv_validator_key_file: string;
priv_validator_state_file: string;
priv_validator_laddr: string;
node_key_file: string;
abci: string;
filter_peers: boolean;
rpc: {
laddr: string;
cors_allowed_origins: string[];
cors_allowed_methods: string[];
cors_allowed_headers: string[];
grpc_laddr: string;
grpc_max_open_connections: number;
unsafe: boolean;
max_open_connections: number;
max_subscription_clients: number;
max_subscriptions_per_client: number;
experimental_subscription_buffer_size: number;
experimental_websocket_write_buffer_size: number;
experimental_close_on_slow_client: boolean;
timeout_broadcast_tx_commit: string;
max_body_bytes: number;
max_header_bytes: number;
tls_cert_file: string;
tls_key_file: string;
pprof_laddr: string;
};
p2p: {
laddr: string;
external_address: string;
seeds: string;
persistent_peers: string;
upnp: boolean;
addr_book_file: string;
addr_book_strict: boolean;
max_num_inbound_peers: number;
max_num_outbound_peers: number;
unconditional_peer_ids: string;
persistent_peers_max_dial_period: string;
flush_throttle_timeout: string;
max_packet_msg_payload_size: number;
send_rate: number;
recv_rate: number;
pex: boolean;
seed_mode: boolean;
private_peer_ids: string;
allow_duplicate_ip: boolean;
handshake_timeout: string;
dial_timeout: string;
};
mempool: {
version: string;
recheck: boolean;
broadcast: boolean;
wal_dir: string;
size: number;
max_txs_bytes: number;
cache_size: number;
keep_invalid_txs_in_cache: boolean;
max_tx_bytes: number;
max_batch_bytes: number;
ttl_duration: string;
ttl_num_blocks: number;
};
statesync: {
enable: boolean;
rpc_servers: string;
trust_height: number;
trust_hash: string;
trust_period: string;
discovery_time: string;
temp_dir: string;
chunk_request_timeout: string;
chunk_fetchers: string;
};
fastsync: {
version: string;
};
consensus: {
wal_file: string;
timeout_propose: string;
timeout_propose_delta: string;
timeout_prevote: string;
timeout_prevote_delta: string;
timeout_precommit: string;
timeout_precommit_delta: string;
timeout_commit: string;
double_sign_check_height: number;
skip_timeout_commit: boolean;
create_empty_blocks: boolean;
create_empty_blocks_interval: string;
peer_gossip_sleep_duration: string;
peer_query_maj23_sleep_duration: string;
};
storage: {
discard_abci_responses: boolean;
};
tx_index: {
indexer: string;
psql_conn: string;
index_all_keys: boolean;
};
instrumentation: {
prometheus: boolean;
prometheus_listen_addr: string;
max_open_connections: number;
namespace: string;
};
};
/* eslint-enable camelcase */
6 changes: 3 additions & 3 deletions runner/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@
"typescript": "~4.7.4"
},
"dependencies": {
"@endo/init": "^0.5.53",
"@endo/promise-kit": "^0.2.53",
"@endo/init": "^1.1.8",
"@endo/promise-kit": "^1.1.9",
"@iarna/toml": "^2.2.3",
"anylogger": "^0.21.0",
"chalk": "^2.4.2",
"deterministic-json": "^1.0.5",
"inquirer": "^6.3.1",
"readline-transform": "^1.0.0",
"ses": "^0.15.1",
"ses": "^1.11.0",
"yargs-parser": "^20.2.2"
},
"keywords": [],
Expand Down
Loading
Loading