From f510cc5aef298123aee79d0fdf93f24afbb6568c Mon Sep 17 00:00:00 2001 From: BreadTube Date: Mon, 29 Sep 2025 23:34:47 +0900 Subject: [PATCH] Subscription update --- breadtube_bot/bot.py | 240 ++++++++++++------ breadtube_bot/youtube_manager.py | 17 +- .../{yt_objects.py => youtube_objects.py} | 0 breadtube_bot/youtube_subscription.py | 6 +- 4 files changed, 182 insertions(+), 81 deletions(-) rename breadtube_bot/{yt_objects.py => youtube_objects.py} (100%) diff --git a/breadtube_bot/bot.py b/breadtube_bot/bot.py index f614d09..206cdf6 100644 --- a/breadtube_bot/bot.py +++ b/breadtube_bot/bot.py @@ -7,24 +7,30 @@ from pathlib import Path import re import time import tomllib -from typing import Any +from typing import Any, TYPE_CHECKING import traceback - from .config import Config from .discord_manager import DiscordManager from .logger import create_logger from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite, OverwriteType, Permissions, Role, TextChannel) from .youtube_manager import YoutubeManager -from .youtube_subscription import SUBSCRIPTION_FILE_COLUMNS, SubscriptionHelper, Subscriptions +from .youtube_subscription import SUBSCRIPTION_FILE_COLUMNS, SubscriptionHelper, SubscriptionInfo, Subscriptions + +if TYPE_CHECKING: + from breadtube_bot.youtube_objects import SearchResultItem class Bot: DEFAULT_MESSAGE_LIST_LIMIT = 50 + DISCORD_NAME_REGEX = r'([^a-z])' INIT_MESSAGE = ('Bot initialized.\nThis is the current configuration used.\n' 'You can upload a new one to update the configuration.') MAX_DOWNLOAD_SIZE: int = 50_000 + SUBS_LIST_MIN_SIZE = 50 + SUBS_LIST_SHORTS_RATIO = 5 + SUBS_LIST_VIDEO_RATIO = 2 class Task(Enum): DELETE_MESSAGES = 1 @@ -40,7 +46,7 @@ class Bot: def __init__(self, bot_token: str, guild_id: int, yt_api_key: str, config: Config | None = None, log_level: int = logging.INFO): - self.config = config or Config() + self.config: Config = config or Config() self.guild_id = guild_id self.logger = create_logger('breadtube', log_level, stdout=True) @@ -115,7 +121,7 @@ class Bot: deny=Permissions.NONE)]}, request_timeout=self.config.request_timeout) - def _get_bot_channel_messages(self) -> list[Message]: + def _get_all_channel_messages(self, channel: TextChannel) -> list[Message]: messages_id_delete_task: set[int] = set() for task_type, _, task_params in self.tasks: if task_type == self.Task.DELETE_MESSAGES: @@ -125,7 +131,7 @@ class Bot: messages: list[Message] = [] while True: message_batch = self.discord_manager.list_text_channel_messages( - self.bot_channel, request_timeout=self.config.request_timeout, after_id=last_message_id) + channel, request_timeout=self.config.request_timeout, after_id=last_message_id) messages.extend([m for m in message_batch if m.id not in messages_id_delete_task]) if len(message_batch) < self.DEFAULT_MESSAGE_LIST_LIMIT: break @@ -133,7 +139,7 @@ class Bot: return messages def _scan_bot_channel(self): # noqa: PLR0915 - messages = self._get_bot_channel_messages() + messages = self._get_all_channel_messages(self.bot_channel) init_message_found = False new_config: Config | None = None new_subscriptions: Subscriptions | None = None @@ -277,7 +283,63 @@ class Bot: time.time() + self.config.bot_message_duration, list(delayed_delete.values()))) + def _get_subscription_channel(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel], + category_ranges: list[tuple[int, int, ChannelCategory]]) -> TextChannel: + discord_name = re.sub(self.DISCORD_NAME_REGEX, '-', subscription.name.lower()) + category_value = ord(discord_name[0]) + sub_channel: TextChannel | None = channel_dict.get(discord_name) + if sub_channel is None: + selected_category: ChannelCategory | None = None + for start_range, stop_range, category in category_ranges: + if start_range <= category_value <= stop_range: + selected_category = category + break + if selected_category is None: + selected_category = category_ranges[-1][2] + sub_channel = self.discord_manager.create_text_channel( + self.guild_id, { + 'name': discord_name, + 'parent_id': selected_category.id, + 'permission_overwrites': [ + Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, + deny=Permissions.SEND_MESSAGES), + Overwrite(self.bot_role.id, OverwriteType.ROLE, + allow=Permissions.VIEW_CHANNEL | Permissions.SEND_MESSAGES, + deny=Permissions.NONE)]}, + request_timeout=self.config.request_timeout) + return sub_channel + + def _refresh_subscription(self, subscription: SubscriptionInfo): + _, yt_video_info = self.yt_manager.request_channel_videos( + channel_id=subscription.channel_id, + max_results=self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count, + request_timeout=self.config.request_timeout) + video_ids = {v.id.videoId for v in subscription.shorts_list + subscription.video_list} + for yt_info in yt_video_info.items: + if yt_info.id.videoId in video_ids: + continue + + if self.yt_manager.is_shorts(yt_info.id.videoId, request_timeout=self.config.request_timeout): + subscription.shorts_list.append(yt_info) + else: + subscription.video_list.append(yt_info) + video_ids.add(yt_info.id.videoId) + internal_size = min(self.SUBS_LIST_MIN_SIZE, + self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count) + subscription.shorts_list = sorted( + subscription.shorts_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size] + internal_size = min(self.SUBS_LIST_MIN_SIZE, + self.SUBS_LIST_VIDEO_RATIO * self.config.youtube_channel_video_count) + subscription.video_list = sorted( + subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size] + subscription.last_update = time.time() + + @staticmethod + def _video_message_content(video: SearchResultItem) -> str: + return f'https://www.youtube.com/video/{video.id.videoId}' + def _init_subs(self): + self.logger.info('Initialize all subs') categories, text_channel = self.discord_manager.list_channels( self.guild_id, request_timeout=self.config.request_timeout) self.guild_text_channels = text_channel @@ -297,81 +359,107 @@ class Bot: category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category)) category_ranges = sorted(category_ranges, key=operator.itemgetter(0)) - name_regex = r'([^a-z])' for sub_info in self._yt_subscriptions.values(): - discord_name = sub_info.name.lower() - discord_name = re.sub(name_regex, '-', discord_name) - category_value = ord(discord_name[0]) - sub_channel: TextChannel | None = channel_dict.get(discord_name) - if sub_channel is None: - selected_category: ChannelCategory | None = None - for start_range, stop_range, category in category_ranges: - if start_range <= category_value <= stop_range: - selected_category = category - break - if selected_category is None: - selected_category = category_ranges[-1][2] - sub_channel = self.discord_manager.create_text_channel( - self.guild_id, { - 'name': discord_name, - 'parent_id': selected_category.id, - 'permission_overwrites': [ - Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, - deny=Permissions.SEND_MESSAGES), - Overwrite(self.bot_role.id, OverwriteType.ROLE, - allow=Permissions.VIEW_CHANNEL | Permissions.SEND_MESSAGES, - deny=Permissions.NONE)]}, - request_timeout=self.config.request_timeout) + try: + sub_channel = self._get_subscription_channel(sub_info, channel_dict, category_ranges) + except RuntimeError as error: + self.logger.error(error) + continue if sub_info.channel_info is None: _, channel_info = self.yt_manager.request_channel_info( sub_info.channel_id, request_timeout=self.config.request_timeout) if not channel_info.items: - self.logger.error('No channel info return from YouTube API for channel: %s', discord_name) - continue + raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name) sub_info.channel_info = channel_info.items[0].snippet - channel_url = f'https://www.youtube.com/{sub_info.channel_info.customUrl}' - _ = self.discord_manager.create_message( - sub_channel, {'content': channel_url}, request_timeout=self.config.request_timeout) + + self._refresh_subscription(sub_info) + + sub_init_message = f'https://www.youtube.com/{sub_info.channel_info.customUrl}' + sub_messages = self._get_all_channel_messages(sub_channel) + if not sub_messages or sub_messages[-1].content != sub_init_message: + self.logger.debug('Clearing sub channel: %s', sub_channel.name) + for message in sub_messages: + self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) + _ = self.discord_manager.create_message( + sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout) + else: + messages = list(reversed(sub_messages[:-1][:self.config.youtube_channel_video_count])) + yt_videos = list(reversed(sub_info.video_list[:self.config.youtube_channel_video_count])) + immediate_delete: dict[int, Message] = { + m.id: m for m in sub_messages[self.config.youtube_channel_video_count:-1]} + last_matching_index = 0 + stop_scan = False + for yt_video in yt_videos: + for index, message in enumerate(messages[last_matching_index:], start=last_matching_index): + if message.content != self._video_message_content(yt_video): + if last_matching_index != 0: + stop_scan = True + break + self.logger.debug('Unmatched video: %s', yt_video.id.videoId) + immediate_delete[message.id] = message + else: + self.logger.debug('Matched video: %s', yt_video.id.videoId) + last_matching_index = index + 1 + break + else: + self.logger.debug('All videos scanned') + break + if stop_scan: + break + for message in messages[last_matching_index:]: + immediate_delete[message.id] = message + + for message in immediate_delete.values(): + try: + self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) + except RuntimeError as error: + self.logger.error('Error deleting message %s from channel %s : %s', + message.id, sub_channel.name, error) + for video in yt_videos[last_matching_index:]: + _ = self.discord_manager.create_message( + sub_channel, {'content': self._video_message_content(video)}, + request_timeout=self.config.request_timeout) + sub_info.last_update = time.time() def run(self): - while True: - if self.tasks: - self.tasks = sorted(self.tasks, key=operator.itemgetter(1), reverse=True) - task_type, task_time, task_params = self.tasks.pop() - sleep_time = task_time - time.time() - self.logger.debug( - 'Next task %s at %.03f (sleeping for %.03fs) : %s', task_type, task_time, sleep_time, task_params) - if sleep_time > 0: - time.sleep(sleep_time) - match task_type: - case Bot.Task.DELETE_MESSAGES: - if not isinstance(task_params, list): - self.logger.error('Wrong task params for DELETE_MESSAGES: %s', task_params) - elif not task_params: - self.logger.error('Empty params for DELETE_MESSAGES: %s', task_params) - elif any(not isinstance(v, Message) for v in task_params): - self.logger.error('All params not int for DELETE_MESSAGES: %s', task_params) - else: - for message in task_params: - try: - self.discord_manager.delete_message( - message, request_timeout=self.config.request_timeout) - except Exception as error: - self.logger.error('Error deleting message %s: %s -> %s', - message, error, traceback.format_exc().replace('\n', ' | ')) - case Bot.Task.SCAN_BOT_CHANNEL: - try: - self._scan_bot_channel() - except Exception as error: - self.logger.error('Error scanning bot channel: %s -> %s', - error, traceback.format_exc().replace('\n', ' | ')) - self.tasks.append(( - self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) - case Bot.Task.INIT_SUBS: - try: - self._init_subs() - except Exception as error: - self.logger.error('Error initializing subscriptions : %s -> %s', - error, traceback.format_exc().replace('\n', ' | ')) - time.sleep(1) + while self.tasks: + self.tasks = sorted(self.tasks, key=operator.itemgetter(1), reverse=True) + task_type, task_time, task_params = self.tasks.pop() + sleep_time = task_time - time.time() + self.logger.debug( + 'Next task %s at %.03f (sleeping for %.03fs) : %s', task_type, task_time, sleep_time, task_params) + if sleep_time > 0: + time.sleep(sleep_time) + match task_type: + case Bot.Task.DELETE_MESSAGES: + if not isinstance(task_params, list): + self.logger.error('Wrong task params for DELETE_MESSAGES: %s', task_params) + elif not task_params: + self.logger.error('Empty params for DELETE_MESSAGES: %s', task_params) + elif any(not isinstance(v, Message) for v in task_params): + self.logger.error('All params not int for DELETE_MESSAGES: %s', task_params) + else: + for message in task_params: + try: + self.discord_manager.delete_message( + message, request_timeout=self.config.request_timeout) + except Exception as error: + self.logger.error('Error deleting message %s: %s -> %s', + message, error, traceback.format_exc().replace('\n', ' | ')) + case Bot.Task.SCAN_BOT_CHANNEL: + try: + self._scan_bot_channel() + except Exception as error: + self.logger.error('Error scanning bot channel: %s -> %s', + error, traceback.format_exc().replace('\n', ' | ')) + self.tasks.append(( + self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) + case Bot.Task.INIT_SUBS: + try: + self._init_subs() + except Exception as error: + self.logger.error('Error initializing subscriptions : %s -> %s', + error, traceback.format_exc().replace('\n', ' | ')) + self.tasks.append(( + self.Task.INIT_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None)) diff --git a/breadtube_bot/youtube_manager.py b/breadtube_bot/youtube_manager.py index 9269ea4..f715bc9 100644 --- a/breadtube_bot/youtube_manager.py +++ b/breadtube_bot/youtube_manager.py @@ -1,13 +1,14 @@ from __future__ import annotations from dataclasses import dataclass +import http.client import json import time from typing import TYPE_CHECKING import urllib.error import urllib.request -from .yt_objects import ChannelResult, SearchResult +from .youtube_objects import ChannelResult, SearchResult if TYPE_CHECKING: @@ -17,6 +18,7 @@ if TYPE_CHECKING: class YoutubeManager: DEFAULT_DAILY_REQUESTS = 10_000 + SHORTS_CHECK_STATUS = 303 @dataclass class RateLimit: @@ -56,6 +58,15 @@ class YoutubeManager: except TimeoutError as error: raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error + def is_shorts(self, video_id: str, request_timeout: float) -> bool: + try: + connection = http.client.HTTPSConnection('www.youtube.com', timeout=request_timeout) + connection.request('GET', f'/shorts/{video_id}') + response = connection.getresponse() + return response.status != self.SHORTS_CHECK_STATUS + except Exception as error: + raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error + def request_channel_info(self, channel_id: str, request_timeout: float) -> tuple[ HTTPHeaders, ChannelResult]: url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet' @@ -65,7 +76,7 @@ class YoutubeManager: def request_channel_videos(self, channel_id: str, max_results: int, request_timeout: float) -> tuple[ HTTPHeaders, SearchResult]: - url = ('https://www.googleapis.com/youtube/v3/search?part=snippet' - f'&channelId={channel_id}&maxResults={max_results}&order=date&key={self._api_key}') + url = (f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}' + f'&maxResults={max_results}&order=date&type=video&key={self._api_key}') headers, info = self._request(url=url, request_timeout=request_timeout) return headers, SearchResult.from_dict(info) diff --git a/breadtube_bot/yt_objects.py b/breadtube_bot/youtube_objects.py similarity index 100% rename from breadtube_bot/yt_objects.py rename to breadtube_bot/youtube_objects.py diff --git a/breadtube_bot/youtube_subscription.py b/breadtube_bot/youtube_subscription.py index eba5af5..4553883 100644 --- a/breadtube_bot/youtube_subscription.py +++ b/breadtube_bot/youtube_subscription.py @@ -1,6 +1,6 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field -from .yt_objects import ChannelSnippet +from .youtube_objects import ChannelSnippet, SearchResultItem @dataclass @@ -9,6 +9,8 @@ class SubscriptionInfo: channel_id: str last_update: float channel_info: ChannelSnippet | None = None + shorts_list: list[SearchResultItem] = field(default_factory=list) + video_list: list[SearchResultItem] = field(default_factory=list) Subscriptions = dict[str, SubscriptionInfo]