Skip to content

Commit

Permalink
feat: new grpc call for subscribring alerts such as low balance (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsercano committed Nov 9, 2020
1 parent 74a59dc commit c7cc394
Show file tree
Hide file tree
Showing 16 changed files with 1,131 additions and 339 deletions.
41 changes: 41 additions & 0 deletions docs/api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions lib/cli/commands/subscribealerts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { ServiceError, status } from 'grpc';
import { Arguments } from 'yargs';
import { XudClient } from '../../proto/xudrpc_grpc_pb';
import * as xudrpc from '../../proto/xudrpc_pb';
import { setTimeoutPromise } from '../../utils/utils';
import { loadXudClient } from '../command';
import { AlertType } from '../../constants/enums';

export const command = 'subscribealerts';

export const describe = 'subscribe alerts such as low balance';

export const builder = {};

export const handler = async (argv: Arguments) => {
await ensureConnection(argv, true);
};

let client: XudClient;

const ensureConnection = async (argv: Arguments, printError?: boolean) => {
if (!client) {
client = await loadXudClient(argv);
}
client.waitForReady(Date.now() + 3000, (error: Error | null) => {
if (error) {
if (error.message === 'Failed to connect before the deadline') {
console.error(`could not connect to xud at ${argv.rpchost}:${argv.rpcport}, is xud running?`);
process.exit(1);
}

if (printError) console.error(`${error.name}: ${error.message}`);
setTimeout(ensureConnection.bind(undefined, argv, printError), 3000);
} else {
console.log('Successfully connected, subscribing for alerts');
subscribeAlerts(argv);
}
});
};

const subscribeAlerts = (argv: Arguments<any>) => {
const request = new xudrpc.SubscribeAlertsRequest();
const alertsSubscription = client.subscribeAlerts(request);

alertsSubscription.on('data', (alert: xudrpc.Alert) => {
console.log(`${AlertType[alert.getType()]}: ${alert.getMessage()}`);
});
alertsSubscription.on('end', reconnect.bind(undefined, argv));
alertsSubscription.on('error', async (err: ServiceError) => {
if (err.code === status.UNIMPLEMENTED) {
console.error("xud is locked, run 'xucli unlock', 'xucli create', or 'xucli restore' then try again");
process.exit(1);
}
console.warn(`Unexpected error occured: ${err.message}, reconnecting in 1 second`);
await setTimeoutPromise(1000);
await ensureConnection(argv);
});
};

const reconnect = async (argv: Arguments) => {
console.log('Stream has closed, trying to reconnect');
await ensureConnection(argv, false);
};
8 changes: 8 additions & 0 deletions lib/connextclient/ConnextClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ interface ConnextClient {
on(event: 'htlcAccepted', listener: (rHash: string, amount: number, currency: string) => void): this;
on(event: 'connectionVerified', listener: (swapClientInfo: SwapClientInfo) => void): this;
on(event: 'depositConfirmed', listener: (hash: string) => void): this;
on(event: 'noBalance', listener: (currency: string) => void): this;
once(event: 'initialized', listener: () => void): this;
emit(event: 'htlcAccepted', rHash: string, amount: number, currency: string): boolean;
emit(event: 'connectionVerified', swapClientInfo: SwapClientInfo): boolean;
emit(event: 'initialized'): boolean;
emit(event: 'preimage', preimageRequest: ProvidePreimageEvent): void;
emit(event: 'transferReceived', transferReceivedRequest: TransferReceivedEvent): void;
emit(event: 'depositConfirmed', hash: string): void;
emit(event: 'noBalance', currency: string): boolean;
}

/**
Expand Down Expand Up @@ -332,6 +334,12 @@ class ConnextClient extends SwapClient {
channelBalancePromises.push(this.channelBalance(currency));
}
await Promise.all(channelBalancePromises);

this.outboundAmounts.forEach((amount, currency) => {
if (amount === 0) {
this.emit('noBalance', currency);
}
});
} catch (e) {
this.logger.error('failed to update total outbound capacity', e);
}
Expand Down
4 changes: 4 additions & 0 deletions lib/constants/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,7 @@ export enum DisconnectionReason {
AuthFailureInvalidSignature = 12,
WireProtocolErr = 13,
}

export enum AlertType {
NoBalance = 0,
}
18 changes: 18 additions & 0 deletions lib/grpc/GrpcService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,24 @@ class GrpcService {
}
}

/*
* See [[Service.subscribeAlerts]]
*/
public subscribeAlerts: grpc.handleServerStreamingCall<xudrpc.SubscribeAlertsRequest, xudrpc.Alert> = (call) => {
if (!this.isReady(this.service, call)) {
return;
}

const cancelled$ = getCancelled$(call);
this.service.subscribeAlerts((type, message) => {
const alert = new xudrpc.Alert();
alert.setType(type as number);
alert.setMessage(message);
call.write(alert);
},
cancelled$);
}

/*
* See [[Service.subscribeOrders]]
*/
Expand Down
6 changes: 6 additions & 0 deletions lib/lndclient/LndClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ interface LndClient {
on(event: 'channelBackup', listener: (channelBackup: Uint8Array) => void): this;
on(event: 'channelBackupEnd', listener: () => void): this;
on(event: 'locked', listener: () => void): this;
on(event: 'noBalance', listener: (currency: string) => void): this;

once(event: 'initialized', listener: () => void): this;

Expand All @@ -32,6 +33,7 @@ interface LndClient {
emit(event: 'channelBackupEnd'): boolean;
emit(event: 'locked'): boolean;
emit(event: 'initialized'): boolean;
emit(event: 'noBalance', currency: string): boolean;
}

const MAXFEE = 0.03;
Expand Down Expand Up @@ -233,6 +235,10 @@ class LndClient extends SwapClient {
await this.channelBalance().catch(async (err) => {
this.logger.error('failed to update total outbound capacity', err);
});

if (this.maxChannelOutboundAmount === 0) {
this.emit('noBalance', this.currency);
}
}

private unaryCall = <T, U>(methodName: Exclude<keyof LightningClient, ClientMethods>, params: T): Promise<U> => {
Expand Down
2 changes: 1 addition & 1 deletion lib/proto/annotations_grpc_pb.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lib/proto/xudp2p_grpc_pb.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 49 additions & 0 deletions lib/proto/xudrpc.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions lib/proto/xudrpc_grpc_pb.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c7cc394

Please sign in to comment.