diff --git a/.gitignore b/.gitignore index 386d785..0215dd6 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ _deps .vs *.project *.workspace +discord_bot.json +/zerotier diff --git a/discord_bot.json.template b/discord_bot.json.template new file mode 100644 index 0000000..da233f6 --- /dev/null +++ b/discord_bot.json.template @@ -0,0 +1,3 @@ +{ + "token": "" +} \ No newline at end of file diff --git a/discord_bot.py b/discord_bot.py index 90e7b93..86ff0fd 100644 --- a/discord_bot.py +++ b/discord_bot.py @@ -1,34 +1,36 @@ -from typing import List, Dict, Any, Optional -import discord -import json -import time import asyncio +from collections import deque import datetime +import discord +import json +import logging import math import re -import warnings -from typing import Optional +import time +from typing import Any, Deque, Dict, List, Optional -# Constants -DISCORD_CHANNEL_ID = 1061483226767556719 +logger = logging.getLogger(__name__) intents = discord.Intents.default() intents.message_content = True client = discord.Client(intents=intents) -current_online = 0 -global_online_list_message: Optional[discord.Message] = None -global_channel: Optional[discord.TextChannel] = None -gameTTL = 120 # games are marked as active for x seconds every time they show up +channel: Optional[discord.TextChannel] = None +config: Dict[str, Any] = { + 'channel': 1061483226767556719, + 'game_ttl': 120, + 'refresh_seconds': 60, + 'banlist_file': './banlist', + 'gamelist_program': './devilutionx-gamelist' +} def escape_discord_formatting_characters(text: str) -> str: return re.sub(r'([-\\*_#|~:@[\]()<>`])', r'\\\1', text) -def format_game(game: Dict[str, Any]) -> str: - global gameTTL - ended = time.time() - game['last_seen'] >= gameTTL +def format_game_message(game: Dict[str, Any]) -> str: + ended = time.time() - game['last_seen'] >= config['game_ttl'] text = ''; if ended: text += '~~' + game['id'].upper() + '~~' @@ -103,37 +105,10 @@ def format_game(game: Dict[str, Any]) -> str: return text -async def update_status_message() -> None: - global current_online - global global_channel - global global_online_list_message - if global_online_list_message is not None: - try: - await global_online_list_message.delete() - except discord.errors.NotFound: - pass - global_online_list_message = None - text = 'There are currently **' + str(current_online) + '** public games.' +def format_status_message(current_online: int) -> str: if current_online == 1: - text = 'There is currently **' + str(current_online) + '** public game.' - assert isinstance(global_channel, discord.TextChannel) - global_online_list_message = await global_channel.send(text) - - -async def update_game_message(game_id: str) -> None: - global global_channel - text = format_game(game_list[game_id]) - if 'message' in game_list[game_id]: - message = game_list[game_id]['message']; - if isinstance(message, discord.Message): - if message.content != text: - try: - await message.edit(content=text) - except discord.errors.NotFound: - pass - return - assert isinstance(global_channel, discord.TextChannel) - game_list[game_id]['message'] = await global_channel.send(text) + return 'There is currently **' + str(current_online) + '** public game.' + return 'There are currently **' + str(current_online) + '** public games.' def format_time_delta(minutes: int) -> str: @@ -157,30 +132,6 @@ def format_time_delta(minutes: int) -> str: return text -async def end_game_message(game_id: str) -> None: - if 'message' in game_list[game_id]: - message = game_list[game_id]['message']; - if not isinstance(message, discord.Message): - return - try: - await message.edit(content=format_game(game_list[game_id])) - except discord.errors.NotFound: - pass - - -async def remove_game_messages(game_ids: List[str]) -> None: - for gameId in game_ids: - if 'message' in game_list[gameId]: - message = game_list[gameId]['message']; - if not isinstance(message, discord.Message): - continue - try: - await message.delete() - except discord.errors.NotFound: - pass - del game_list[gameId]['message'] - - def any_player_name_is_invalid(players: List[str]) -> bool: for name in players: # using the same restricted character list as DevilutionX, see @@ -199,36 +150,50 @@ def any_player_name_is_invalid(players: List[str]) -> bool: def any_player_name_contains_a_banned_word(players: List[str]) -> bool: - with open('./banlist', 'r') as ban_list_file: - words = set([line.strip().upper() for line in ban_list_file.read().split('\n') if line.strip()]) + if config['banlist_file'] != '': + try: + with open(config['banlist_file'], 'r') as ban_list_file: + words = set([line.strip().upper() for line in ban_list_file.read().split('\n') if line.strip()]) - for name in players: - for word in words: - if word in name.upper(): - return True + for name in players: + for word in words: + if word in name.upper(): + return True + except: + logger.warn('Unable to load banlist file') return False -game_list: Dict[str, Dict[str, Any]] = {} -background_task_running = 0 +async def update_message(message: discord.Message, text: str) -> Optional[discord.Message]: + if message.content != text: + try: + await message.edit(content=text) + except discord.errors.NotFound: + return None + return message + + +async def send_message(text: str) -> discord.Message: + assert isinstance(channel, discord.TextChannel) + return await channel.send(text) async def background_task() -> None: - global gameTTL - global current_online + known_games: Dict[str, Dict[str, Any]] = {} + active_messages: Deque[discord.Message] = deque() + last_refresh = 0.0 - refresh_seconds = 60 # refresh gamelist every x seconds while True: try: - sleep_time = refresh_seconds - (time.time() - last_refresh) + sleep_time = config['refresh_seconds'] - (time.time() - last_refresh) if sleep_time > 0: await asyncio.sleep(sleep_time) last_refresh = time.time() # Call the external program and get the output proc = await asyncio.create_subprocess_shell( - './devilutionx-gamelist', + config['gamelist_program'], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) @@ -244,61 +209,87 @@ async def background_task() -> None: # Load the output as a JSON list games = json.loads(output) - ct = datetime.datetime.now() - print('[' + str(ct) + '] Refreshing game list - ' + str(len(games)) + ' games') + logger.info('Refreshing game list - ' + str(len(games)) + ' games') for game in games: if any_player_name_is_invalid(game['players']) or any_player_name_contains_a_banned_word(game['players']): continue key = game['id'].upper() - if key in game_list: - game_list[key]['players'] = game['players'] - game_list[key]['last_seen'] = time.time() - continue + if key in known_games: + known_games[key]['players'] = game['players'] + else: + known_games[key] = game + known_games[key]['first_seen'] = time.time() - game_list[key] = game - game_list[key]['first_seen'] = time.time() - game_list[key]['last_seen'] = time.time() + known_games[key]['last_seen'] = time.time() - ended_games = [] - for key, game in game_list.items(): - if time.time() - game['last_seen'] < gameTTL: - continue - ended_games.append(key) - await end_game_message(key) + ended_games = [key for key, game in known_games.items() if time.time() - game['last_seen'] >= config['game_ttl']] for key in ended_games: - del game_list[key] - - if len(ended_games) != 0: - await remove_game_messages(list(game_list.keys())) - - for gameId in game_list.keys(): - await update_game_message(gameId) - - if (current_online == len(game_list)) or len(ended_games) != 0: - continue - - current_online = len(game_list) - await update_status_message() - - activity = discord.Activity(name='Games online: '+str(current_online), type=discord.ActivityType.watching) + if active_messages: + await update_message(active_messages.popleft(), format_game_message(known_games[key])) + del known_games[key] + + message_index = 0 + for game in known_games.values(): + message_text = format_game_message(game) + if message_index < len(active_messages): + message = await update_message(active_messages[message_index], message_text) + assert message is not None + else: + message = await send_message(message_text) + assert message is not None + active_messages.append(message) + message_index += 1 + + game_count = len(known_games) + if (len(active_messages) <= game_count): + message = await send_message(format_status_message(game_count)) + assert message is not None + active_messages.append(message) + else: + await update_message(active_messages[game_count], format_status_message(game_count)) + + activity = discord.Activity(name='Games online: '+str(game_count), type=discord.ActivityType.watching) await client.change_presence(activity=activity) except discord.errors.DiscordServerError as server_error: - warnings.warn(repr(server_error)) + logger.warn(repr(server_error)) @client.event async def on_ready() -> None: - print(f'We have logged in as {client.user}') - global global_channel - channel = client.get_channel(DISCORD_CHANNEL_ID) - assert isinstance(channel, discord.TextChannel) - global_channel = channel + logger.info(f'We have logged in as {client.user}') + + maybeChannel = client.get_channel(config['channel']) + assert isinstance(maybeChannel, discord.TextChannel) + + global channel + channel = maybeChannel await background_task() -with open('./discord_bot_token', 'r') as file: - token = file.readline() -client.run(token) +def run(runtimeConfig: Dict[str, Any]) -> None: + assert 'token' in runtimeConfig + + for key, value in runtimeConfig.items(): + config[key] = value + + client.run(config['token']) + + +def main() -> None: + logger.setLevel(logging.INFO) + handler = logging.StreamHandler() + formatter = logging.Formatter('[{asctime}] [{levelname:<8}] {name}: {message}', '%Y-%m-%d %H:%M:%S', style='{') + handler.setFormatter(formatter) + logger.addHandler(handler) + + with open('./discord_bot.json', 'r') as file: + runtimeConfig = json.load(file) + + run(runtimeConfig) + + +if __name__ == '__main__': + main()