From 693564bb044454c8fbaa8e7d008328b43221163e Mon Sep 17 00:00:00 2001 From: BreadTube Date: Tue, 30 Sep 2025 23:01:30 +0900 Subject: [PATCH] Refresh older sub first + save subscriptions --- breadtube_bot/bot.py | 185 +++++++++++++++----------- breadtube_bot/discord_manager.py | 3 + breadtube_bot/youtube_manager.py | 12 +- breadtube_bot/youtube_subscription.py | 13 ++ 4 files changed, 128 insertions(+), 85 deletions(-) diff --git a/breadtube_bot/bot.py b/breadtube_bot/bot.py index 782c2d0..10f0f4e 100644 --- a/breadtube_bot/bot.py +++ b/breadtube_bot/bot.py @@ -1,10 +1,11 @@ from __future__ import annotations from enum import Enum +import http.client +import json import logging import operator from pathlib import Path -import http.client import re import time import tomllib @@ -12,7 +13,7 @@ from typing import Any, TYPE_CHECKING import traceback from .config import Config -from .discord_manager import DiscordManager +from .discord_manager import ApiEncoder, DiscordManager from .logger import create_logger from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite, OverwriteType, Permissions, Role, TextChannel) @@ -24,19 +25,20 @@ if TYPE_CHECKING: class Bot: - DEFAULT_MESSAGE_LIST_LIMIT = 50 - DISCORD_NAME_REGEX = r'([^a-z])' - INIT_MESSAGE = ('Bot initialized.\nThis is the current configuration used.\n' - 'You can upload a new one to update the configuration.') + DEFAULT_MESSAGE_LIST_LIMIT: int = 50 + DISCORD_NAME_REGEX: str = r'([^a-z])' + INIT_MESSAGE: str = ('Bot initialized.\nThis is the current configuration used.\n' + 'You can upload a new one to update the configuration.') MAX_DOWNLOAD_SIZE: int = 50_000 - SUBS_LIST_MIN_SIZE = 50 - SUBS_LIST_SHORTS_RATIO = 5 - SUBS_LIST_VIDEO_RATIO = 2 + SUBS_LIST_MIN_SIZE: int = 50 + SUBS_LIST_SHORTS_RATIO: int = 5 + SUBS_LIST_VIDEO_RATIO: int = 2 + SUBS_SAVE_PATH: Path = Path('/tmp/breadtube-bot_subs.json') class Task(Enum): DELETE_MESSAGES = 1 SCAN_BOT_CHANNEL = 2 - INIT_SUBS = 3 + REFRESH_SUBS = 3 @staticmethod def _get_code_version() -> str: @@ -90,12 +92,15 @@ class Bot: raise RuntimeError("Couldn't initialize bot channel/role/permission") self.bot_channel: TextChannel = bot_channel - self._yt_subscriptions: Subscriptions = {} + self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger) + self._yt_subscriptions: Subscriptions = { + name: SubscriptionInfo.from_dict(info) for name, info in json.loads( + self.SUBS_SAVE_PATH.read_text(encoding='utf-8')).items()} if self.SUBS_SAVE_PATH.exists() else {} self._scan_bot_channel() self.tasks.append(( self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) - - self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger) + self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) + self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None)) self.logger.info('Bot initialized') def init_bot_channel(self) -> TextChannel | None: @@ -140,6 +145,7 @@ class Bot: return messages def _scan_bot_channel(self): # noqa: PLR0915 + self.logger.info('Starting scanning bot channel') messages = self._get_all_channel_messages(self.bot_channel) init_message_found = False new_config: Config | None = None @@ -181,7 +187,8 @@ class Bot: SubscriptionHelper.update_subscriptions( new=subscriptions, previous=self._yt_subscriptions) self._yt_subscriptions = subscriptions - self.tasks.append((Bot.Task.INIT_SUBS, time.time() + 1, None)) + self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) + self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None)) except RuntimeError as error: self.logger.error('Invalid init subscriptions file: %s', error) has_error = True @@ -255,7 +262,8 @@ class Bot: self.logger.info('Loading subscriptions') SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions) self._yt_subscriptions = new_subscriptions - self.tasks.append((Bot.Task.INIT_SUBS, time.time() + 1, None)) + self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) + self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None)) # New init message is needed, previous need to be deleted if (new_config is not None or new_subscriptions is not None) and self.init_message is not None: @@ -283,6 +291,7 @@ class Bot: Bot.Task.DELETE_MESSAGES, time.time() + self.config.bot_message_duration, list(delayed_delete.values()))) + self.logger.info('Bot channel scanned') def _get_subscription_channel(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel], category_ranges: list[tuple[int, int, ChannelCategory]]) -> TextChannel: @@ -340,8 +349,72 @@ class Bot: def _video_message_content(video: SearchResultItem) -> str: return f'https://www.youtube.com/video/{video.id.videoId}' - def _init_subs(self): - self.logger.info('Initialize all subs') + def _refresh_sub(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel], + category_ranges: list[tuple[int, int, ChannelCategory]]): + try: + sub_channel = self._get_subscription_channel(subscription, channel_dict, category_ranges) + except RuntimeError as error: + self.logger.error(error) + return + if subscription.channel_info is None: + _, channel_info = self.yt_manager.request_channel_info( + subscription.channel_id, request_timeout=self.config.request_timeout) + if not channel_info.items: + raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name) + subscription.channel_info = channel_info.items[0].snippet + + self._refresh_subscription(subscription) + + sub_init_message = f'https://www.youtube.com/{subscription.channel_info.customUrl}' + sub_messages = self._get_all_channel_messages(sub_channel) + if not sub_messages or sub_messages[-1].content != sub_init_message: + self.logger.debug('Clearing sub channel: %s', sub_channel.name) + for message in sub_messages: + self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) + _ = self.discord_manager.create_message( + sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout) + else: + messages = list(reversed(sub_messages[:-1][:self.config.youtube_channel_video_count])) + yt_videos = list(reversed(subscription.video_list[:self.config.youtube_channel_video_count])) + immediate_delete: dict[int, Message] = { + m.id: m for m in sub_messages[self.config.youtube_channel_video_count:-1]} + last_matching_index = 0 + stop_scan = False + for yt_video in yt_videos: + for index, message in enumerate(messages[last_matching_index:], start=last_matching_index): + if message.content != self._video_message_content(yt_video): + if last_matching_index != 0: + stop_scan = True + break + self.logger.debug('Unmatched video: %s', yt_video.id.videoId) + immediate_delete[message.id] = message + else: + self.logger.debug('Matched video: %s', yt_video.id.videoId) + last_matching_index = index + 1 + break + else: + self.logger.debug('All videos scanned') + break + if stop_scan: + break + for message in messages[last_matching_index:]: + immediate_delete[message.id] = message + + for message in immediate_delete.values(): + try: + self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) + except RuntimeError as error: + self.logger.error('Error deleting message %s from channel %s : %s', + message.id, sub_channel.name, error) + for video in yt_videos[last_matching_index:]: + _ = self.discord_manager.create_message( + sub_channel, {'content': self._video_message_content(video)}, + request_timeout=self.config.request_timeout) + + subscription.last_update = time.time() + + def _refresh_subs(self): + self.logger.info('Start refreshing subs') categories, text_channel = self.discord_manager.list_channels( self.guild_id, request_timeout=self.config.request_timeout) self.guild_text_channels = text_channel @@ -361,68 +434,14 @@ class Bot: category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category)) category_ranges = sorted(category_ranges, key=operator.itemgetter(0)) - for sub_info in self._yt_subscriptions.values(): + sorted_subs = sorted(self._yt_subscriptions.values(), key=lambda s: s.last_update) + for sub_info in sorted_subs: try: - sub_channel = self._get_subscription_channel(sub_info, channel_dict, category_ranges) - except RuntimeError as error: - self.logger.error(error) - continue - if sub_info.channel_info is None: - _, channel_info = self.yt_manager.request_channel_info( - sub_info.channel_id, request_timeout=self.config.request_timeout) - if not channel_info.items: - raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name) - sub_info.channel_info = channel_info.items[0].snippet - - self._refresh_subscription(sub_info) - - sub_init_message = f'https://www.youtube.com/{sub_info.channel_info.customUrl}' - sub_messages = self._get_all_channel_messages(sub_channel) - if not sub_messages or sub_messages[-1].content != sub_init_message: - self.logger.debug('Clearing sub channel: %s', sub_channel.name) - for message in sub_messages: - self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) - _ = self.discord_manager.create_message( - sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout) - else: - messages = list(reversed(sub_messages[:-1][:self.config.youtube_channel_video_count])) - yt_videos = list(reversed(sub_info.video_list[:self.config.youtube_channel_video_count])) - immediate_delete: dict[int, Message] = { - m.id: m for m in sub_messages[self.config.youtube_channel_video_count:-1]} - last_matching_index = 0 - stop_scan = False - for yt_video in yt_videos: - for index, message in enumerate(messages[last_matching_index:], start=last_matching_index): - if message.content != self._video_message_content(yt_video): - if last_matching_index != 0: - stop_scan = True - break - self.logger.debug('Unmatched video: %s', yt_video.id.videoId) - immediate_delete[message.id] = message - else: - self.logger.debug('Matched video: %s', yt_video.id.videoId) - last_matching_index = index + 1 - break - else: - self.logger.debug('All videos scanned') - break - if stop_scan: - break - for message in messages[last_matching_index:]: - immediate_delete[message.id] = message - - for message in immediate_delete.values(): - try: - self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout) - except RuntimeError as error: - self.logger.error('Error deleting message %s from channel %s : %s', - message.id, sub_channel.name, error) - for video in yt_videos[last_matching_index:]: - _ = self.discord_manager.create_message( - sub_channel, {'content': self._video_message_content(video)}, - request_timeout=self.config.request_timeout) - - sub_info.last_update = time.time() + self._refresh_sub(sub_info, channel_dict, category_ranges) + except TimeoutError as error: + self.logger.error('Timeout error refreshing subcription: %s', error) + break + self.logger.info('Subs refreshed') def run(self): while self.tasks: @@ -455,13 +474,17 @@ class Bot: except Exception as error: self.logger.error('Error scanning bot channel: %s -> %s', error, traceback.format_exc().replace('\n', ' | ')) + self.tasks = list(filter(lambda t: t[0] != Bot.Task.SCAN_BOT_CHANNEL, self.tasks)) self.tasks.append(( self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) - case Bot.Task.INIT_SUBS: + case Bot.Task.REFRESH_SUBS: try: - self._init_subs() + self._refresh_subs() except Exception as error: self.logger.error('Error initializing subscriptions : %s -> %s', error, traceback.format_exc().replace('\n', ' | ')) + self.SUBS_SAVE_PATH.write_text( + json.dumps(self._yt_subscriptions, cls=ApiEncoder, ensure_ascii=False), encoding='utf-8') + self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) self.tasks.append(( - self.Task.INIT_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None)) + self.Task.REFRESH_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None)) diff --git a/breadtube_bot/discord_manager.py b/breadtube_bot/discord_manager.py index f3d0686..f597e99 100644 --- a/breadtube_bot/discord_manager.py +++ b/breadtube_bot/discord_manager.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import asdict, dataclass, is_dataclass +from datetime import datetime from enum import Enum import json import random @@ -22,6 +23,8 @@ class ApiEncoder(json.JSONEncoder): return asdict(o) # type: ignore if isinstance(o, Enum): return o.value + if isinstance(o, datetime): + return o.isoformat() return super().default(o) diff --git a/breadtube_bot/youtube_manager.py b/breadtube_bot/youtube_manager.py index bb0e95c..05094f4 100644 --- a/breadtube_bot/youtube_manager.py +++ b/breadtube_bot/youtube_manager.py @@ -17,7 +17,7 @@ if TYPE_CHECKING: class YoutubeManager: - DEFAULT_DAILY_REQUESTS = 10_000 + DEFAULT_DAILY_POINTS = 10_000 SHORTS_CHECK_STATUS = 303 @dataclass @@ -28,12 +28,12 @@ class YoutubeManager: def __init__(self, api_key: str, logger: logging.Logger): self._api_key = api_key self._logger = logger - self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_REQUESTS, next_reset=time.time() + 24 * 3600) + self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_POINTS, next_reset=time.time() + 24 * 3600) def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, dict]: if time.time() >= self.rate_limit.next_reset: self.rate_limit.next_reset = time.time() + 24 * 3600 - self.rate_limit.remaining = self.DEFAULT_DAILY_REQUESTS + self.rate_limit.remaining = self.DEFAULT_DAILY_POINTS elif self.rate_limit.remaining <= 0: sleep_time = time.time() - self.rate_limit.next_reset self._logger.debug('No more remaining in Youtube RateLimit : sleeping for %.03fs', sleep_time) @@ -60,7 +60,9 @@ class YoutubeManager: def is_shorts(self, connection: http.client.HTTPConnection, video_id: str) -> bool: try: - connection.request('GET', f'/shorts/{video_id}') + endpoint = f'/shorts/{video_id}' + self._logger.debug('YoutubeManager: Checking for shorts: %s', endpoint) + connection.request('GET', endpoint) response = connection.getresponse() response.read() return response.status != self.SHORTS_CHECK_STATUS @@ -71,6 +73,7 @@ class YoutubeManager: HTTPHeaders, ChannelResult]: url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet' f'&id={channel_id}&key={self._api_key}') + self._logger.debug('YoutubeManager: request channel info for channel %s', channel_id) headers, info = self._request(url=url, request_timeout=request_timeout) return headers, ChannelResult.from_dict(info) @@ -78,5 +81,6 @@ class YoutubeManager: HTTPHeaders, SearchResult]: url = (f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}' f'&maxResults={max_results}&order=date&type=video&key={self._api_key}') + self._logger.debug('YoutubeManager: request channel videos for channel %s', channel_id) headers, info = self._request(url=url, request_timeout=request_timeout) return headers, SearchResult.from_dict(info) diff --git a/breadtube_bot/youtube_subscription.py b/breadtube_bot/youtube_subscription.py index 4553883..eac730d 100644 --- a/breadtube_bot/youtube_subscription.py +++ b/breadtube_bot/youtube_subscription.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass, field from .youtube_objects import ChannelSnippet, SearchResultItem @@ -12,6 +14,17 @@ class SubscriptionInfo: shorts_list: list[SearchResultItem] = field(default_factory=list) video_list: list[SearchResultItem] = field(default_factory=list) + @staticmethod + def from_dict(info: dict) -> SubscriptionInfo: + channel_info: dict | None = info.get('channel_info') + return SubscriptionInfo( + name=info['name'], + channel_id=info['channel_id'], + last_update=info['last_update'], + channel_info=ChannelSnippet.from_dict(channel_info) if channel_info is not None else None, + shorts_list=[SearchResultItem.from_dict(s) for s in info['shorts_list']], + video_list=[SearchResultItem.from_dict(s) for s in info['video_list']]) + Subscriptions = dict[str, SubscriptionInfo]