478 lines
26 KiB
Python
478 lines
26 KiB
Python
from __future__ import annotations
|
|
|
|
from enum import Enum
|
|
import html
|
|
import http.client
|
|
import logging
|
|
import operator
|
|
from pathlib import Path
|
|
import re
|
|
import time
|
|
import tomllib
|
|
from typing import Any
|
|
import traceback
|
|
|
|
from .config import Config
|
|
from .discord_manager import DiscordManager
|
|
from .logger import create_logger
|
|
from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite,
|
|
OverwriteType, Permissions, Role, TextChannel)
|
|
from .youtube_manager import YoutubeManager
|
|
from .youtube_subscription import (
|
|
SUBSCRIPTION_FILE_COLUMNS, SubscriptionHelper, SubscriptionInfo, Subscriptions, VideoInfo)
|
|
|
|
|
|
class Bot:
    """Discord bot driven by a small task queue (see ``Task``).

    Class-level constants:
        DEFAULT_MESSAGE_LIST_LIMIT: a message batch shorter than this is treated as
            the last page when listing a channel's messages.
        DISCORD_NAME_REGEX: every match of this pattern in a lower-cased subscription
            name is replaced by ``-`` to build the Discord channel name.
        INIT_MESSAGE: text of the message that carries the configuration attachments.
        MAX_DOWNLOAD_SIZE: uploads whose attachment ``size`` exceeds this value are
            deleted without being read (presumably bytes -- confirm against the
            attachment type).
    """

    DEFAULT_MESSAGE_LIST_LIMIT: int = 50
    DISCORD_NAME_REGEX: str = r'([^a-z])'
    INIT_MESSAGE: str = (
        'Bot initialized.\n'
        'This is the current configuration used.\n'
        'You can upload a new one to update the configuration.'
    )
    MAX_DOWNLOAD_SIZE: int = 50_000

    class Task(Enum):
        """Kinds of work items stored in the bot's task list."""

        DELETE_MESSAGES = 1    # delete a list of messages once their delay has elapsed
        SCAN_BOT_CHANNEL = 2   # re-read the bot channel
        REFRESH_SUBS = 3       # refresh every subscription
|
|
|
|
@staticmethod
|
|
def _get_code_version() -> str:
|
|
pyproject_path = Path(__file__).parents[1] / 'pyproject.toml'
|
|
if not pyproject_path.exists():
|
|
raise RuntimeError('Cannot current bot version')
|
|
return tomllib.loads(pyproject_path.read_text(encoding='utf-8'))['project']['version']
|
|
|
|
def __init__(self, bot_token: str, guild_id: int, config: Config | None = None,
             log_level: int = logging.INFO):
    """Set up the Discord/YouTube managers, resolve roles and channel, queue the first tasks.

    Args:
        bot_token: token forwarded to ``DiscordManager``.
        guild_id: identifier of the guild whose roles and channels are listed.
        config: starting configuration; ``Config()`` is used when this is falsy.
        log_level: level forwarded to ``create_logger``.

    Raises:
        RuntimeError: if no role is named ``config.bot_role``, if no ``@everyone``
            role exists, or if ``init_bot_channel`` keeps returning ``None`` for
            ``config.bot_channel_init_retries`` attempts. ``_get_code_version`` may
            also raise it.
    """
    self.config: Config = config or Config()
    self.guild_id = guild_id
    self.logger = create_logger('breadtube', log_level, stdout=True)

    self.version = self._get_code_version()
    self.discord_manager = DiscordManager(
        bot_token=bot_token, bot_version=self.version, logger=self.logger)
    self.tasks: list[tuple[Bot.Task, float, Any]] = []

    timeout = self.config.request_timeout
    self.logger.info('Retrieving bot user')
    self.bot_user = self.discord_manager.get_current_user(request_timeout=timeout)
    self.logger.info('Retrieving guild roles before init')
    self.guild_roles: list[Role] = self.discord_manager.list_roles(self.guild_id, request_timeout=timeout)

    # Single pass over the roles; when several roles share a name the last one wins.
    found_bot_role: Role | None = None
    found_everyone_role: Role | None = None
    for guild_role in self.guild_roles:
        if guild_role.name == self.config.bot_role:
            found_bot_role = guild_role
        elif guild_role.name == '@everyone':
            found_everyone_role = guild_role
    if found_bot_role is None:
        raise RuntimeError('No BreadTube role found')
    if found_everyone_role is None:
        raise RuntimeError('No everyone role found')
    self.bot_role: Role = found_bot_role
    self.everyone_role: Role = found_everyone_role

    listed_categories, listed_text_channels = self.discord_manager.list_channels(
        self.guild_id, request_timeout=timeout)
    self.guild_text_channels: list[TextChannel] = listed_text_channels
    self.guild_categories: list[ChannelCategory] = listed_categories
    self.init_message: Message | None = None

    # Retry the channel lookup/creation, pausing 5 seconds after each failed attempt.
    for _attempt in range(self.config.bot_channel_init_retries):
        candidate = self.init_bot_channel()
        if candidate is not None:
            self.bot_channel: TextChannel = candidate
            break
        time.sleep(5)
    else:
        raise RuntimeError("Couldn't initialize bot channel/role/permission")

    self.yt_manager = YoutubeManager(logger=self.logger)
    self._yt_subscriptions: Subscriptions = {}
    self._scan_bot_channel()
    self.tasks.append((
        self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
    # The scan above may already have queued a refresh: keep a single one, due in one second.
    self.tasks = [task for task in self.tasks if task[0] != Bot.Task.REFRESH_SUBS]
    self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None))
    self.logger.info('Bot initialized')
