Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test being added to an old epoch #94

Merged
merged 16 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 143 additions & 123 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { promisify } = require('util')
const pull = require('pull-stream')
const paraMap = require('pull-paramap')
const pullMany = require('pull-many')
const lodashGet = require('lodash.get')
const pullDefer = require('pull-defer')
const chunk = require('lodash.chunk')
const clarify = require('clarify-error')
const {
Expand Down Expand Up @@ -319,137 +319,159 @@ module.exports = {
}

function listMembers(groupId, opts = {}) {
return pull(
pull.values([0]),
pull.asyncMap((n, cb) => {
get(groupId, (err, group) => {
// prettier-ignore
if (err) return cb(clarify(err, 'Failed to get group info when listing members'))
const deferedSource = pullDefer.source()

if (group.excluded) {
return cb(
new Error("We're excluded from this group, can't list members")
)
} else {
const source = pull(
ssb.db.query(
where(
and(
isDecrypted('box2'),
type('group/add-member'),
groupRecp(groupId)
)
),
opts.live ? live({ old: true }) : null,
toPullStream()
),
pull.map((msg) => msg.value.content.recps.slice(1)),
pull.flatten(),
pull.unique()
get(groupId, (err, group) => {
// prettier-ignore
if (err) return deferedSource.abort(clarify(err, 'Failed to get group info when listing members'))
// prettier-ignore
if (group.excluded) return deferedSource.abort( new Error("We're excluded from this group, can't list members"))

const source = pull(
ssb.db.query(
where(
and(
isDecrypted('box2'),
type('group/add-member'),
groupRecp(groupId)
)
return cb(null, source)
}
})
}),
pull.flatten()
)
),
opts.live ? live({ old: true }) : null,
toPullStream()
),
pull.map((msg) => msg.value.content.recps.slice(1)),
pull.flatten(),
pull.unique()
)

deferedSource.resolve(source)
})

return deferedSource
}

function listInvites() {
return pull(
pull.values([0]), // dummy value used to kickstart the stream
pull.asyncMap((n, cb) => {
ssb.metafeeds.findOrCreate((err, myRoot) => {
// prettier-ignore
if (err) return cb(clarify(err, 'Failed to get root metafeed when listing invites'))
const deferedSource = pullDefer.source()

pull(
pullMany([
ssb.box2.listGroupIds(),
ssb.box2.listGroupIds({ excluded: true }),
]),
pull.flatten(),
pull.collect((err, groupIds) => {
// prettier-ignore
if (err) return cb(clarify(err, 'Failed to list group IDs when listing invites'))

const source = pull(
ssb.db.query(
where(and(isDecrypted('box2'), type('group/add-member'))),
toPullStream()
),
pull.filter((msg) =>
// it's an addition of us
lodashGet(msg, 'value.content.recps', []).includes(
myRoot.id
)
),
pull.filter(
(msg) =>
// we haven't already accepted the addition
!groupIds.includes(
lodashGet(msg, 'value.content.recps[0]')
)
),
pull.map((msg) => {
const key = Buffer.from(
lodashGet(msg, 'value.content.groupKey'),
'base64'
)
const scheme = keySchemes.private_group
return {
id: lodashGet(msg, 'value.content.recps[0]'),
writeKey: { key, scheme },
readKeys: [{ key, scheme }],
root: lodashGet(msg, 'value.content.root'),
}
})
)
getMyGroups((err, myGroups) => {
// prettier-ignore
if (err) return deferedSource.abort(clarify(err, 'Failed to list group IDs when listing invites'))

return cb(null, source)
})
)
})
}),
pull.flatten()
)
const source = pull(
// get all the groupIds we've heard of from invites
ssb.db.query(
where(and(isDecrypted('box2'), type('group/add-member'))),
toPullStream()
),
pull.filter((msg) => isAddMember(msg)),
pull.map((msg) => msg.value.content.recps[0]),
pull.unique(),

// drop those we're a part of already
pull.filter((groupId) => !myGroups.has(groupId)),

// gather all the data required for each group-invite
pull.asyncMap(getGroupInviteData)
)

deferedSource.resolve(source)
})

return deferedSource

// listInvites helpers

function getMyGroups(cb) {
const myGroups = new Set()

pull(
pullMany([
ssb.box2.listGroupIds(),
ssb.box2.listGroupIds({ excluded: true }),
]),
pull.flatten(),
pull.drain(
(groupId) => myGroups.add(groupId),
(err) => {
if (err) return cb(err)
return cb(null, myGroups)
}
)
)
}

function getGroupInviteData(groupId, cb) {
let root
const secrets = new Set()

pull(
ssb.db.query(
where(
and(
isDecrypted('box2'),
type('group/add-member'),
groupRecp(groupId)
)
),
toPullStream()
),
pull.filter((msg) => isAddMember(msg)),
pull.drain(
(msg) => {
root ||= msg.value.content.root
secrets.add(msg.value.content.groupKey)
},
(err) => {
if (err) return cb(err)

const readKeys = [...secrets].map((secret) => ({
key: Buffer.from(secret, 'base64'),
scheme: keySchemes.private_group,
}))
const invite = {
id: groupId,
root,
readKeys,
}
return cb(null, invite)
}
)
)
}
}

function acceptInvite(groupId, cb) {
if (cb === undefined) return promisify(acceptInvite)(groupId)

let foundInvite = false
pull(
listInvites(),
pull.filter((groupInfo) => groupInfo.id === groupId),
pull.filter((inviteInfo) => inviteInfo.id === groupId),
pull.take(1),
pull.drain(
(groupInfo) => {
foundInvite = true
ssb.box2.addGroupInfo(
groupInfo.id,
{
key: groupInfo.writeKey.key,
root: groupInfo.root,
},
(err) => {
pull.collect((err, inviteInfos) => {
// prettier-ignore
if (err) return cb(clarify(err, 'Failed to list invites when accepting an invite'))
// prettier-ignore
if (!inviteInfos.length) return cb(new Error("Didn't find invite for that group id"))

// TODO: which writeKey should be picked??
// this will essentially pick a random write key
const { id, root, readKeys } = inviteInfos[0]
pull(
pull.values(readKeys),
pull.asyncMap((readKey, cb) => {
ssb.box2.addGroupInfo(id, { key: readKey.key, root }, cb)
}),
pull.collect((err) => {
// prettier-ignore
if (err) return cb(clarify(err, 'Failed to add group info when accepting an invite'))
ssb.db.reindexEncrypted((err) => {
// prettier-ignore
if (err) return cb(clarify(err, 'Failed to add group info when accepting an invite'))
ssb.db.reindexEncrypted((err) => {
// prettier-ignore
if (err) cb(clarify(err, 'Failed to reindex encrypted messages when accepting an invite'))
else cb(null, groupInfo)
})
}
)
},
(err) => {
// prettier-ignore
if (err) return cb(clarify(err, 'Failed to list invites when accepting an invite'))
// prettier-ignore
if (!foundInvite) return cb(new Error("Didn't find invite for that group id"))
}
)
if (err) cb(clarify(err, 'Failed to reindex encrypted messages when accepting an invite'))
else cb(null, inviteInfos[0])
})
})
)
})
)
}

Expand Down Expand Up @@ -493,7 +515,6 @@ module.exports = {
// look for new epochs that we're added to
pull(
ssb.db.query(
// TODO: does this output new stuff if we accept an invite to an old epoch and then find additions to newer epochs?
Powersource marked this conversation as resolved.
Show resolved Hide resolved
where(and(isDecrypted('box2'), type('group/add-member'))),
live({ old: true }),
toPullStream()
Expand All @@ -507,15 +528,14 @@ module.exports = {
paraMap((msg, cb) => {
pull(
ssb.box2.listGroupIds(),
pull.filter((groupId) =>
groupId === msg.value.content.recps[0]
),
pull.take(1),
pull.collect((err, groupIds) => {
// prettier-ignore
if (err) return cb(clarify(err, "Error getting groups we're already in when looking for new epochs"))

if (groupIds.includes(msg.value.content.recps[0])) {
return cb(null, msg)
} else {
return cb()
}
cb(null, groupIds.length ? msg : null)
})
)
}, 4),
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@
"envelope-spec": "^1.1.1",
"fast-deep-equal": "^3.1.3",
"is-canonical-base64": "^1.1.1",
"jitdb": "^7.0.7",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

had to add this as tests were failing when i pulled in master because of missing jitdb/operators dep. but when installing this i got

% pnpm add jitdb
Packages: +1
+
Progress: resolved 521, reused 521, downloaded 0, added 1, done

dependencies:
+ jitdb 7.0.7

 WARN  Issues with peer dependencies found
.
└─┬ jitdb 7.0.7
  └── ✕ missing peer async-append-only-log@^4.3.2
Peer dependencies that should be installed:
  async-append-only-log@^4.3.2

Done in 5.9s

things seem to work though

@mixmix

"lodash.chunk": "^4.2.0",
"lodash.get": "^4.4.2",
"lodash.set": "^4.3.2",
"private-group-spec": "^6.0.0",
"pull-defer": "^0.2.3",
"pull-paramap": "^1.2.2",
"pull-stream": "^3.7.0",
"set.prototype.difference": "^1.0.2",
Expand Down
Loading