-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathindex.js
101 lines (90 loc) · 2.64 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
var EventEmitter = require('events').EventEmitter
var collect = require('collect-stream')
var through = require('through2')
var readonly = require('read-only-stream')
var makeView = require('kappa-view')
// The module's sole export is the List view factory defined below.
module.exports = List
/**
 * Build a list-style view on top of `kappa-view`.
 *
 * Every message handed to the view is passed to `mapFn`, which yields zero or
 * more index keys. For each key a record `key -> "<msg.key>@<msg.seq>"` is
 * written to the view's storage, so reading the storage in key order yields
 * pointers back to the original messages.
 *
 * @param {Object} db - storage handle forwarded to `makeView`.
 * @param {function(Object, function(?Error, Array=))} mapFn - called as
 *   `mapFn(msg, cb)`; must call `cb(err)` or `cb(null, keys)`. A missing
 *   `keys` value is treated as an empty list.
 * @param {Object} [opts] - accepted for interface compatibility; not read by
 *   this function.
 * @returns {*} whatever `makeView` returns for the index definition
 *   (`map`, `indexed`, `api`).
 */
function List (db, mapFn, opts) {
  var events = new EventEmitter()
  opts = opts || {}

  return makeView(db, function (ldb) {
    var idx = {
      /**
       * Turn a batch of messages into `put` operations and write them in a
       * single `ldb.batch` call. `next` is invoked exactly once: with the
       * first error reported by `mapFn`, or with the result of the batch.
       */
      map: function (msgs, next) {
        var allOps = []
        // One slot per message plus one for the sentinel `done(null, [])`
        // below, which guarantees completion even when `msgs` is empty.
        var pending = msgs.length + 1
        var finished = false

        // forEach gives every callback its own `msg` binding, so the stored
        // pointer is correct even when `mapFn` answers asynchronously.
        msgs.forEach(function (msg) {
          mapFn(msg, function (err, keys) {
            if (err) return done(err)
            var ops = (keys || []).map(function (key) {
              return {
                type: 'put',
                key: key,
                value: msg.key + '@' + msg.seq
              }
            })
            done(null, ops)
          })
        })
        done(null, [])

        function done (err, ops) {
          // Ignore anything that arrives after `next` has been handed off.
          if (finished) return
          if (err) {
            finished = true
            return next(err)
          }
          allOps.push.apply(allOps, ops)
          if (--pending === 0) {
            finished = true
            ldb.batch(allOps, next)
          }
        }
      },

      /**
       * Emit an `insert` event for every message that `mapFn` accepts
       * without error.
       */
      indexed: function (msgs) {
        msgs.forEach(function (msg) {
          mapFn(msg, function (err) {
            // Messages that fail to map are simply not announced.
            if (err) return
            events.emit('insert', msg)
          })
        })
      },

      api: {
        /**
         * Read the index and resolve each entry to its original message.
         *
         * @param {Object} core - must provide `ready(fn)` and
         *   `_logs.feed(key)`.
         * @param {Object|function} [opts] - passed to
         *   `ldb.createReadStream`; may be omitted, in which case the second
         *   argument is taken as `cb`.
         * @param {function(?Error, Array)} [cb] - if given, receives all
         *   results collected into an array.
         * @returns {*} a read-only stream of `{key, seq, value}` objects
         *   when no callback is given; otherwise undefined.
         */
        read: function (core, opts, cb) {
          if (typeof opts === 'function' && !cb) {
            cb = opts
            opts = {}
          }
          opts = opts || {}

          var t = through.obj(function (entry, _, next) {
            // Stored values have the form "<feed key>@<seq>".
            var parts = entry.value.split('@')
            var feed = core._logs.feed(parts[0])
            var seq = Number(parts[1])
            feed.get(seq, function (err, value) {
              if (err) return next(err)
              next(null, {
                key: feed.key.toString('hex'),
                seq: seq,
                value: value
              })
            })
          })

          core.ready(function () {
            // NOTE(review): `pipe` does not forward 'error' events, so a
            // failure of the underlying read stream never reaches `t`/`cb`.
            ldb.createReadStream(opts).pipe(t)
          })

          if (cb) collect(t, cb)
          else return readonly(t)
        },

        /**
         * Register `cb` to be called with each message announced by
         * `indexed`.
         */
        onInsert: function (core, cb) {
          events.on('insert', cb)
        },

        /**
         * After each insert, read the last `size` index entries; if the
         * inserted message is among them, call `fn` with those entries in
         * ascending index order.
         */
        tail: function (core, size, fn) {
          events.on('insert', function (msg) {
            idx.api.read(core, { limit: size, reverse: true }, function (err, msgs) {
              // `fn` has no error channel; skip this update if the read failed.
              if (err || !msgs) return
              var found = msgs.some(function (m) {
                return m.key === msg.key && m.seq === msg.seq
              })
              if (found) fn(msgs.reverse())
            })
          })
        }
      }
    }
    return idx
  })
}