|
|
|
|
def init_bot_channel(self) -> TextChannel | None:
    """Return the bot channel, creating it when no text channel has the configured name.

    The first channel of ``self.guild_text_channels`` named ``config.bot_channel`` is
    used. If that channel has a permission overwrite for the bot role, the overwrite
    must allow ``Permissions.VIEW_CHANNEL``.

    Returns:
        The existing channel, the value returned by
        ``discord_manager.create_text_channel`` when the channel had to be created,
        or ``None`` when the bot role's overwrite does not allow viewing the channel.
    """
    for channel in self.guild_text_channels:
        if channel.name != self.config.bot_channel:
            continue
        self.logger.info('Found breadtube bot channel')
        for overwrite in channel.permission_overwrites:
            if overwrite.id != self.bot_role.id:
                continue
            # `&` isolates the VIEW_CHANNEL bit. The previous `|` produced a non-zero
            # value whatever `allow` contained, so a missing permission went unnoticed.
            if not overwrite.allow & Permissions.VIEW_CHANNEL:
                self.logger.info('BreadTube bot cannot view BreadTube channel: permission missing')
                return None
            self.logger.info('BreadTube channel permission OK')
            break
        return channel

    self.logger.info('Creating breadtube bot channel')
    # Hidden from @everyone, visible to the bot role.
    return self.discord_manager.create_text_channel(
        self.guild_id, {
            'name': self.config.bot_channel,
            'permission_overwrites': [
                Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE,
                          deny=Permissions.VIEW_CHANNEL),
                Overwrite(self.bot_role.id, OverwriteType.ROLE, allow=Permissions.VIEW_CHANNEL,
                          deny=Permissions.NONE)]},
        request_timeout=self.config.request_timeout)
|
|
|
|
def _get_all_channel_messages(self, channel: TextChannel) -> list[Message]:
    """List the messages of ``channel``, skipping those already queued for deletion.

    Batches are requested from ``discord_manager.list_text_channel_messages`` until
    one is shorter than ``DEFAULT_MESSAGE_LIST_LIMIT``.

    Args:
        channel: channel whose messages are listed.

    Returns:
        The messages of every batch, in the order received, minus the messages that
        appear in the parameters of a pending ``DELETE_MESSAGES`` task.
    """
    pending_delete_ids: set[int] = {
        queued.id
        for task_type, _, task_params in self.tasks
        if task_type == self.Task.DELETE_MESSAGES
        for queued in task_params}

    collected: list[Message] = []
    cursor: int | None = None
    while True:
        batch = self.discord_manager.list_text_channel_messages(
            channel, request_timeout=self.config.request_timeout, after_id=cursor)
        collected += [received for received in batch if received.id not in pending_delete_ids]
        if len(batch) < self.DEFAULT_MESSAGE_LIST_LIMIT:
            return collected
        # NOTE(review): the id of the batch's last element is sent as `after_id`;
        # confirm this matches the ordering returned by list_text_channel_messages.
        cursor = batch[-1].id
