from __future__ import annotations from enum import Enum import html import http.client import json import logging import operator from pathlib import Path import re import time import tomllib from typing import Any, TYPE_CHECKING import traceback from .config import Config from .discord_manager import ApiEncoder, DiscordManager from .logger import create_logger from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite, OverwriteType, Permissions, Role, TextChannel) from .youtube_manager import YoutubeManager from .youtube_subscription import SUBSCRIPTION_FILE_COLUMNS, SubscriptionHelper, SubscriptionInfo, Subscriptions if TYPE_CHECKING: from breadtube_bot.youtube_objects import SearchResultItem class Bot: DEFAULT_MESSAGE_LIST_LIMIT: int = 50 DISCORD_NAME_REGEX: str = r'([^a-z])' INIT_MESSAGE: str = ('Bot initialized.\nThis is the current configuration used.\n' 'You can upload a new one to update the configuration.') MAX_DOWNLOAD_SIZE: int = 50_000 SUBS_LIST_MIN_SIZE: int = 50 SUBS_LIST_SHORTS_RATIO: int = 5 SUBS_LIST_VIDEO_RATIO: int = 2 SUBS_SAVE_PATH: Path = Path('/tmp/breadtube-bot_subs.json') class Task(Enum): DELETE_MESSAGES = 1 SCAN_BOT_CHANNEL = 2 REFRESH_SUBS = 3 @staticmethod def _get_code_version() -> str: pyproject_path = Path(__file__).parents[1] / 'pyproject.toml' if not pyproject_path.exists(): raise RuntimeError('Cannot current bot version') return tomllib.loads(pyproject_path.read_text(encoding='utf-8'))['project']['version'] def __init__(self, bot_token: str, guild_id: int, yt_api_key: str, config: Config | None = None, log_level: int = logging.INFO): self.config: Config = config or Config() self.guild_id = guild_id self.logger = create_logger('breadtube', log_level, stdout=True) self.version = self._get_code_version() self.discord_manager = DiscordManager(bot_token=bot_token, bot_version=self.version, logger=self.logger) self.tasks: list[tuple[Bot.Task, float, Any]] = [] self.logger.info('Retrieving bot user') self.bot_user = self.discord_manager.get_current_user(request_timeout=self.config.request_timeout) self.logger.info('Retrieving guild roles before init') self.guild_roles: list[Role] = self.discord_manager.list_roles( self.guild_id, request_timeout=self.config.request_timeout) bot_role: Role | None = None everyone_role: Role | None = None for role in self.guild_roles: if role.name == self.config.bot_role: bot_role = role elif role.name == '@everyone': everyone_role = role if bot_role is None: raise RuntimeError('No BreadTube role found') if everyone_role is None: raise RuntimeError('No everyone role found') self.bot_role: Role = bot_role self.everyone_role: Role = everyone_role categories, text_channel = self.discord_manager.list_channels( self.guild_id, request_timeout=self.config.request_timeout) self.guild_text_channels: list[TextChannel] = text_channel self.guild_categories: list[ChannelCategory] = categories self.init_message: Message | None = None bot_channel: TextChannel | None = None for _ in range(self.config.bot_channel_init_retries): bot_channel = self.init_bot_channel() if bot_channel is not None: break time.sleep(5) if bot_channel is None: raise RuntimeError("Couldn't initialize bot channel/role/permission") self.bot_channel: TextChannel = bot_channel self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger) self._yt_subscriptions: Subscriptions = { name: SubscriptionInfo.from_dict(info) for name, info in json.loads( self.SUBS_SAVE_PATH.read_text(encoding='utf-8')).items()} if self.SUBS_SAVE_PATH.exists() else {} self._scan_bot_channel() self.tasks.append(( self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None)) self.logger.info('Bot initialized') def init_bot_channel(self) -> TextChannel | None: for channel in self.guild_text_channels: if channel.name == self.config.bot_channel: self.logger.info('Found breadtube bot channel') for perm in channel.permission_overwrites: if perm.id == self.bot_role.id: if not perm.allow | Permissions.VIEW_CHANNEL: self.logger.info('BreadTube bot cannot view BreadTube channel: permission missing') return None self.logger.info('BreadTube channel permission OK') break return channel self.logger.info('Creating breadtube bot channel') return self.discord_manager.create_text_channel( self.guild_id, { 'name': self.config.bot_channel, 'permission_overwrites': [ Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, deny=Permissions.VIEW_CHANNEL), Overwrite(self.bot_role.id, OverwriteType.ROLE, allow=Permissions.VIEW_CHANNEL, deny=Permissions.NONE)]}, request_timeout=self.config.request_timeout) def _get_all_channel_messages(self, channel: TextChannel) -> list[Message]: messages_id_delete_task: set[int] = set() for task_type, _, task_params in self.tasks: if task_type == self.Task.DELETE_MESSAGES: messages_id_delete_task.update(message.id for message in task_params) last_message_id: int | None = None messages: list[Message] = [] while True: message_batch = self.discord_manager.list_text_channel_messages( channel, request_timeout=self.config.request_timeout, after_id=last_message_id) messages.extend([m for m in message_batch if m.id not in messages_id_delete_task]) if len(message_batch) < self.DEFAULT_MESSAGE_LIST_LIMIT: break last_message_id = message_batch[-1].id return messages def _scan_bot_channel(self): # noqa: PLR0915 self.logger.debug('Starting scanning bot channel') messages = self._get_all_channel_messages(self.bot_channel) init_message_found = False new_config: Config | None = None new_subscriptions: Subscriptions | None = None delayed_delete: dict[int, Message] = {} immediate_delete: dict[int, Message] = {} for message in messages: if init_message_found: self.logger.debug('Marking message for immediate deletion (init found): %s', message) immediate_delete[message.id] = message continue if len(message.attachments) <= 0: self.logger.debug('Marking message for immediate deletion (no attachment): %s', message) immediate_delete[message.id] = message continue if message.author.id == self.bot_user.id: self.logger.debug('Found init message') # If same init message: nothing to do if self.init_message is not None and message.id == self.init_message.id: continue # Loading init message content has_error = False for attachment in message.attachments: try: _, content = self.discord_manager.download_attachment( attachment, request_timeout=self.config.request_timeout) if new_config is None and content.startswith(b'config'): try: self.config = Config.from_str(content.decode()) if self.config.to_str() != content.decode(): new_config = self.config except RuntimeError as error: self.logger.error('Cannot load config from init message: %s', error) has_error = True if new_subscriptions is None and content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]): try: subscriptions = SubscriptionHelper.read_text(content) if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()): SubscriptionHelper.update_subscriptions( new=subscriptions, previous=self._yt_subscriptions) self._yt_subscriptions = subscriptions self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None)) except RuntimeError as error: self.logger.error('Invalid init subscriptions file: %s', error) has_error = True except Exception as error: self.logger.error('Error downloading attachment: %s', error) has_error = True if not has_error: self.init_message = message init_message_found = True continue self.logger.debug('Reading attachment') attachment = message.attachments[0] if attachment.size > self.MAX_DOWNLOAD_SIZE: self.logger.debug('Marking message for immediate deletion (attachment too big): %s', message) immediate_delete[message.id] = message continue try: _, content = self.discord_manager.download_attachment( attachment, request_timeout=self.config.request_timeout) if content.startswith(b'config') and new_config is None: try: config = Config.from_str(content.decode()) if config != self.config: # New config to update to new_config = config self.logger.debug('Marking new config message for immediate deletion: %s', message) immediate_delete[message.id] = message continue except RuntimeError as error: self.logger.info('Invalid config file: %s', error) bot_message = self.discord_manager.create_message(self.bot_channel, { 'content': str(error), 'message_reference': MessageReference( type=MessageReferenceType.DEFAULT, message_id=message.id, channel_id=self.bot_channel.id, guild_id=None, fail_if_not_exists=None)}, request_timeout=self.config.request_timeout) delayed_delete[bot_message.id] = bot_message delayed_delete[message.id] = message continue if content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]): try: subscriptions = SubscriptionHelper.read_text(content) if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()): new_subscriptions = subscriptions self.logger.debug('Marking new subscriptions message for immediate deletion: %s', message) immediate_delete[message.id] = message continue except RuntimeError as error: self.logger.info('Invalid subscriptions file: %s', error) bot_message = self.discord_manager.create_message(self.bot_channel, { 'content': str(error), 'message_reference': MessageReference( type=MessageReferenceType.DEFAULT, message_id=message.id, channel_id=self.bot_channel.id, guild_id=None, fail_if_not_exists=None)}, request_timeout=self.config.request_timeout) delayed_delete[bot_message.id] = bot_message delayed_delete[message.id] = message continue except Exception as error: self.logger.error('Error downloading attachment: %s', error) if new_config is not None: self.logger.info('Loading config: %s', new_config) self.config = new_config if new_subscriptions is not None: self.logger.info('Loading subscriptions') SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions) self._yt_subscriptions = new_subscriptions self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None)) # New init message is needed, previous need to be deleted if (new_config is not None or new_subscriptions is not None) and self.init_message is not None: immediate_delete[self.init_message.id] = self.init_message self.init_message = None # Init message absent or deleted if self.init_message is None: assert self.config is not None self.init_message = self.discord_manager.create_message( self.bot_channel, {'content': self.INIT_MESSAGE}, request_timeout=self.config.request_timeout, upload_files=[ ('config.txt', FileMime.TEXT_PLAIN, self.config.to_str().encode()), ('subscriptions.csv', FileMime.TEXT_CSV, SubscriptionHelper.generate_text(self._yt_subscriptions)) ]) for message in immediate_delete.values(): try: self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) except RuntimeError as error: self.logger.error('Error deleting after bot channel scan (immediate): %s', error) if delayed_delete: self.tasks.append(( Bot.Task.DELETE_MESSAGES, time.time() + self.config.bot_message_duration, list(delayed_delete.values()))) self.logger.debug('Bot channel scanned') def _get_subscription_channel(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel], category_ranges: list[tuple[int, int, ChannelCategory]]) -> TextChannel: discord_name = re.sub(self.DISCORD_NAME_REGEX, '-', subscription.name.lower()) category_value = ord(discord_name[0]) sub_channel: TextChannel | None = channel_dict.get(discord_name) if sub_channel is None: selected_category: ChannelCategory | None = None for start_range, stop_range, category in category_ranges: if start_range <= category_value <= stop_range: selected_category = category break if selected_category is None: selected_category = category_ranges[-1][2] sub_channel = self.discord_manager.create_text_channel( self.guild_id, { 'name': discord_name, 'parent_id': selected_category.id, 'permission_overwrites': [ Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, deny=Permissions.SEND_MESSAGES), Overwrite(self.bot_role.id, OverwriteType.ROLE, allow=Permissions.VIEW_CHANNEL | Permissions.SEND_MESSAGES, deny=Permissions.NONE)]}, request_timeout=self.config.request_timeout) return sub_channel def _refresh_subscription(self, subscription: SubscriptionInfo): _, yt_video_info = self.yt_manager.request_channel_videos( channel_id=subscription.channel_id, max_results=self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count, request_timeout=self.config.request_timeout) video_ids = {v.id.videoId for v in subscription.shorts_list + subscription.video_list} yt_connection = http.client.HTTPSConnection('www.youtube.com', timeout=self.config.request_timeout) for yt_info in yt_video_info.items: if yt_info.id.videoId in video_ids: continue if self.yt_manager.is_shorts(yt_connection, yt_info.id.videoId): subscription.shorts_list.append(yt_info) else: subscription.video_list.append(yt_info) video_ids.add(yt_info.id.videoId) internal_size = min(self.SUBS_LIST_MIN_SIZE, self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count) subscription.shorts_list = sorted( subscription.shorts_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size] internal_size = min(self.SUBS_LIST_MIN_SIZE, self.SUBS_LIST_VIDEO_RATIO * self.config.youtube_channel_video_count) subscription.video_list = sorted( subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size] subscription.last_update = time.time() def _video_message_content(self, video: SearchResultItem) -> str: return (self.config.youtube_channel_video_message .replace('{{video_id}}', str(video.id.videoId)) .replace('{{video_title}}', str(html.unescape(video.snippet.title))) .replace('{{video_description}}', str(video.snippet.description)) .replace('{{video_publish_time}}', video.snippet.publishTime.isoformat()) .replace('{{channel_id}}', str(video.snippet.channelId)) .replace('{{channel_title}}', str(video.snippet.channelTitle)) ) def _refresh_sub(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel], category_ranges: list[tuple[int, int, ChannelCategory]]): try: sub_channel = self._get_subscription_channel(subscription, channel_dict, category_ranges) except RuntimeError as error: self.logger.error(error) return if subscription.channel_info is None: _, channel_info = self.yt_manager.request_channel_info( subscription.channel_id, request_timeout=self.config.request_timeout) if not channel_info.items: raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name) subscription.channel_info = channel_info.items[0].snippet self._refresh_subscription(subscription) sub_init_message = f'https://www.youtube.com/{subscription.channel_info.customUrl}' sub_messages = self._get_all_channel_messages(sub_channel) if not sub_messages or sub_messages[-1].content != sub_init_message: self.logger.debug('Clearing sub channel: %s', sub_channel.name) for message in sub_messages: self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) _ = self.discord_manager.create_message( sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout) else: messages = list(reversed(sub_messages[:-1][:self.config.youtube_channel_video_count])) yt_videos = list(reversed(subscription.video_list[:self.config.youtube_channel_video_count])) immediate_delete: dict[int, Message] = { m.id: m for m in sub_messages[self.config.youtube_channel_video_count:-1]} last_matching_index = 0 stop_scan = False for yt_video in yt_videos: for index, message in enumerate(messages[last_matching_index:], start=last_matching_index): if message.content != self._video_message_content(yt_video): if last_matching_index != 0: stop_scan = True break self.logger.debug('Unmatched video: %s', yt_video.id.videoId) immediate_delete[message.id] = message else: self.logger.debug('Matched video: %s', yt_video.id.videoId) last_matching_index = index + 1 break else: self.logger.debug('All videos scanned') break if stop_scan: break for message in messages[last_matching_index:]: immediate_delete[message.id] = message for message in immediate_delete.values(): try: self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) except RuntimeError as error: self.logger.error('Error deleting message %s from channel %s : %s', message.id, sub_channel.name, error) for video in yt_videos[last_matching_index:]: _ = self.discord_manager.create_message( sub_channel, {'content': self._video_message_content(video)}, request_timeout=self.config.request_timeout) subscription.last_update = time.time() def _refresh_subs(self): self.logger.info('Start refreshing subs') categories, text_channel = self.discord_manager.list_channels( self.guild_id, request_timeout=self.config.request_timeout) self.guild_text_channels = text_channel self.guild_categories = categories channel_dict: dict[str, TextChannel] = {c.name or '': c for c in self.guild_text_channels} unmanaged_categories: set[str] = set(self.config.unmanaged_categories.split(',')) category_ranges: list[tuple[int, int, ChannelCategory]] = [] for category in self.guild_categories: if category.name in unmanaged_categories: self.logger.debug('Skipping unmanaged category: %s', category.name) continue range_info = (category.name or '').split('-') if len(range_info) != 2: # noqa: PLR2004 self.logger.warning('Cannot compute range for category: %s', category.name) continue category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category)) category_ranges = sorted(category_ranges, key=operator.itemgetter(0)) sorted_subs = sorted(self._yt_subscriptions.values(), key=lambda s: s.last_update) for sub_info in sorted_subs: try: self._refresh_sub(sub_info, channel_dict, category_ranges) except TimeoutError as error: self.logger.error('Timeout error refreshing subcription: %s', error) break self.logger.info('Subs refreshed') def run(self): while self.tasks: self.tasks = sorted(self.tasks, key=operator.itemgetter(1), reverse=True) task_type, task_time, task_params = self.tasks.pop() sleep_time = task_time - time.time() self.logger.debug( 'Next task %s at %.03f (sleeping for %.03fs) : %s', task_type, task_time, sleep_time, task_params) if sleep_time > 0: time.sleep(sleep_time) match task_type: case Bot.Task.DELETE_MESSAGES: if not isinstance(task_params, list): self.logger.error('Wrong task params for DELETE_MESSAGES: %s', task_params) elif not task_params: self.logger.error('Empty params for DELETE_MESSAGES: %s', task_params) elif any(not isinstance(v, Message) for v in task_params): self.logger.error('All params not int for DELETE_MESSAGES: %s', task_params) else: for message in task_params: try: self.discord_manager.delete_message( message, request_timeout=self.config.request_timeout) except Exception as error: self.logger.error('Error deleting message %s: %s -> %s', message, error, traceback.format_exc().replace('\n', ' | ')) case Bot.Task.SCAN_BOT_CHANNEL: try: self._scan_bot_channel() except Exception as error: self.logger.error('Error scanning bot channel: %s -> %s', error, traceback.format_exc().replace('\n', ' | ')) self.tasks = list(filter(lambda t: t[0] != Bot.Task.SCAN_BOT_CHANNEL, self.tasks)) self.tasks.append(( self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) case Bot.Task.REFRESH_SUBS: try: self._refresh_subs() except Exception as error: self.logger.error('Error initializing subscriptions : %s -> %s', error, traceback.format_exc().replace('\n', ' | ')) self.SUBS_SAVE_PATH.write_text( json.dumps(self._yt_subscriptions, cls=ApiEncoder, ensure_ascii=False), encoding='utf-8') self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) self.tasks.append(( self.Task.REFRESH_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None))