Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new grpc call for subscribring alerts such as low balance (#864) #1984

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions docs/api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 83 additions & 0 deletions lib/cli/commands/streamalerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { Arguments, Argv } from 'yargs';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { loadXudClient } from '../command';
import { AlertType, BalanceSide } from '../../constants/enums';
import { onStreamError, waitForClient } from '../utils';

export const command = 'streamalerts';

export const describe = 'stream/subscribe alerts such as low balance';
rsercano marked this conversation as resolved.
Show resolved Hide resolved

export const builder = (argv: Argv) => argv
.option('pretty', {
type: 'boolean',
})
.example('$0 streamalerts -j', 'prints alert payload in a JSON structure')
.example('$0 streamalerts', 'prints alert message only');

export const handler = async (argv: Arguments) => {
await ensureConnection(argv, true);
};

let client: XudClient;

const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}

waitForClient(client, argv, ensureConnection, streamalerts, printError);
};

const structAlertJson = (alertObject: xudrpc.Alert.AsObject) => {
const result: {type: string, payload: {
totalBalance?: number,
side?: string,
bound?: number,
sideBalance?: number,
channelPoint?: string,
currency?: string,
} | undefined } = {
type: AlertType[alertObject.type],
payload: undefined,
};

function getCommonBalanceAlertFields(payload?: xudrpc.ChannelBalanceAlert.AsObject | xudrpc.BalanceAlert.AsObject) {
return {
totalBalance: payload?.totalBalance,
side: BalanceSide[payload?.side || 0],
sideBalance: payload?.sideBalance,
bound: payload?.bound,
currency: payload?.currency,
};
}

if (alertObject.type === xudrpc.Alert.AlertType.LOW_TRADING_BALANCE) {
result.payload = getCommonBalanceAlertFields(alertObject.balanceAlert);
} else if (alertObject.type === xudrpc.Alert.AlertType.LOW_CHANNEL_BALANCE) {
result.payload = { ...getCommonBalanceAlertFields(alertObject.channelBalanceAlert), channelPoint: alertObject.channelBalanceAlert?.channelPoint };
}

return result;
};

const streamalerts = (argv: Arguments<any>) => {
const request = new xudrpc.SubscribeAlertsRequest();
const alertsSubscription = client.subscribeAlerts(request);

alertsSubscription.on('data', (alert: xudrpc.Alert) => {
if (argv.json) {
console.log(JSON.stringify(structAlertJson(alert.toObject()), undefined, 2));
} else {
console.log(`${AlertType[alert.getType()]}: ${alert.getMessage()}`);
}
});
alertsSubscription.on('end', reconnect.bind(undefined, argv));
alertsSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));
};

const reconnect = async (argv: Arguments) => {
console.log('Stream has closed, trying to reconnect');
await ensureConnection(argv, false);
};
27 changes: 3 additions & 24 deletions lib/cli/commands/streamorders.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { ServiceError, status } from 'grpc';
import { Arguments, Argv } from 'yargs';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { setTimeoutPromise } from '../../utils/utils';
import { loadXudClient } from '../command';
import { onStreamError, waitForClient } from '../utils';

export const command = 'streamorders [existing]';

Expand All @@ -26,20 +25,8 @@ const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}
client.waitForReady(Date.now() + 3000, (error: Error | null) => {
if (error) {
if (error.message === 'Failed to connect before the deadline') {
console.error(`could not connect to xud at ${argv.rpchost}:${argv.rpcport}, is xud running?`);
process.exit(1);
}

if (printError) console.error(`${error.name}: ${error.message}`);
setTimeout(ensureConnection.bind(undefined, argv, printError), 3000);
} else {
console.log('Successfully connected, subscribing for orders');
streamOrders(argv);
}
});
waitForClient(client, argv, ensureConnection, streamOrders, printError);
};

const streamOrders = (argv: Arguments<any>) => {
Expand All @@ -57,15 +44,7 @@ const streamOrders = (argv: Arguments<any>) => {
// adding end, close, error events only once,
// since they'll be thrown for three of subscriptions in the corresponding cases, catching once is enough.
ordersSubscription.on('end', reconnect.bind(undefined, argv));
ordersSubscription.on('error', async (err: ServiceError) => {
if (err.code === status.UNIMPLEMENTED) {
console.error("xud is locked, run 'xucli unlock', 'xucli create', or 'xucli restore' then try again");
process.exit(1);
}
console.warn(`Unexpected error occured: ${err.message}, reconnecting in 1 second`);
await setTimeoutPromise(1000);
await ensureConnection(argv);
});
ordersSubscription.on('error', onStreamError.bind(undefined, ensureConnection.bind(undefined, argv)));

const swapsRequest = new xudrpc.SubscribeSwapsRequest();
swapsRequest.setIncludeTaker(true);
Expand Down
31 changes: 31 additions & 0 deletions lib/cli/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import colors from 'colors/safe';
import { accessSync, watch } from 'fs';
import os from 'os';
import path from 'path';
import { XudClient } from '../proto/xudrpc_grpc_pb';
import { Arguments } from 'yargs';
import { ServiceError, status } from 'grpc';
import { setTimeoutPromise } from '../utils/utils';

const SATOSHIS_PER_COIN = 10 ** 8;

Expand Down Expand Up @@ -100,3 +104,30 @@ be recovered with it and must be backed up and recovered separately. Keep it \
somewhere safe, it is your ONLY backup in case of data loss.
`);
}

export const waitForClient = (client: XudClient, argv: Arguments, ensureConnection: Function, successCallback: Function, printError?: boolean) => {
client.waitForReady(Date.now() + 3000, (error: Error | null) => {
if (error) {
if (error.message === 'Failed to connect before the deadline') {
console.error(`could not connect to xud at ${argv.rpchost}:${argv.rpcport}, is xud running?`);
process.exit(1);
}

if (printError) console.error(`${error.name}: ${error.message}`);
setTimeout(ensureConnection.bind(undefined, argv, printError), 3000);
} else {
console.log('Successfully connected, streaming');
successCallback(argv);
}
});
};

export const onStreamError = async (ensureConnection: Function, err: ServiceError) => {
if (err.code === status.UNIMPLEMENTED) {
console.error("xud is locked, run 'xucli unlock', 'xucli create', or 'xucli restore' then try again");
process.exit(1);
}
console.warn(`Unexpected error occured: ${err.message}, reconnecting in 1 second`);
await setTimeoutPromise(1000);
await ensureConnection();
};
Loading