from __future__ import annotations from enum import Enum import logging import operator from pathlib import Path import re import time import tomllib from typing import Any, TYPE_CHECKING import traceback from .config import Config from .discord_manager import DiscordManager from .logger import create_logger from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite, OverwriteType, Permissions, Role, TextChannel) from .youtube_manager import YoutubeManager from .youtube_subscription import SUBSCRIPTION_FILE_COLUMNS, SubscriptionHelper, SubscriptionInfo, Subscriptions if TYPE_CHECKING: from breadtube_bot.youtube_objects import SearchResultItem class Bot: DEFAULT_MESSAGE_LIST_LIMIT = 50 DISCORD_NAME_REGEX = r'([^a-z])' INIT_MESSAGE = ('Bot initialized.\nThis is the current configuration used.\n' 'You can upload a new one to update the configuration.') MAX_DOWNLOAD_SIZE: int = 50_000 SUBS_LIST_MIN_SIZE = 50 SUBS_LIST_SHORTS_RATIO = 5 SUBS_LIST_VIDEO_RATIO = 2 class Task(Enum): DELETE_MESSAGES = 1 SCAN_BOT_CHANNEL = 2 INIT_SUBS = 3 @staticmethod def _get_code_version() -> str: pyproject_path = Path(__file__).parents[1] / 'pyproject.toml' if not pyproject_path.exists(): raise RuntimeError('Cannot current bot version') return tomllib.loads(pyproject_path.read_text(encoding='utf-8'))['project']['version'] def __init__(self, bot_token: str, guild_id: int, yt_api_key: str, config: Config | None = None, log_level: int = logging.INFO): self.config: Config = config or Config() self.guild_id = guild_id self.logger = create_logger('breadtube', log_level, stdout=True) self.version = self._get_code_version() self.discord_manager = DiscordManager(bot_token=bot_token, bot_version=self.version, logger=self.logger) self.tasks: list[tuple[Bot.Task, float, Any]] = [] self.logger.info('Retrieving bot user') self.bot_user = self.discord_manager.get_current_user(request_timeout=self.config.request_timeout) self.logger.info('Retrieving guild roles before init') self.guild_roles: list[Role] = self.discord_manager.list_roles( self.guild_id, request_timeout=self.config.request_timeout) bot_role: Role | None = None everyone_role: Role | None = None for role in self.guild_roles: if role.name == self.config.bot_role: bot_role = role elif role.name == '@everyone': everyone_role = role if bot_role is None: raise RuntimeError('No BreadTube role found') if everyone_role is None: raise RuntimeError('No everyone role found') self.bot_role: Role = bot_role self.everyone_role: Role = everyone_role categories, text_channel = self.discord_manager.list_channels( self.guild_id, request_timeout=self.config.request_timeout) self.guild_text_channels: list[TextChannel] = text_channel self.guild_categories: list[ChannelCategory] = categories self.init_message: Message | None = None bot_channel: TextChannel | None = None for _ in range(self.config.bot_channel_init_retries): bot_channel = self.init_bot_channel() if bot_channel is not None: break time.sleep(5) if bot_channel is None: raise RuntimeError("Couldn't initialize bot channel/role/permission") self.bot_channel: TextChannel = bot_channel self._yt_subscriptions: Subscriptions = {} self._scan_bot_channel() self.tasks.append(( self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger) self.logger.info('Bot initialized') def init_bot_channel(self) -> TextChannel | None: for channel in self.guild_text_channels: if channel.name == self.config.bot_channel: self.logger.info('Found breadtube bot channel') for perm in channel.permission_overwrites: if perm.id == self.bot_role.id: if not perm.allow | Permissions.VIEW_CHANNEL: self.logger.info('BreadTube bot cannot view BreadTube channel: permission missing') return None self.logger.info('BreadTube channel permission OK') break return channel self.logger.info('Creating breadtube bot channel') return self.discord_manager.create_text_channel( self.guild_id, { 'name': self.config.bot_channel, 'permission_overwrites': [ Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, deny=Permissions.VIEW_CHANNEL), Overwrite(self.bot_role.id, OverwriteType.ROLE, allow=Permissions.VIEW_CHANNEL, deny=Permissions.NONE)]}, request_timeout=self.config.request_timeout) def _get_all_channel_messages(self, channel: TextChannel) -> list[Message]: messages_id_delete_task: set[int] = set() for task_type, _, task_params in self.tasks: if task_type == self.Task.DELETE_MESSAGES: messages_id_delete_task.update(message.id for message in task_params) last_message_id: int | None = None messages: list[Message] = [] while True: message_batch = self.discord_manager.list_text_channel_messages( channel, request_timeout=self.config.request_timeout, after_id=last_message_id) messages.extend([m for m in message_batch if m.id not in messages_id_delete_task]) if len(message_batch) < self.DEFAULT_MESSAGE_LIST_LIMIT: break last_message_id = message_batch[-1].id return messages def _scan_bot_channel(self): # noqa: PLR0915 messages = self._get_all_channel_messages(self.bot_channel) init_message_found = False new_config: Config | None = None new_subscriptions: Subscriptions | None = None delayed_delete: dict[int, Message] = {} immediate_delete: dict[int, Message] = {} for message in messages: if init_message_found: self.logger.debug('Marking message for immediate deletion (init found): %s', message) immediate_delete[message.id] = message continue if len(message.attachments) <= 0: self.logger.debug('Marking message for immediate deletion (no attachment): %s', message) immediate_delete[message.id] = message continue if message.author.id == self.bot_user.id: self.logger.debug('Found init message') # If same init message: nothing to do if self.init_message is not None and message.id == self.init_message.id: continue # Loading init message content has_error = False for attachment in message.attachments: try: _, content = self.discord_manager.download_attachment( attachment, request_timeout=self.config.request_timeout) if new_config is None and content.startswith(b'config'): try: self.config = Config.from_str(content.decode()) except RuntimeError as error: self.logger.error('Cannot load config from init message: %s', error) has_error = True if new_subscriptions is None and content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]): try: subscriptions = SubscriptionHelper.read_text(content) if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()): SubscriptionHelper.update_subscriptions( new=subscriptions, previous=self._yt_subscriptions) self._yt_subscriptions = subscriptions self.tasks.append((Bot.Task.INIT_SUBS, time.time() + 1, None)) except RuntimeError as error: self.logger.error('Invalid init subscriptions file: %s', error) has_error = True except Exception as error: self.logger.error('Error downloading attachment: %s', error) has_error = True if not has_error: self.init_message = message init_message_found = True continue self.logger.debug('Reading attachment') attachment = message.attachments[0] if attachment.size > self.MAX_DOWNLOAD_SIZE: self.logger.debug('Marking message for immediate deletion (attachment too big): %s', message) immediate_delete[message.id] = message continue try: _, content = self.discord_manager.download_attachment( attachment, request_timeout=self.config.request_timeout) if content.startswith(b'config') and new_config is None: try: config = Config.from_str(content.decode()) if config != self.config: # New config to update to new_config = config self.logger.debug('Marking new config message for immediate deletion: %s', message) immediate_delete[message.id] = message continue except RuntimeError as error: self.logger.info('Invalid config file: %s', error) bot_message = self.discord_manager.create_message(self.bot_channel, { 'content': str(error), 'message_reference': MessageReference( type=MessageReferenceType.DEFAULT, message_id=message.id, channel_id=self.bot_channel.id, guild_id=None, fail_if_not_exists=None)}, request_timeout=self.config.request_timeout) delayed_delete[bot_message.id] = bot_message delayed_delete[message.id] = message continue if content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]): try: subscriptions = SubscriptionHelper.read_text(content) if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()): new_subscriptions = subscriptions self.logger.debug('Marking new subscriptions message for immediate deletion: %s', message) immediate_delete[message.id] = message continue except RuntimeError as error: self.logger.info('Invalid subscriptions file: %s', error) bot_message = self.discord_manager.create_message(self.bot_channel, { 'content': str(error), 'message_reference': MessageReference( type=MessageReferenceType.DEFAULT, message_id=message.id, channel_id=self.bot_channel.id, guild_id=None, fail_if_not_exists=None)}, request_timeout=self.config.request_timeout) delayed_delete[bot_message.id] = bot_message delayed_delete[message.id] = message continue except Exception as error: self.logger.error('Error downloading attachment: %s', error) if new_config is not None: self.logger.info('Loading config: %s', new_config) self.config = new_config if new_subscriptions is not None: self.logger.info('Loading subscriptions') SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions) self._yt_subscriptions = new_subscriptions self.tasks.append((Bot.Task.INIT_SUBS, time.time() + 1, None)) # New init message is needed, previous need to be deleted if (new_config is not None or new_subscriptions is not None) and self.init_message is not None: immediate_delete[self.init_message.id] = self.init_message self.init_message = None # Init message absent or deleted if self.init_message is None: assert self.config is not None self.init_message = self.discord_manager.create_message( self.bot_channel, {'content': self.INIT_MESSAGE}, request_timeout=self.config.request_timeout, upload_files=[ ('config.txt', FileMime.TEXT_PLAIN, self.config.to_str().encode()), ('subscriptions.csv', FileMime.TEXT_CSV, SubscriptionHelper.generate_text(self._yt_subscriptions)) ]) for message in immediate_delete.values(): try: self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) except RuntimeError as error: self.logger.error('Error deleting after bot channel scan (immediate): %s', error) if delayed_delete: self.tasks.append(( Bot.Task.DELETE_MESSAGES, time.time() + self.config.bot_message_duration, list(delayed_delete.values()))) def _get_subscription_channel(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel], category_ranges: list[tuple[int, int, ChannelCategory]]) -> TextChannel: discord_name = re.sub(self.DISCORD_NAME_REGEX, '-', subscription.name.lower()) category_value = ord(discord_name[0]) sub_channel: TextChannel | None = channel_dict.get(discord_name) if sub_channel is None: selected_category: ChannelCategory | None = None for start_range, stop_range, category in category_ranges: if start_range <= category_value <= stop_range: selected_category = category break if selected_category is None: selected_category = category_ranges[-1][2] sub_channel = self.discord_manager.create_text_channel( self.guild_id, { 'name': discord_name, 'parent_id': selected_category.id, 'permission_overwrites': [ Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, deny=Permissions.SEND_MESSAGES), Overwrite(self.bot_role.id, OverwriteType.ROLE, allow=Permissions.VIEW_CHANNEL | Permissions.SEND_MESSAGES, deny=Permissions.NONE)]}, request_timeout=self.config.request_timeout) return sub_channel def _refresh_subscription(self, subscription: SubscriptionInfo): _, yt_video_info = self.yt_manager.request_channel_videos( channel_id=subscription.channel_id, max_results=self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count, request_timeout=self.config.request_timeout) video_ids = {v.id.videoId for v in subscription.shorts_list + subscription.video_list} for yt_info in yt_video_info.items: if yt_info.id.videoId in video_ids: continue if self.yt_manager.is_shorts(yt_info.id.videoId, request_timeout=self.config.request_timeout): subscription.shorts_list.append(yt_info) else: subscription.video_list.append(yt_info) video_ids.add(yt_info.id.videoId) internal_size = min(self.SUBS_LIST_MIN_SIZE, self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count) subscription.shorts_list = sorted( subscription.shorts_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size] internal_size = min(self.SUBS_LIST_MIN_SIZE, self.SUBS_LIST_VIDEO_RATIO * self.config.youtube_channel_video_count) subscription.video_list = sorted( subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size] subscription.last_update = time.time() @staticmethod def _video_message_content(video: SearchResultItem) -> str: return f'https://www.youtube.com/video/{video.id.videoId}' def _init_subs(self): self.logger.info('Initialize all subs') categories, text_channel = self.discord_manager.list_channels( self.guild_id, request_timeout=self.config.request_timeout) self.guild_text_channels = text_channel self.guild_categories = categories channel_dict: dict[str, TextChannel] = {c.name or '': c for c in self.guild_text_channels} unmanaged_categories: set[str] = set(self.config.unmanaged_categories.split(',')) category_ranges: list[tuple[int, int, ChannelCategory]] = [] for category in self.guild_categories: if category.name in unmanaged_categories: self.logger.debug('Skipping unmanaged category: %s', category.name) continue range_info = (category.name or '').split('-') if len(range_info) != 2: # noqa: PLR2004 self.logger.warning('Cannot compute range for category: %s', category.name) continue category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category)) category_ranges = sorted(category_ranges, key=operator.itemgetter(0)) for sub_info in self._yt_subscriptions.values(): try: sub_channel = self._get_subscription_channel(sub_info, channel_dict, category_ranges) except RuntimeError as error: self.logger.error(error) continue if sub_info.channel_info is None: _, channel_info = self.yt_manager.request_channel_info( sub_info.channel_id, request_timeout=self.config.request_timeout) if not channel_info.items: raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name) sub_info.channel_info = channel_info.items[0].snippet self._refresh_subscription(sub_info) sub_init_message = f'https://www.youtube.com/{sub_info.channel_info.customUrl}' sub_messages = self._get_all_channel_messages(sub_channel) if not sub_messages or sub_messages[-1].content != sub_init_message: self.logger.debug('Clearing sub channel: %s', sub_channel.name) for message in sub_messages: self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) _ = self.discord_manager.create_message( sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout) else: messages = list(reversed(sub_messages[:-1][:self.config.youtube_channel_video_count])) yt_videos = list(reversed(sub_info.video_list[:self.config.youtube_channel_video_count])) immediate_delete: dict[int, Message] = { m.id: m for m in sub_messages[self.config.youtube_channel_video_count:-1]} last_matching_index = 0 stop_scan = False for yt_video in yt_videos: for index, message in enumerate(messages[last_matching_index:], start=last_matching_index): if message.content != self._video_message_content(yt_video): if last_matching_index != 0: stop_scan = True break self.logger.debug('Unmatched video: %s', yt_video.id.videoId) immediate_delete[message.id] = message else: self.logger.debug('Matched video: %s', yt_video.id.videoId) last_matching_index = index + 1 break else: self.logger.debug('All videos scanned') break if stop_scan: break for message in messages[last_matching_index:]: immediate_delete[message.id] = message for message in immediate_delete.values(): try: self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) except RuntimeError as error: self.logger.error('Error deleting message %s from channel %s : %s', message.id, sub_channel.name, error) for video in yt_videos[last_matching_index:]: _ = self.discord_manager.create_message( sub_channel, {'content': self._video_message_content(video)}, request_timeout=self.config.request_timeout) sub_info.last_update = time.time() def run(self): while self.tasks: self.tasks = sorted(self.tasks, key=operator.itemgetter(1), reverse=True) task_type, task_time, task_params = self.tasks.pop() sleep_time = task_time - time.time() self.logger.debug( 'Next task %s at %.03f (sleeping for %.03fs) : %s', task_type, task_time, sleep_time, task_params) if sleep_time > 0: time.sleep(sleep_time) match task_type: case Bot.Task.DELETE_MESSAGES: if not isinstance(task_params, list): self.logger.error('Wrong task params for DELETE_MESSAGES: %s', task_params) elif not task_params: self.logger.error('Empty params for DELETE_MESSAGES: %s', task_params) elif any(not isinstance(v, Message) for v in task_params): self.logger.error('All params not int for DELETE_MESSAGES: %s', task_params) else: for message in task_params: try: self.discord_manager.delete_message( message, request_timeout=self.config.request_timeout) except Exception as error: self.logger.error('Error deleting message %s: %s -> %s', message, error, traceback.format_exc().replace('\n', ' | ')) case Bot.Task.SCAN_BOT_CHANNEL: try: self._scan_bot_channel() except Exception as error: self.logger.error('Error scanning bot channel: %s -> %s', error, traceback.format_exc().replace('\n', ' | ')) self.tasks.append(( self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) case Bot.Task.INIT_SUBS: try: self._init_subs() except Exception as error: self.logger.error('Error initializing subscriptions : %s -> %s', error, traceback.format_exc().replace('\n', ' | ')) self.tasks.append(( self.Task.INIT_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None))