diff --git a/changelog.d/235.feature b/changelog.d/235.feature new file mode 100644 index 00000000..f8b4538b --- /dev/null +++ b/changelog.d/235.feature @@ -0,0 +1 @@ +Suppport puppeted reactions/redactions \ No newline at end of file diff --git a/changelog.d/236.bugfix b/changelog.d/236.bugfix new file mode 100644 index 00000000..48f121d8 --- /dev/null +++ b/changelog.d/236.bugfix @@ -0,0 +1 @@ +Remove option slack_user_token on link command \ No newline at end of file diff --git a/changelog.d/237.bugfix b/changelog.d/237.bugfix new file mode 100644 index 00000000..792faa34 --- /dev/null +++ b/changelog.d/237.bugfix @@ -0,0 +1 @@ +Messages from puppeted accounts are no longer duplicated over the bridge \ No newline at end of file diff --git a/changelog.d/238.bugfix b/changelog.d/238.bugfix new file mode 100644 index 00000000..96f21001 --- /dev/null +++ b/changelog.d/238.bugfix @@ -0,0 +1 @@ +Do not send messages to slack with no content \ No newline at end of file diff --git a/src/AdminCommands.ts b/src/AdminCommands.ts index 0462de80..c6b31023 100644 --- a/src/AdminCommands.ts +++ b/src/AdminCommands.ts @@ -198,9 +198,6 @@ export class AdminCommands { alias: "t", description: "Slack bot user token. Used with Slack bot user & Events api", }, - slack_user_token: { - description: "Slack user token. Used to bridge files", - }, webhook_url: { alias: "u", description: "Slack webhook URL. Used with Slack outgoing hooks integration", diff --git a/src/BridgedRoom.ts b/src/BridgedRoom.ts index 6547aa2c..ad079726 100644 --- a/src/BridgedRoom.ts +++ b/src/BridgedRoom.ts @@ -22,7 +22,7 @@ import { default as substitutions, getFallbackForMissingEmoji, ISlackToMatrixRes import * as emoji from "node-emoji"; import { ISlackMessageEvent, ISlackEvent } from "./BaseSlackHandler"; import { WebClient } from "@slack/web-api"; -import { TeamInfoResponse, AuthTestResponse, UsersInfoResponse, ChatUpdateResponse, +import { ChatUpdateResponse, ChatPostMessageResponse, ConversationsInfoResponse } from "./SlackResponses"; import { RoomEntry, EventEntry, TeamEntry } from "./datastore/Models"; @@ -47,6 +47,9 @@ interface ISlackChatMessagePayload extends ISlackToMatrixResult { icon_url?: string; } +const RECENT_MESSAGE_MAX = 10; +const PUPPET_INCOMING_DELAY_MS = 1500; + export class BridgedRoom { public get isDirty() { return this.dirty; @@ -142,6 +145,7 @@ export class BridgedRoom { private intent: Intent; // Is the matrix room in use by the bridge. public MatrixRoomActive: boolean; + private recentSlackMessages: string[] = []; /** * True if this instance has changed from the version last read/written to the RoomStore. @@ -251,15 +255,24 @@ export class BridgedRoom { emojiKeyName = emojiKeyName.substring(1, emojiKeyName.length - 1); } } + let client: WebClient = this.botClient; + const puppet = await this.main.clientFactory.getClientForUserWithId(this.SlackTeamId!, message.sender); + if (puppet) { + client = puppet.client; + // We must do this before sending to avoid racing + // Use the unicode key for uniqueness + this.addRecentSlackMessage(`reactadd:${relatesTo.key}:${puppet.id}:${event.slackTs}`); + } - // TODO: This only works once from matrix as we are sending the event as the + // TODO: This only works once from matrix if we are sending the event as the // bot user. - const res = await this.botClient.reactions.add({ + const res = await client.reactions.add({ as_user: false, channel: this.slackChannelId, name: emojiKeyName, timestamp: event.slackTs, }); + log.info(`Reaction :${emojiKeyName}: added to ${event.slackTs}`); if (!res.ok) { log.error("HTTP Error: ", res); @@ -280,7 +293,8 @@ export class BridgedRoom { return; } - const res = await this.botClient.chat.delete({ + const client = (await this.main.clientFactory.getClientForUser(this.SlackTeamId!, message.sender)) || this.botClient; + const res = await client.chat.delete({ as_user: false, channel: this.slackChannelId!, ts: event.slackTs, @@ -345,6 +359,11 @@ export class BridgedRoom { username: user.getDisplaynameForRoom(message.room_id) || matrixToSlackResult.username, }; + if (!body.attachments && !body.text) { + // The message type might not be understood. In any case, we can't send something without + // text. + return; + } const reply = await this.findParentReply(message); let parentStoredEvent: EventEntry | null = null; if (reply !== message.event_id) { @@ -388,6 +407,8 @@ export class BridgedRoom { channel: this.slackChannelId!, })) as ChatPostMessageResponse; + this.addRecentSlackMessage(res.ts); + this.main.incCounter(METRIC_SENT_MESSAGES, {side: "remote"}); if (!res.ok) { @@ -414,6 +435,19 @@ export class BridgedRoom { } public async onSlackMessage(message: ISlackMessageEvent, content?: Buffer) { + if (this.slackTeamId && message.user) { + // This just checks if the user *could* be puppeted. If they are, delay handling their incoming messages. + const hasPuppet = null !== await this.main.datastore.getPuppetTokenBySlackId(this.slackTeamId, message.user); + if (hasPuppet) { + await new Promise((r) => setTimeout(r, PUPPET_INCOMING_DELAY_MS)); + } + } + if (this.recentSlackMessages.includes(message.ts)) { + // We sent this, ignore. + return; + } + // Dedupe across RTM/Event streams + this.addRecentSlackMessage(message.ts); try { const ghost = await this.main.getGhostForSlackMessage(message, this.slackTeamId!); await ghost.update(message, this); @@ -429,18 +463,23 @@ export class BridgedRoom { if (message.user_id === this.team!.user_id) { return; } - const ghost = await this.main.getGhostForSlackMessage(message, teamId); - await ghost.update(message, this); const reaction = `:${message.reaction}:`; const reactionKey = emoji.emojify(reaction, getFallbackForMissingEmoji); + if (this.recentSlackMessages.includes(`reactadd:${reactionKey}:${message.user_id}:${message.item.ts}`)) { + // We sent this, ignore. + return; + } + const ghost = await this.main.getGhostForSlackMessage(message, teamId); + await ghost.update(message, this); + const event = await this.main.datastore.getEventBySlackId(message.item.channel, message.item.ts); if (event === null) { return; } - + log.debug(`Sending reaction ${reactionKey} for ${event.eventId} as ${ghost.userId}`); return ghost.sendReaction(this.MatrixRoomId, event.eventId, reactionKey, message.item.channel, message.event_ts); } @@ -743,6 +782,14 @@ export class BridgedRoom { this.intent = this.main.getIntent(firstGhost); return this.intent; } + + private addRecentSlackMessage(ts: string) { + log.debug("Recent message key add:", ts); + this.recentSlackMessages.push(ts); + if (this.recentSlackMessages.length > RECENT_MESSAGE_MAX) { + this.recentSlackMessages.shift(); + } + } } /** diff --git a/src/Main.ts b/src/Main.ts index 8ba4b86e..8c477b74 100644 --- a/src/Main.ts +++ b/src/Main.ts @@ -782,11 +782,10 @@ export class Main { const joinedRooms = await roomListPromise; await Promise.all(entries.map(async (entry) => { // If we aren't in the room, mark as inactive until we get re-invited. - const activeRoom = joinedRooms.includes(entry.matrix_id); + const activeRoom = entry.remote.puppet_owner !== undefined || joinedRooms.includes(entry.matrix_id); if (!activeRoom) { log.warn(`${entry.matrix_id} marked as inactive, bot is not joined to room`); } - const teamId = entry.remote.slack_team_id; const teamEntry = teamId ? await this.datastore.getTeam(teamId) || undefined : undefined; let slackClient: WebClient|null = null; diff --git a/src/SlackClientFactory.ts b/src/SlackClientFactory.ts index 5a86069c..61075ff4 100644 --- a/src/SlackClientFactory.ts +++ b/src/SlackClientFactory.ts @@ -17,8 +17,8 @@ interface RequiredConfigOptions { export class SlackClientFactory { private teamClients: Map = new Map(); - private puppets: Map = new Map(); - constructor(private datastore: Datastore, private config: RequiredConfigOptions, private onRemoteCall: (method: string) => void) { + private puppets: Map = new Map(); + constructor(private datastore: Datastore, private config?: RequiredConfigOptions, private onRemoteCall?: (method: string) => void) { } @@ -119,7 +119,7 @@ export class SlackClientFactory { return teamRes!.id; } - public async getClientForUser(teamId: string, matrixUser: string): Promise { + public async getClientForUserWithId(teamId: string, matrixUser: string): Promise<{client: WebClient, id: string}|null> { const key = `${teamId}:${matrixUser}`; if (this.puppets.has(key)) { return this.puppets.get(key) || null; @@ -129,25 +129,34 @@ export class SlackClientFactory { return null; } const client = new WebClient(token); + let id: string; try { - await client.auth.test(); + const res = (await client.auth.test()) as AuthTestResponse; + id = res.user_id; } catch (ex) { log.warn("Failed to auth puppeted client for user:", ex); return null; } - this.puppets.set(key, client); - return client; + this.puppets.set(key, {id, client}); + return {id, client}; + } + + public async getClientForUser(teamId: string, matrixUser: string): Promise { + const res = await this.getClientForUserWithId(teamId, matrixUser); + return res !== null ? res.client : null; } private async createTeamClient(token: string) { - const opts = this.config.slack_client_opts; + const opts = this.config ? this.config.slack_client_opts : undefined; const slackClient = new WebClient(token, { logger: { setLevel: () => {}, // We don't care about these. setName: () => {}, debug: (msg: string) => { // non-ideal way to detect calls to slack. - const match = /apiCall\('([\w\.]+)'\) start/.exec(msg); + webLog.debug.bind(webLog); + if (!this.onRemoteCall) { return; } + const match = /apiCall\('([\w\.]+)'\) start/.exec(msg[0]); if (match && match[1]) { this.onRemoteCall(match[1]); } diff --git a/src/datastore/Models.ts b/src/datastore/Models.ts index 876ff006..d0ab50af 100644 --- a/src/datastore/Models.ts +++ b/src/datastore/Models.ts @@ -26,7 +26,7 @@ export interface RoomEntry { slack_type?: string; id: string; name: string; - webhook_uri: string; + webhook_uri?: string; slack_private?: boolean; puppet_owner?: string; }; diff --git a/src/scripts/migrateToPostgres.ts b/src/scripts/migrateToPostgres.ts index cfeb67d6..8af79785 100644 --- a/src/scripts/migrateToPostgres.ts +++ b/src/scripts/migrateToPostgres.ts @@ -27,19 +27,25 @@ import { NedbDatastore } from "../datastore/NedbDatastore"; import { PgDatastore } from "../datastore/postgres/PgDatastore"; import { BridgedRoom } from "../BridgedRoom"; import { SlackGhost } from "../SlackGhost"; +import { Datastore, TeamEntry } from "../datastore/Models"; +import { WebClient } from "@slack/web-api"; +import { TeamInfoResponse } from "../SlackResponses"; +import { SlackClientFactory } from "../SlackClientFactory"; Logging.configure({ console: "info" }); const log = Logging.get("script"); +const POSTGRES_URL = process.argv[2]; +const NEDB_DIRECTORY = process.argv[3] || ""; +const USER_PREFIX = process.argv[4] || "slack_"; + async function main() { - const POSTGRES_URL = process.argv[2]; if (!POSTGRES_URL) { log.error("You must specify the postgres url (ex: postgresql://user:pass@host/database"); throw Error(""); } const pgres = new PgDatastore(POSTGRES_URL); await pgres.ensureSchema(); - const NEDB_DIRECTORY = process.argv[3] || ""; const config = { autoload: false, @@ -67,62 +73,135 @@ async function main() { new EventBridgeStore(eventStore), teamStore, ); + try { + const startedAt = Date.now(); + await migrateFromNedb(nedb, pgres); + log.info(`Completed migration in ${Math.round(Date.now() - startedAt)}ms`); + } catch (ex) { + log.error("An error occured while migrating databases:"); + log.error(ex); + log.error("Your existing databases have not been modified, but you may need to drop the postgres table and start over"); + } +} + +export async function migrateFromNedb(nedb: NedbDatastore, targetDs: Datastore) { const allRooms = await nedb.getAllRooms(); const allEvents = await nedb.getAllEvents(); - const allTeams = await nedb.getAllTeams(); + // the format has changed quite a bit. + // tslint:disable-next-line: no-any + const allTeams = (await nedb.getAllTeams()) as any[]; const allSlackUsers = await nedb.getAllUsers(false); const allMatrixUsers = await nedb.getAllUsers(true); + const slackClientFactory = new SlackClientFactory(targetDs); + log.info(`Migrating ${allRooms.length} rooms`); log.info(`Migrating ${allTeams.length} teams`); log.info(`Migrating ${allEvents.length} events`); log.info(`Migrating ${allSlackUsers.length} slack users`); log.info(`Migrating ${allMatrixUsers.length} matrix users`); - const roomMigrations = allRooms.map(async (room, i) => { + + const teamTokenMap: Map = new Map(); // token -> teamId. + let readyTeams: TeamEntry[]; + + const preTeamMigrations = () => Promise.all(allRooms.map(async (room, i) => { + // This is an old format remote + // tslint:disable-next-line: no-any + const at = (room.remote as any).access_token; + if (!at) { + return; + } + try { + const teamId = await slackClientFactory.upsertTeamByToken(at); + log.info("Got team from token:", teamId); + teamTokenMap.set(at, teamId); + } catch (ex) { + log.warn("Failed to get team token for slack token:", ex); + } + })); + + const teamMigrations = () => Promise.all(allTeams.map(async (team, i) => { + if (team.bot_token && !teamTokenMap.has(team.bot_token)) { + let teamId: string; + try { + teamId = await slackClientFactory.upsertTeamByToken(team.bot_token); + } catch (ex) { + log.warn("Team token is not valid:", ex); + return; + } + log.info("Got team from token:", teamId); + teamTokenMap.set(team.bot_token, teamId); + } else { + log.info(`Skipped team (${i + 1}/${allTeams.length})`); + } + log.info(`Migrated team (${i + 1}/${allTeams.length})`); + })); + + const roomMigrations = () => Promise.all(allRooms.map(async (room, i) => { // tslint:disable-next-line: no-any - await pgres.upsertRoom(BridgedRoom.fromEntry(null as any, room)); + const token = (room.remote as any).slack_bot_token; + if (!room.remote.slack_team_id && token) { + room.remote.slack_team_id = teamTokenMap.get(token); + } + await targetDs.upsertRoom(BridgedRoom.fromEntry(null as any, room)); log.info(`Migrated room ${room.id} (${i + 1}/${allRooms.length})`); - }); + })); - const eventMigrations = allEvents.map(async (event, i) => { - await pgres.upsertEvent(event); + const eventMigrations = () => Promise.all(allEvents.map(async (event, i) => { + await targetDs.upsertEvent(event); log.info(`Migrated event ${event.eventId} ${event.slackTs} (${i + 1}/${allEvents.length})`); - }); - - const teamMigrations = allTeams.map(async (team, i) => { - await pgres.upsertTeam(team); - log.info(`Migrated team ${team.id} ${team.name} (${i + 1}/${allTeams.length})`); - }); + })); - const slackUserMigrations = allSlackUsers.map(async (user, i) => { + const slackUserMigrations = () => Promise.all(allSlackUsers.map(async (user, i) => { // tslint:disable-next-line: no-any - const ghost = SlackGhost.fromEntry(null as any, user, null); - await pgres.upsertUser(ghost); + let ghost = SlackGhost.fromEntry(null as any, user, null); + if (!ghost.slackId || !ghost.teamId) { + const localpart = ghost.userId.split(":")[0]; + // XXX: we are making an assumption here that the prefix ends with _ + const parts = localpart.substr(USER_PREFIX.length + 1).split("_"); // Remove any prefix. + // If we encounter more parts than expected, the domain may be underscored + while (parts.length > 2) { + parts[0] = `${parts.shift()}_${parts[0]}`; + } + const existingTeam = readyTeams.find((t) => t.domain === parts[0]); + if (!existingTeam) { + log.warn("No existing team could be found for", ghost.userId); + return; + } + user.slack_id = parts[1]; + user.team_id = existingTeam!.id; + // tslint:disable-next-line: no-any + ghost = SlackGhost.fromEntry(null as any, user, null); + } + await targetDs.upsertUser(ghost); log.info(`Migrated slack user ${user.id} (${i + 1}/${allSlackUsers.length})`); - }); + })); - const matrixUserMigrations = allMatrixUsers.map(async (user, i) => { + const matrixUserMigrations = () => Promise.all(allMatrixUsers.map(async (user, i) => { const mxUser = new MatrixUser(user.id, user); // tslint:disable-next-line: no-any - await pgres.storeMatrixUser(mxUser); + await targetDs.storeMatrixUser(mxUser); log.info(`Migrated matrix user ${mxUser.getId()} (${i + 1}/${allMatrixUsers.length})`); - }); - - try { - await Promise.all( - roomMigrations.concat( - eventMigrations, - teamMigrations, - slackUserMigrations, - matrixUserMigrations, - ), - ); - log.info("Completed migration"); - } catch (ex) { - log.error("An error occured while migrating databases:"); - log.error(ex); - log.error("Your existing databases have not been modified, but you may need to drop the postgres table and start over"); - } + })); + log.info("Starting eventMigrations"); + await eventMigrations(); + log.info("Finished eventMigrations"); + log.info("Starting preTeamMigrations"); + await preTeamMigrations(); + log.info("Finished preTeamMigrations"); + log.info("Starting teamMigrations"); + await teamMigrations(); + log.info("Finished teamMigrations"); + readyTeams = await targetDs.getAllTeams(); + log.info("Starting roomMigrations"); + await roomMigrations(); + log.info("Finished roomMigrations"); + log.info("Starting slackUserMigrations"); + await slackUserMigrations(); + log.info("Finished slackUserMigrations"); + log.info("Starting matrixUserMigrations"); + await matrixUserMigrations(); + log.info("Finished matrixUserMigrations"); } main().then(() => {