diff --git a/redbot/cogs/audio/apis/global_db.py b/redbot/cogs/audio/apis/global_db.py index 8880abce6ba..7e791c84425 100644 --- a/redbot/cogs/audio/apis/global_db.py +++ b/redbot/cogs/audio/apis/global_db.py @@ -51,7 +51,7 @@ async def _get_api_key( self._token = await self.bot.get_shared_api_tokens("audiodb") self.api_key = self._token.get("api_key", None) self.has_api_key = self.cog.global_api_user.get("can_post") - id_list = list(self.bot.owner_ids) + id_list = list(self.bot.all_owner_ids) self._handshake_token = "||".join(map(str, id_list)) return self.api_key diff --git a/redbot/core/bot.py b/redbot/core/bot.py index 1075f5e6c7e..913f8d837d4 100644 --- a/redbot/core/bot.py +++ b/redbot/core/bot.py @@ -9,6 +9,8 @@ import weakref import functools from collections import namedtuple +from contextvars import ContextVar +from copy import copy from datetime import datetime from enum import IntEnum from importlib.machinery import ModuleSpec @@ -20,6 +22,7 @@ Iterable, Dict, NoReturn, + FrozenSet, Set, TypeVar, Callable, @@ -94,6 +97,7 @@ def __init__(self, *args, cli_flags=None, bot_dir: Path = Path.cwd(), **kwargs): self._config = Config.get_core_conf(force_registration=False) self.rpc_enabled = cli_flags.rpc self.rpc_port = cli_flags.rpc_port + self._owner_sudo_tasks: Dict[int, asyncio.Task] = {} self._last_exception = None self._config.register_global( token=None, @@ -106,6 +110,7 @@ def __init__(self, *args, cli_flags=None, bot_dir: Path = Path.cwd(), **kwargs): regional_format=None, embeds=True, color=15158332, + sudotime=15 * 60, # 15 minutes default fuzzy=False, custom_info=None, help__page_char_limit=1000, @@ -185,22 +190,41 @@ async def prefix_manager(bot, message) -> List[str]: if "command_prefix" not in kwargs: kwargs["command_prefix"] = prefix_manager - if "owner_id" in kwargs: - raise RuntimeError("Red doesn't accept owner_id kwarg, use owner_ids instead.") - if "intents" not in kwargs: intents = discord.Intents.all() for intent_name in cli_flags.disable_intent: setattr(intents, intent_name, False) kwargs["intents"] = intents - self._owner_id_overwrite = cli_flags.owner + # This keeps track of owners with elevated privileges in the different contexts. + # This is `None` if sudo functionality is disabled. + self._sudo_ctx_var: Optional[ContextVar] = None + if cli_flags.enable_sudo: + self._sudo_ctx_var = ContextVar("SudoOwners") + + if "owner_id" in kwargs: + raise RuntimeError("Red doesn't accept owner_id kwarg, use owner_ids instead.") + + # This is owner ID overwrite, which when set is used *instead of* owner of a non-team app. + # For teams, it is just appended to the set of all owner IDs. + # This is set to `--owner` *or* if the flag is not passed, to `self._config.owner()` + self._owner_id_overwrite: Optional[int] = cli_flags.owner + # These are IDs of ALL owners, whether they currently have elevated privileges or not + self._all_owner_ids: FrozenSet[int] = frozenset() + # These are IDs of the owners, that currently have their privileges elevated globally. + # If sudo functionality is not enabled, this will remain empty throughout bot's lifetime. + self._elevated_owner_ids: FrozenSet[int] = frozenset() if "owner_ids" in kwargs: - kwargs["owner_ids"] = set(kwargs["owner_ids"]) - else: - kwargs["owner_ids"] = set() - kwargs["owner_ids"].update(cli_flags.co_owner) + self._all_owner_ids = frozenset(kwargs.pop("owner_ids")) + self._all_owner_ids = self._all_owner_ids.union(cli_flags.co_owner) + + # ensure that d.py doesn't run into AttributeError when trying to set `self.owner_ids` + # See documentation of `owner_ids`'s setter for more information. + kwargs["owner_ids"] = self._all_owner_ids + + # to prevent multiple calls to app info during startup + self._app_info = None if "command_not_found" not in kwargs: kwargs["command_not_found"] = "Command {} not found.\n{}" @@ -221,9 +245,9 @@ async def prefix_manager(bot, message) -> List[str]: self._main_dir = bot_dir self._cog_mgr = CogManager() self._use_team_features = cli_flags.use_team_features - # to prevent multiple calls to app info during startup - self._app_info = None + super().__init__(*args, help_command=None, **kwargs) + # Do not manually use the help formatter attribute here, see `send_help_for`, # for a documented API. The internals of this object are still subject to change. self._help_formatter = commands.help.RedHelpFormatter() @@ -235,6 +259,43 @@ async def prefix_manager(bot, message) -> List[str]: self._deletion_requests: MutableMapping[int, asyncio.Lock] = weakref.WeakValueDictionary() + @property + def all_owner_ids(self) -> FrozenSet[int]: + """ + IDs of ALL owners regardless of their elevation status. + + If you're doing privilege checks, use `owner_ids` instead. + This attribute is meant to be used for things + that actually need to get a full list of owners for informational purposes. + + Example + ------- + `send_to_owners()` uses this property to be able to send message to + all bot owners, not just the ones that are currently using elevated permissions. + """ + return self._all_owner_ids + + @property + def owner_ids(self) -> FrozenSet[int]: + """ + IDs of owners that are elevated in current context. + + You should NEVER try to set to this attribute. + + This should be used for any privilege checks. + If sudo functionality is disabled, this will be equivalent to `all_owner_ids`. + """ + if self._sudo_ctx_var is None: + return self._all_owner_ids + return self._sudo_ctx_var.get(self._elevated_owner_ids) + + @owner_ids.setter + def owner_ids(self, value) -> NoReturn: + # this `if` is needed so that d.py's __init__ can "set" to `owner_ids` successfully + if self._sudo_ctx_var is None and self._all_owner_ids is value: + return # type: ignore[misc] + raise AttributeError("can't set attribute") + def set_help_formatter(self, formatter: commands.help.HelpFormatterABC): """ Set's Red's help formatter. @@ -1083,7 +1144,7 @@ async def _pre_login(self) -> None: if self._owner_id_overwrite is None: self._owner_id_overwrite = await self._config.owner() if self._owner_id_overwrite is not None: - self.owner_ids.add(self._owner_id_overwrite) + self._all_owner_ids |= {self._owner_id_overwrite} i18n_locale = await self._config.locale() i18n.set_locale(i18n_locale) @@ -1202,13 +1263,13 @@ async def _pre_fetch_owners(self) -> None: if app_info.team: if self._use_team_features: - self.owner_ids.update(m.id for m in app_info.team.members) + self._all_owner_ids |= {m.id for m in app_info.team.members} elif self._owner_id_overwrite is None: - self.owner_ids.add(app_info.owner.id) + self._all_owner_ids |= {app_info.owner.id} self._app_info = app_info - if not self.owner_ids: + if not self._all_owner_ids: raise _NoOwnerSet("Bot doesn't have any owner set!") async def start(self, *args, **kwargs): @@ -1477,14 +1538,23 @@ async def process_commands(self, message: discord.Message): messages, without the overhead of additional get_context calls per cog. """ - if not message.author.bot: - ctx = await self.get_context(message) - await self.invoke(ctx) - else: - ctx = None + if self._sudo_ctx_var is not None: + # we need to ensure that ctx var is set to actual value + # rather than rely on the default that can change at any moment + token = self._sudo_ctx_var.set(self.owner_ids) + + try: + if not message.author.bot: + ctx = await self.get_context(message) + await self.invoke(ctx) + else: + ctx = None - if ctx is None or ctx.valid is False: - self.dispatch("message_without_command", message) + if ctx is None or ctx.valid is False: + self.dispatch("message_without_command", message) + finally: + if self._sudo_ctx_var is not None: + self._sudo_ctx_var.reset(token) @staticmethod def list_packages(): @@ -1781,7 +1851,7 @@ async def get_owner_notification_destinations(self) -> List[discord.abc.Messagea await self.wait_until_red_ready() destinations = [] opt_outs = await self._config.owner_opt_out_list() - for user_id in self.owner_ids: + for user_id in self.all_owner_ids: if user_id not in opt_outs: user = self.get_user(user_id) if user and not user.bot: # user.bot is possible with flags and teams diff --git a/redbot/core/cli.py b/redbot/core/cli.py index dcc1378ffd6..356d97f369d 100644 --- a/redbot/core/cli.py +++ b/redbot/core/cli.py @@ -284,6 +284,9 @@ def parse_cli_flags(args): help="Enable showing local variables in tracebacks generated by Rich.\n" "Useful for development.", ) + parser.add_argument( + "--enable-sudo", action="store_true", help="Enable the Super User permission mechanics." + ) args = parser.parse_args(args) diff --git a/redbot/core/commands/commands.py b/redbot/core/commands/commands.py index 5de209be315..6de4ac3ee96 100644 --- a/redbot/core/commands/commands.py +++ b/redbot/core/commands/commands.py @@ -1166,3 +1166,22 @@ async def can_run(self, ctx, *args, **kwargs) -> bool: return await ctx.bot._config.datarequests.allow_user_requests() can_see = can_run + + +# This is intentionally left out of `__all__` as it is not intended for general use +class _IsTrueBotOwner(_RuleDropper, Command): + """ + These commands do not respect most forms of checks, and + should only be used with that in mind. + + This particular class is not supported for 3rd party use + """ + + async def can_run(self, ctx, *args, **kwargs) -> bool: + return ( + ctx.bot._sudo_ctx_var is not None + and not ctx.author.bot + and ctx.author.id in ctx.bot.all_owner_ids + ) + + can_see = can_run diff --git a/redbot/core/core_commands.py b/redbot/core/core_commands.py index 59883f17dbc..284041ba080 100644 --- a/redbot/core/core_commands.py +++ b/redbot/core/core_commands.py @@ -7,6 +7,8 @@ import logging import io import random +from copy import copy + import markdown import os import re @@ -38,7 +40,7 @@ ) from ._diagnoser import IssueDiagnoser from .utils import AsyncIter -from .utils._internal_utils import fetch_latest_red_version_info +from .utils._internal_utils import fetch_latest_red_version_info, is_sudo_enabled, timed_unsu from .utils.predicates import MessagePredicate from .utils.chat_formatting import ( box, @@ -2026,6 +2028,13 @@ async def set_showsettings(self, ctx: commands.Context): locale = global_data["locale"] regional_format = global_data["regional_format"] or locale colour = discord.Colour(global_data["color"]) + sudo_settings = ( + _("SU Timeout: {delay}\n").format( + delay=humanize_timedelta(seconds=global_data["sudotime"]) + ) + if ctx.bot._sudo_ctx_var is not None + else "" + ) prefix_string = " ".join(prefixes) settings = _( @@ -2034,7 +2043,8 @@ async def set_showsettings(self, ctx: commands.Context): "{guild_settings}" "Global locale: {locale}\n" "Global regional format: {regional_format}\n" - "Default embed colour: {colour}" + "Default embed colour: {colour}\n" + "{sudo_settings}" ).format( bot_name=ctx.bot.user.name, prefixes=prefix_string, @@ -2042,6 +2052,7 @@ async def set_showsettings(self, ctx: commands.Context): locale=locale, regional_format=regional_format, colour=colour, + sudo_settings=sudo_settings, ) for page in pagify(settings): await ctx.send(box(page)) @@ -4901,6 +4912,75 @@ async def count_ignored(self, ctx: commands.Context): ) return msg + @commands.command( + cls=commands.commands._IsTrueBotOwner, + name="su", + ) + async def su(self, ctx: commands.Context): + """Enable your bot owner privileges. + + SU permission is auto removed after interval set with `[p]set sutimeout` (Default to 15 minutes). + """ + if ctx.author.id not in self.bot.owner_ids: + self.bot._elevated_owner_ids |= {ctx.author.id} + await ctx.send(_("Your bot owner privileges have been enabled.")) + if ctx.author.id in self.bot._owner_sudo_tasks: + self.bot._owner_sudo_tasks[ctx.author.id].cancel() + del self.bot._owner_sudo_tasks[ctx.author.id] + self.bot._owner_sudo_tasks[ctx.author.id] = asyncio.create_task( + timed_unsu(ctx.author.id, self.bot) + ) + return + await ctx.send(_("Your bot owner privileges are already enabled.")) + + @commands.command( + cls=commands.commands._IsTrueBotOwner, + name="unsu", + ) + async def unsu(self, ctx: commands.Context): + """Disable your bot owner privileges.""" + if ctx.author.id in self.bot.owner_ids: + self.bot._elevated_owner_ids -= {ctx.author.id} + await ctx.send(_("Your bot owner privileges have been disabled.")) + return + await ctx.send(_("Your bot owner privileges are not currently enabled.")) + + @commands.command( + cls=commands.commands._IsTrueBotOwner, + name="sudo", + ) + async def sudo(self, ctx: commands.Context, *, command: str): + """Runs the specified command with bot owner permissions + + The prefix must not be entered. + """ + ids = self.bot._elevated_owner_ids.union({ctx.author.id}) + self.bot._sudo_ctx_var.set(ids) + msg = copy(ctx.message) + msg.content = ctx.prefix + command + ctx.bot.dispatch("message", msg) + + @_set.command() + @is_sudo_enabled() + @checks.is_owner() + async def sutimeout( + self, + ctx: commands.Context, + *, + interval: commands.TimedeltaConverter( + minimum=datetime.timedelta(minutes=1), + maximum=datetime.timedelta(days=1), + default_unit="minutes", + ) = datetime.timedelta(minutes=15), + ): + """ + Set the interval for SU permissions to auto expire. + """ + await self.bot._config.sudotime.set(interval.total_seconds()) + await ctx.send( + _("SU timer will expire after: {}.").format(humanize_timedelta(timedelta=interval)) + ) + # Removing this command from forks is a violation of the GPLv3 under which it is licensed. # Otherwise interfering with the ability for this command to be accessible is also a violation. @commands.cooldown(1, 180, lambda msg: (msg.channel.id, msg.author.id)) diff --git a/redbot/core/utils/_internal_utils.py b/redbot/core/utils/_internal_utils.py index 3a7ff1e7725..e03b573ddfd 100644 --- a/redbot/core/utils/_internal_utils.py +++ b/redbot/core/utils/_internal_utils.py @@ -32,6 +32,7 @@ import aiohttp import discord import pkg_resources +from discord.ext.commands import check from fuzzywuzzy import fuzz, process from rich.progress import ProgressColumn from rich.progress_bar import ProgressBar @@ -47,6 +48,7 @@ main_log = logging.getLogger("red") __all__ = ( + "timed_unsu", "safe_delete", "fuzzy_command_search", "format_fuzzy_results", @@ -57,6 +59,7 @@ "fetch_latest_red_version_info", "deprecated_removed", "RichIndefiniteBarColumn", + "is_sudo_enabled", ) _T = TypeVar("_T") @@ -357,3 +360,18 @@ def render(self, task): total=task.total, completed=task.completed, ) + + +def is_sudo_enabled(): + """Deny the command if sudo mechanic is not enabled.""" + + async def predicate(ctx): + return ctx.bot._sudo_ctx_var is not None + + return check(predicate) + + +async def timed_unsu(user_id: int, bot: Red): + await asyncio.sleep(delay=await bot._config.sudotime()) + bot._elevated_owner_ids -= {user_id} + bot._owner_sudo_tasks.pop(user_id, None)