|
|
|
|
def _scan_bot_channel(self):  # noqa: PLR0915
    """Process the messages of the bot channel and keep the init message up to date.

    Messages come from ``_get_all_channel_messages`` and are handled in that order:

    * once an init message has been seen, every following message is marked for
      immediate deletion;
    * messages without attachment are marked for immediate deletion;
    * a message authored by the bot user is the init message: unless it is the one
      already tracked, each attachment is downloaded and used to load the
      configuration and the subscriptions;
    * any other message is an upload: its first attachment (if not larger than
      ``MAX_DOWNLOAD_SIZE``) is parsed as a configuration or a subscription file.
      An invalid file is answered with the error text, and both the upload and the
      answer are scheduled for delayed deletion.

    Afterwards the new configuration/subscriptions are applied, the init message is
    recreated when absent or outdated, the marked messages are deleted and a
    ``DELETE_MESSAGES`` task is queued for the delayed ones.

    NOTE(review): the nesting of the statements flagged below could not be derived
    from the statement order alone -- confirm against the upstream file.
    """
    self.logger.debug('Starting scanning bot channel')
    scanned_messages = self._get_all_channel_messages(self.bot_channel)
    init_message_found = False
    new_config: Config | None = None
    new_subscriptions: Subscriptions | None = None
    delayed_delete: dict[int, Message] = {}
    immediate_delete: dict[int, Message] = {}

    def reschedule_refresh() -> None:
        # Replace any pending REFRESH_SUBS task with one due in one second.
        self.tasks = [task for task in self.tasks if task[0] != Bot.Task.REFRESH_SUBS]
        self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None))

    def reply_with_error(faulty: Message, error: RuntimeError) -> None:
        # Answer the faulty upload with the error text; both are deleted after a delay.
        reply = self.discord_manager.create_message(self.bot_channel, {
            'content': str(error),
            'message_reference': MessageReference(
                type=MessageReferenceType.DEFAULT,
                message_id=faulty.id,
                channel_id=self.bot_channel.id,
                guild_id=None,
                fail_if_not_exists=None)}, request_timeout=self.config.request_timeout)
        delayed_delete[reply.id] = reply
        delayed_delete[faulty.id] = faulty

    for message in scanned_messages:
        if init_message_found:
            self.logger.debug('Marking message for immediate deletion (init found): %s', message)
            immediate_delete[message.id] = message
            continue

        if len(message.attachments) <= 0:
            self.logger.debug('Marking message for immediate deletion (no attachment): %s', message)
            immediate_delete[message.id] = message
            continue

        if message.author.id == self.bot_user.id:
            self.logger.debug('Found init message')
            # Already tracked init message: nothing to reload.
            if self.init_message is not None and message.id == self.init_message.id:
                continue
            # Load the state stored in the init message attachments.
            has_error = False
            for attachment in message.attachments:
                try:
                    _, content = self.discord_manager.download_attachment(
                        attachment, request_timeout=self.config.request_timeout)
                    if new_config is None and content.startswith(b'config'):
                        try:
                            config_text = content.decode()
                            self.config = Config.from_str(config_text)
                            # A different serialisation means the stored file must be rewritten.
                            if self.config.to_str() != config_text:
                                new_config = self.config
                        except RuntimeError as error:
                            self.logger.error('Cannot load config from init message: %s', error)
                            has_error = True
                    if new_subscriptions is None and content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]):
                        try:
                            subscriptions = SubscriptionHelper.read_text(content)
                            if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()):
                                SubscriptionHelper.update_subscriptions(
                                    new=subscriptions, previous=self._yt_subscriptions)
                                self._yt_subscriptions = subscriptions
                                reschedule_refresh()
                        except RuntimeError as error:
                            self.logger.error('Invalid init subscriptions file: %s', error)
                            has_error = True
                except Exception as error:
                    self.logger.error('Error downloading attachment: %s', error)
                    has_error = True
            if not has_error:
                self.init_message = message
            # NOTE(review): assumed to be set even when loading failed -- confirm.
            init_message_found = True
            continue

        self.logger.debug('Reading attachment')
        attachment = message.attachments[0]
        if attachment.size > self.MAX_DOWNLOAD_SIZE:
            self.logger.debug('Marking message for immediate deletion (attachment too big): %s', message)
            immediate_delete[message.id] = message
            continue
        try:
            _, content = self.discord_manager.download_attachment(
                attachment, request_timeout=self.config.request_timeout)
            if content.startswith(b'config') and new_config is None:
                try:
                    config = Config.from_str(content.decode())
                    if config != self.config:  # New config to update to
                        new_config = config
                    # NOTE(review): deletion assumed to apply to every valid config
                    # upload, not only to those that differ -- confirm.
                    self.logger.debug('Marking new config message for immediate deletion: %s', message)
                    immediate_delete[message.id] = message
                    continue
                except RuntimeError as error:
                    self.logger.info('Invalid config file: %s', error)
                    reply_with_error(message, error)
                    continue
            if content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]):
                try:
                    subscriptions = SubscriptionHelper.read_text(content)
                    if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()):
                        new_subscriptions = subscriptions
                    # NOTE(review): same assumption as for config uploads -- confirm.
                    self.logger.debug('Marking new subscriptions message for immediate deletion: %s', message)
                    immediate_delete[message.id] = message
                    continue
                except RuntimeError as error:
                    self.logger.info('Invalid subscriptions file: %s', error)
                    reply_with_error(message, error)
                    continue
        except Exception as error:
            self.logger.error('Error downloading attachment: %s', error)

    if new_config is not None:
        self.logger.info('Loading config: %s', new_config)
        self.config = new_config

    if new_subscriptions is not None:
        self.logger.info('Loading subscriptions')
        SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions)
        self._yt_subscriptions = new_subscriptions
        reschedule_refresh()

    # The tracked init message is outdated: delete it so that a fresh one is posted.
    if (new_config is not None or new_subscriptions is not None) and self.init_message is not None:
        immediate_delete[self.init_message.id] = self.init_message
        self.init_message = None

    # Init message absent or just discarded: post the current state.
    if self.init_message is None:
        assert self.config is not None
        self.init_message = self.discord_manager.create_message(
            self.bot_channel, {'content': self.INIT_MESSAGE}, request_timeout=self.config.request_timeout,
            upload_files=[
                ('config.txt', FileMime.TEXT_PLAIN, self.config.to_str().encode()),
                ('subscriptions.csv', FileMime.TEXT_CSV, SubscriptionHelper.generate_text(self._yt_subscriptions))
            ])

    for doomed in immediate_delete.values():
        try:
            self.discord_manager.delete_message(doomed, request_timeout=self.config.request_timeout)
        except RuntimeError as error:
            self.logger.error('Error deleting after bot channel scan (immediate): %s', error)

    if delayed_delete:
        self.tasks.append((
            Bot.Task.DELETE_MESSAGES,
            time.time() + self.config.bot_message_duration,
            list(delayed_delete.values())))
    self.logger.debug('Bot channel scanned')
