forked from eladnava/fcm-v1-http2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
286 lines (237 loc) · 9.97 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
const async = require('async');
const http2 = require('http2');
const { google } = require('googleapis');
// Define default HTTP/2 multiplexing concurrency (max number of sessions and max number of concurrent streams per session)
let config = {}, fcmv1Api = 'https://fcm.googleapis.com', defaultMaxConcurrentConnections = 10, defaultMaxConcurrentStreamsAllowed = 100;
// Package constructor
function Client(options) {
// Set global client object
config = {
serviceAccount: options.serviceAccount,
maxConcurrentConnections: options.maxConcurrentConnections || defaultMaxConcurrentConnections,
maxConcurrentStreamsAllowed: options.maxConcurrentStreamsAllowed || defaultMaxConcurrentStreamsAllowed
};
// No service account?
if (!config.serviceAccount) {
throw new Error('Please provide the service account JSON configuration file.');
}
}
// Send a notification to multiple devices using HTTP/2 multiplexing
Client.prototype.sendMulticast = function sendMulticast(message, tokens) {
// Promisify method
return new Promise((resolve, reject) => {
// Calculate max devices per batch, and prepare batches array
let batchLimit = Math.ceil(tokens.length / config.maxConcurrentConnections), tokenBatches = [];
// Use just one batch/HTTP2 connection if batch limit is less than maxConcurrentStreamsAllowed
if (batchLimit <= config.maxConcurrentStreamsAllowed) {
batchLimit = config.maxConcurrentStreamsAllowed;
}
// Traverse tokens and split them up into batches of X devices each
for (let start = 0; start < tokens.length; start += batchLimit) {
tokenBatches.push(tokens.slice(start, start + batchLimit));
}
// Keep track of unregistered device tokens
let unregisteredTokens = [];
// Get Firebase project ID from service account credentials
let projectId = config.serviceAccount.project_id;
// Ensure we have a project ID
if (!projectId) {
return reject(new Error('Unable to determine Firebase Project ID from service account file.'));
}
// Get OAuth2 token
getAccessToken(config.serviceAccount).then((accessToken) => {
// Count batches to determine when all notifications have been sent
let done = 0;
// Send notification using HTTP/2 multiplexing
for (let tokenBatch of tokenBatches) {
// Send notification to current token batch
processBatch(message, tokenBatch, projectId, accessToken).then((unregisteredTokensList) => {
// Add unregistred tokens (if any)
if (unregisteredTokensList.length > 0)
unregisteredTokens.push(unregisteredTokensList);
// Done with this batch
done++;
// If all batches processed, resolve final promise with list of unregistred tokens
if (done === tokenBatches.length) {
resolve(unregisteredTokens);
}
}).catch((err) => {
// Reject promise with error
reject(err);
});
}
}).catch((err) => {
// Failed to generate OAuth2 token
// most likely due to invalid credentials provided
reject(err);
});
});
}
// Sends notifications to a batch of tokens using HTTP/2
function processBatch(message, devices, projectId, accessToken) {
// Promisify method
return new Promise((resolve, reject) => {
// Create an HTTP2 client and connect to FCM API
let client = http2.connect(fcmv1Api, {
peerMaxConcurrentStreams: config.maxConcurrentConnections
});
// Log connection errors
client.on('error', (err) => {
// Connection reset?
if (err.message.includes('ECONNRESET')) {
// Log temporary connection errors to console (retry mechanism inside sendRequest will take care of retrying)
return console.error('FCM HTTP2 Error', err);
}
// Throw connection error
reject(err);
});
// Log socket errors
client.on('socketError', (err) => {
reject(err);
});
// Keep track of unregistered device tokens
client.unregisteredTokens = [];
// Use async/eachLimit to iterate over device tokens
async.eachLimit(devices, config.maxConcurrentStreamsAllowed, (device, doneCallback) => {
// Create a HTTP/2 request per device token
sendRequest(client, device, message, projectId, accessToken, doneCallback, 0);
}, (err) => {
// All requests completed, close the HTTP2 client
client.close();
// Reject on error
if (err) {
return reject(err);
}
// Resolve the promise with list of unregistered tokens
resolve(client.unregisteredTokens);
});
});
}
// Sends a single notification over an existing HTTP/2 client
function sendRequest(client, device, message, projectId, accessToken, doneCallback, tries) {
// Create a HTTP/2 request per device token
let request = client.request({
':method': 'POST',
':scheme': 'https',
':path': `/v1/projects/${projectId}/messages:send`,
Authorization: `Bearer ${accessToken}`,
});
// Set encoding as UTF8
request.setEncoding('utf8');
// Clone the message object
let clonedMessage = Object.assign({}, message);
// Assign device token for the message
clonedMessage.token = device;
// Send the request body as stringified JSON
request.write(
JSON.stringify({
// validate_only: true, // Uncomment for dry run
message: clonedMessage
})
);
// Buffer response data
let data = '';
// Add each incoming chunk to response data
request.on('data', (chunk) => {
data += chunk;
});
// Keep track of whether we are already retrying this method invocation
let retrying = false;
// Define error handler
let errorHandler = function (err) {
// Retry up to 3 times
if (tries <= 3) {
// Avoid retrying twice for the same error
if (retrying) {
return;
}
// Keep track of whether we are already retrying in this context
retrying = true;
// If HTTP2 session destroyed, open a new one
if (client.destroyed) {
// Create new HTTP/2 session just for this failed device
return processBatch(message, [device], projectId, accessToken).finally(doneCallback);
}
// Retry request using same HTTP2 session in 10 seconds
return setTimeout(() => { sendRequest.apply(this, args) }, 10 * 1000);
}
// Log response data in error
err.data = data;
// Even if request failed, mark request as completed as we've already retried 3 times
doneCallback(err);
}
// Keep track of called args for retry mechanism
let args = arguments;
// Response received in full
request.on('end', () => {
try {
// Server-side error? (may be returned as HTML, so search text explicitly before try parsing from JSON)
if (data.toLowerCase().includes('server error')) {
return errorHandler(new Error('Internal Server Error'));
}
// Convert response body to JSON object
let response = JSON.parse(data);
// Extract status code from JSON response object
let statusCode = response.statusCode ?? response.status;
// Status code found?
if (statusCode) {
// Server-side error?
if (statusCode >= 500) {
// Retry request using same HTTP2 session in 10 seconds
return errorHandler(new Error(statusCode + 'Internal Server Error'));
}
}
// Error?
if (response.error) {
// App uninstall or invalid token?
if ((response.error.details && response.error.details[0].errorCode === 'UNREGISTERED') ||
(response.error.code === 400 && response.error.status === 'INVALID_ARGUMENT' && response.error.message.includes('not a valid FCM registration token'))) {
// Add to unregistered tokens list
client.unregisteredTokens.push(device);
}
else {
// Call async done callback with error
return doneCallback(response.error);
}
}
// Mark request as completed
doneCallback();
}
catch (err) {
// Invoke error handler with retry mechanism
errorHandler(err);
}
});
// Log request errors
request.on('error', (err) => {
// Invoke error handler with retry mechanism
errorHandler(err);
});
// Increment tries
tries++;
// Send the current request
request.end();
}
// OAuth2 access token generation method
function getAccessToken(serviceAccount) {
return new Promise((resolve, reject) => {
// Create JWT client with Firebase Messaging scope
let jwtClient = new google.auth.JWT(
serviceAccount.client_email,
null,
serviceAccount.private_key,
['https://www.googleapis.com/auth/firebase.messaging'],
null
);
// Request OAuth2 token
jwtClient.authorize((err, tokens) => {
// Reject on error
if (err)
return reject(err);
// Resolve promise with accss token
resolve(tokens.access_token);
});
});
}
// Expose the Client class
module.exports = Client;