-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.js
75 lines (67 loc) · 1.82 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
const Streamz = require('streamz');
const split = require('binary-split');
const DELIMITER = '\n\t\t\t\n\t\n\n\t\t\t\n\t\n';
const highWaterMark = 1;
// We need to explicitly put require on global to make it accessable
// within `new Function(..)`
global.require = require;
global.console.log = console.error;
const out = Streamz(null,{
highWaterMark,
flush: function(cb) {
this.push({ _ClusterStreamMessage: 'end' });
cb();
}
});
let fn;
function initialize(d) {
// Copy any passed arguments into the worker global
global.argv = d.argv;
worker.workerId = d.workerId;
for (let key in d.global)
global[key] = d.global[key];
[].concat(d.require || []).forEach(key => global[key] = require(key));
// Try parsing supplied function or load a module and set up pipe
try {
if (d.fn)
fn = (new Function(`return ${d.fn}`))();
else if (d.module)
fn = require(d.module);
else {
fn = true;
throw new Error('No `fn` or `module` defined');
}
if (d.isMap) {
let map = Streamz(fn,{concurrency:d.concurrency || 1});
map.emitEvent = e => worker.emitEvent(e);
worker.pipe(map).pipe(out);
} else {
fn(worker).pipe(out);
}
} catch(e) {
out.emit('error',e);
}
}
const worker = process.stdin
.pipe(split(DELIMITER),{highWaterMark})
.pipe(Streamz(d => {
d = JSON.parse(d);
return !fn ? initialize(d) : d;
},{highWaterMark}))
.on('error',e => out.emit('error',e));
out
.on('error',e => {
e = {
_ClusterStreamMessage: 'error',
message: e.message || e,
stack: e.stack,
error: true
};
process.stdout.write(JSON.stringify(e)+DELIMITER);
})
.pipe(Streamz(d => JSON.stringify(d)+DELIMITER),{highWaterMark})
.pipe(process.stdout);
worker.emitEvent = d => out.push({
_ClusterStreamMessage: 'event',
data: d
});