|
|
|
|
def _get_subscription_channel(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel],
                              category_ranges: list[tuple[int, int, ChannelCategory]]) -> TextChannel:
    """Return the text channel of ``subscription``, creating it when it is missing.

    The channel name is the lower-cased subscription name in which every match of
    ``DISCORD_NAME_REGEX`` is replaced by ``-``. A new channel is placed in the first
    category whose range contains the code point of the name's first character, or
    in the last category of ``category_ranges`` when no range matches.

    Args:
        subscription: subscription whose channel is wanted.
        channel_dict: existing channels keyed by name; a newly created channel is
            added to it so that later lookups in the same pass reuse that channel.
        category_ranges: ``(first code point, last code point, category)`` tuples.

    Returns:
        The existing or newly created channel.

    Raises:
        RuntimeError: if the subscription name is empty, or if a channel has to be
            created while ``category_ranges`` is empty. These cases previously
            surfaced as ``IndexError``, which the caller does not handle per
            subscription.
    """
    discord_name = re.sub(self.DISCORD_NAME_REGEX, '-', subscription.name.lower())
    if not discord_name:
        raise RuntimeError(
            f'Subscription {subscription.channel_id} has an empty name: cannot derive a channel name')

    sub_channel: TextChannel | None = channel_dict.get(discord_name)
    if sub_channel is not None:
        return sub_channel

    if not category_ranges:
        raise RuntimeError(f'No category available to create channel {discord_name}')
    category_value = ord(discord_name[0])
    # First matching range wins; fall back to the last category otherwise.
    selected_category = next(
        (category for start_range, stop_range, category in category_ranges
         if start_range <= category_value <= stop_range),
        category_ranges[-1][2])

    # Readable by everyone, writable only by the bot role.
    sub_channel = self.discord_manager.create_text_channel(
        self.guild_id, {
            'name': discord_name,
            'parent_id': selected_category.id,
            'permission_overwrites': [
                Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE,
                          deny=Permissions.SEND_MESSAGES),
                Overwrite(self.bot_role.id, OverwriteType.ROLE,
                          allow=Permissions.VIEW_CHANNEL | Permissions.SEND_MESSAGES,
                          deny=Permissions.NONE)]},
        request_timeout=self.config.request_timeout)
    # Remember the channel: another subscription with the same normalised name must
    # not trigger the creation of a duplicate channel.
    channel_dict[discord_name] = sub_channel
    return sub_channel
