forked from senecajs/seneca-transport
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransport.js
126 lines (110 loc) · 3.67 KB
/
transport.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/* Copyright (c) 2013-2015 Richard Rodger & other contributors, MIT License */
'use strict'
// Load modules
var _ = require('lodash')
var LruCache = require('lru-cache')
var Tcp = require('./lib/tcp')
var TransportUtil = require('./lib/transport-utils.js')
var Http = require('./lib/http')
// Declare internals
var internals = {
defaults: {
msgprefix: 'seneca_',
callmax: 111111,
msgidlen: 12,
warn: {
unknown_message_id: true,
invalid_kind: true,
invalid_origin: true,
no_message_id: true,
message_loop: true,
own_message: true
},
check: {
message_loop: true,
own_message: true
},
web: {
type: 'web',
port: 10101,
host: '0.0.0.0',
path: '/act',
protocol: 'http',
timeout: 5555,
max_listen_attempts: 11,
attempt_delay: 222
},
tcp: {
type: 'tcp',
host: '0.0.0.0',
port: 10201,
timeout: 5555
}
},
plugin: 'transport'
}
module.exports = function (options) {
var seneca = this
var settings = seneca.util.deepextend(internals.defaults, options)
var callmap = LruCache(settings.callmax)
var transportUtil = new TransportUtil({
callmap: callmap,
seneca: seneca,
options: settings
})
seneca.add({ role: internals.plugin, cmd: 'inflight' }, internals.inflight(callmap))
seneca.add({ role: internals.plugin, cmd: 'listen' }, internals.listen)
seneca.add({ role: internals.plugin, cmd: 'client' }, internals.client)
seneca.add({ role: internals.plugin, hook: 'listen', type: 'tcp' }, Tcp.listen(settings, transportUtil))
seneca.add({ role: internals.plugin, hook: 'client', type: 'tcp' }, Tcp.client(settings, transportUtil))
seneca.add({ role: internals.plugin, hook: 'listen', type: 'web' }, Http.listen(settings, transportUtil))
seneca.add({ role: internals.plugin, hook: 'client', type: 'web' }, Http.client(settings, transportUtil))
// Aliases.
seneca.add({ role: internals.plugin, hook: 'listen', type: 'http' }, Http.listen(settings, transportUtil))
seneca.add({ role: internals.plugin, hook: 'client', type: 'http' }, Http.client(settings, transportUtil))
// Legacy API.
seneca.add({ role: internals.plugin, hook: 'listen', type: 'direct' }, Http.listen(settings, transportUtil))
seneca.add({ role: internals.plugin, hook: 'client', type: 'direct' }, Http.client(settings, transportUtil))
return {
name: internals.plugin,
exportmap: { utils: transportUtil },
options: settings
}
}
internals.inflight = function (callmap) {
return function (args, callback) {
var inflight = {}
callmap.forEach(function (val, key) {
inflight[key] = val
})
callback(null, inflight)
}
}
internals.listen = function (args, callback) {
var seneca = this
var config = _.extend({}, args.config, { role: internals.plugin, hook: 'listen' })
var listen_args = seneca.util.clean(_.omit(config, 'cmd'))
var legacyError = internals.legacyError(seneca, listen_args.type)
if (legacyError) {
return callback(legacyError)
}
seneca.act(listen_args, callback)
}
internals.client = function (args, callback) {
var seneca = this
var config = _.extend({}, args.config, { role: internals.plugin, hook: 'client' })
var client_args = seneca.util.clean(_.omit(config, 'cmd'))
var legacyError = internals.legacyError(seneca, client_args.type)
if (legacyError) {
return callback(legacyError)
}
seneca.act(client_args, callback)
}
internals.legacyError = function (seneca, type) {
if (type === 'pubsub') {
return seneca.fail('plugin-needed', { name: 'seneca-redis-transport' })
}
if (type === 'queue') {
return seneca.fail('plugin-needed', { name: 'seneca-beanstalkd-transport' })
}
}