|
|
|
|
def _refresh_subscription(self, connection: http.client.HTTPSConnection, subscription: SubscriptionInfo):
    """Fetch the videos of a subscription and merge the unknown ones into its list.

    Args:
        connection: connection forwarded to ``yt_manager.request_channel_videos``.
        subscription: updated in place. ``channel_info`` is filled when it is still
            ``None``, ``video_list`` keeps at most
            ``config.youtube_channel_video_count`` videos sorted by descending
            ``published``, and ``last_update`` is set to the current time.
    """
    _, fetched_channel_info, fetched_videos = self.yt_manager.request_channel_videos(
        connection=connection, channel_id=subscription.channel_id)
    if subscription.channel_info is None:
        subscription.channel_info = fetched_channel_info

    known_ids: set[str] = {known.video_id for known in subscription.video_list}
    unseen = [fetched for fetched in fetched_videos if fetched.video_id not in known_ids]
    if unseen:
        merged = subscription.video_list + unseen
        merged.sort(key=operator.attrgetter('published'), reverse=True)
        subscription.video_list = merged[:self.config.youtube_channel_video_count]
    # NOTE(review): assumed to run whether or not new videos were found -- confirm.
    subscription.last_update = time.time()
|
|
|
|
def _video_message_content(self, video: VideoInfo, subscription: SubscriptionInfo) -> str:
    """Fill the ``config.youtube_channel_video_message`` template for one video.

    Placeholders are substituted one after the other, in this order:
    ``{{video_id}}``, ``{{video_title}}`` (HTML entities unescaped),
    ``{{video_description}}``, ``{{video_publish_time}}`` (ISO format),
    ``{{channel_id}}`` and ``{{channel_title}}``. The two channel placeholders become
    ``NO_CHANNEL_ID`` / ``NO_CHANNEL_TITLE`` when the subscription has no channel info.

    Returns:
        The message text.
    """
    channel_info = subscription.channel_info
    if channel_info is not None:
        channel_id = str(channel_info.channel_id)
        channel_title = str(channel_info.title)
    else:
        channel_id = 'NO_CHANNEL_ID'
        channel_title = 'NO_CHANNEL_TITLE'

    substitutions = (
        ('{{video_id}}', str(video.video_id)),
        ('{{video_title}}', str(html.unescape(video.title))),
        ('{{video_description}}', str(video.description)),
        ('{{video_publish_time}}', video.published.isoformat()),
        ('{{channel_id}}', channel_id),
        ('{{channel_title}}', channel_title),
    )
    content = self.config.youtube_channel_video_message
    for placeholder, value in substitutions:
        content = content.replace(placeholder, value)
    return content
|
|
|
|
def _refresh_sub(self, connection: http.client.HTTPSConnection, subscription: SubscriptionInfo,
                 channel_dict: dict[str, TextChannel], category_ranges: list[tuple[int, int, ChannelCategory]]):
    """Synchronise the Discord channel of one subscription with its video list.

    The last element of the channel's message list must be the subscription init
    message (the channel URL). When it is not, the channel is emptied and the init
    message is posted. Otherwise the video messages are compared, oldest first, with
    the videos of the subscription: messages that do not line up are deleted and the
    videos that have no message yet are posted.

    Args:
        connection: connection forwarded to ``_refresh_subscription``.
        subscription: subscription to refresh; ``last_update`` is set on completion.
        channel_dict: existing text channels keyed by name.
        category_ranges: ``(first code point, last code point, category)`` tuples.

    Raises:
        RuntimeError: if the subscription still has no channel info after the
            refresh. Errors raised by the Discord/YouTube managers are not caught,
            except ``RuntimeError`` while getting the channel (logged, then the
            method returns) and while deleting unmatched messages (logged).
    """
    try:
        sub_channel = self._get_subscription_channel(subscription, channel_dict, category_ranges)
    except RuntimeError as error:
        self.logger.error(error)
        return

    self._refresh_subscription(connection, subscription)
    if subscription.channel_info is None:
        raise RuntimeError('No channel info after refreshing subscription')
    sub_init_message = subscription.channel_info.url
    sub_messages = self._get_all_channel_messages(sub_channel)
    if not sub_messages or sub_messages[-1].content != sub_init_message:
        self.logger.debug('Clearing sub channel: %s', sub_channel.name)
        for message in sub_messages:
            self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
        _ = self.discord_manager.create_message(
            sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout)
    else:
        video_count = self.config.youtube_channel_video_count
        # Both lists are reversed so that they are walked in the same direction.
        messages = list(reversed(sub_messages[:-1][:video_count]))
        yt_videos = list(reversed(subscription.video_list[:video_count]))
        # Messages beyond the kept window (init message excluded) always go away.
        immediate_delete: dict[int, Message] = {m.id: m for m in sub_messages[video_count:-1]}

        # Two separate cursors: leading messages skipped before the first match move
        # the message cursor only. Slicing the videos with the message cursor (as was
        # done before) left new videos unposted until a later refresh.
        next_message_index = 0
        matched_video_count = 0
        stop_scan = False
        for yt_video in yt_videos:
            expected_content = self._video_message_content(yt_video, subscription)
            for index, message in enumerate(messages[next_message_index:], start=next_message_index):
                if message.content == expected_content:
                    self.logger.debug('Matched video: %s', yt_video.video_id)
                    next_message_index = index + 1
                    matched_video_count += 1
                    break
                if matched_video_count:
                    # Mismatch after a first match: the rest is replaced.
                    stop_scan = True
                    break
                self.logger.debug('Unmatched video: %s', yt_video.video_id)
                immediate_delete[message.id] = message
            else:
                # No message left to compare with.
                self.logger.debug('All videos scanned')
                break
            if stop_scan:
                break
        for message in messages[next_message_index:]:
            immediate_delete[message.id] = message

        for message in immediate_delete.values():
            try:
                self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
            except RuntimeError as error:
                self.logger.error('Error deleting message %s from channel %s : %s',
                                  message.id, sub_channel.name, error)
        for video in yt_videos[matched_video_count:]:
            _ = self.discord_manager.create_message(
                sub_channel, {'content': self._video_message_content(video, subscription)},
                request_timeout=self.config.request_timeout)

    subscription.last_update = time.time()
|
|
|
|
def _refresh_subs(self):
    """Refresh every subscription, least recently updated first.

    The guild channels are listed again, then one ``(first code point, last code
    point, category)`` tuple is built for every category named ``<char>-<char>``
    (case-insensitive) that is not listed in ``config.unmanaged_categories``. Each
    subscription is then handed to ``_refresh_sub`` with one shared HTTPS connection.

    A ``RuntimeError`` is logged and the next subscription is processed; a
    ``TimeoutError`` or any other exception is logged and stops the pass.
    """
    self.logger.info('Start refreshing subs')
    categories, text_channel = self.discord_manager.list_channels(
        self.guild_id, request_timeout=self.config.request_timeout)
    self.guild_text_channels = text_channel
    self.guild_categories = categories

    channel_dict: dict[str, TextChannel] = {c.name or '': c for c in self.guild_text_channels}
    unmanaged_categories: set[str] = set(self.config.unmanaged_categories.split(','))
    category_ranges: list[tuple[int, int, ChannelCategory]] = []
    for category in self.guild_categories:
        if category.name in unmanaged_categories:
            self.logger.debug('Skipping unmanaged category: %s', category.name)
            continue
        bounds = [bound.lower() for bound in (category.name or '').split('-')]
        # ord() accepts single characters only: a name such as "News-Feed" used to
        # raise TypeError here and abort the whole refresh.
        if len(bounds) != 2 or any(len(bound) != 1 for bound in bounds):  # noqa: PLR2004
            self.logger.warning('Cannot compute range for category: %s', category.name)
            continue
        category_ranges.append((ord(bounds[0]), ord(bounds[1]), category))
    category_ranges.sort(key=operator.itemgetter(0))

    yt_connection = http.client.HTTPSConnection('www.youtube.com', timeout=self.config.request_timeout)
    try:
        sorted_subs = sorted(self._yt_subscriptions.values(), key=lambda s: s.last_update)
        for sub_info in sorted_subs:
            try:
                self._refresh_sub(yt_connection, sub_info, channel_dict, category_ranges)
            except RuntimeError as error:
                self.logger.error('Refreshing subscription %s failed: %s', sub_info.channel_id, error)
            except TimeoutError as error:
                self.logger.error('Timeout error refreshing subcription: %s', error)
                break
            except Exception as error:
                self.logger.error('Refreshing subscription %s unexpectedly failed: %s', sub_info.channel_id, error)
                break
    finally:
        # Release the connection even when the loop is left through an exception
        # that is not handled above (e.g. KeyboardInterrupt).
        yt_connection.close()
    self.logger.info('Subs refreshed')
|
|
|
|
def run(self):
    """Execute the queued tasks in due-time order until the task list is empty.

    Each iteration removes the task with the smallest due time, sleeps until that
    time and runs it. ``SCAN_BOT_CHANNEL`` and ``REFRESH_SUBS`` queue themselves again
    after running, whether or not they failed; every exception raised by a task is
    logged with its traceback and does not stop the loop.
    """
    while self.tasks:
        # Descending sort so that pop() removes the task with the smallest due time.
        self.tasks = sorted(self.tasks, key=operator.itemgetter(1), reverse=True)
        task_type, due_time, params = self.tasks.pop()
        delay = due_time - time.time()
        self.logger.debug(
            'Next task %s at %.03f (sleeping for %.03fs) : %s', task_type, due_time, delay, params)
        if delay > 0:
            time.sleep(delay)

        if task_type == Bot.Task.DELETE_MESSAGES:
            if not isinstance(params, list):
                self.logger.error('Wrong task params for DELETE_MESSAGES: %s', params)
            elif not params:
                self.logger.error('Empty params for DELETE_MESSAGES: %s', params)
            elif not all(isinstance(item, Message) for item in params):
                self.logger.error('All params not int for DELETE_MESSAGES: %s', params)
            else:
                for message in params:
                    try:
                        self.discord_manager.delete_message(
                            message, request_timeout=self.config.request_timeout)
                    except Exception as error:
                        self.logger.error('Error deleting message %s: %s -> %s',
                                          message, error, traceback.format_exc().replace('\n', ' | '))
        elif task_type == Bot.Task.SCAN_BOT_CHANNEL:
            try:
                self._scan_bot_channel()
            except Exception as error:
                self.logger.error('Error scanning bot channel: %s -> %s',
                                  error, traceback.format_exc().replace('\n', ' | '))
            # Keep exactly one pending scan.
            self.tasks = [task for task in self.tasks if task[0] != Bot.Task.SCAN_BOT_CHANNEL]
            self.tasks.append((
                self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
        elif task_type == Bot.Task.REFRESH_SUBS:
            try:
                self._refresh_subs()
            except Exception as error:
                self.logger.error('Error initializing subscriptions : %s -> %s',
                                  error, traceback.format_exc().replace('\n', ' | '))
            # Keep exactly one pending refresh.
            self.tasks = [task for task in self.tasks if task[0] != Bot.Task.REFRESH_SUBS]
            self.tasks.append((
                self.Task.REFRESH_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None))
|