From b80e4f774573bc0076fc1bfcc11c6d26b5315def Mon Sep 17 00:00:00 2001 From: BreadTube Date: Fri, 26 Sep 2025 23:41:51 +0900 Subject: [PATCH] Youtube Subscription Implementation (WIP) --- breadtube_bot/config.py | 7 +- breadtube_bot/manager.py | 333 ++++++++++++++++++-------- breadtube_bot/youtube_manager.py | 6 +- breadtube_bot/youtube_subscription.py | 50 ++++ breadtube_bot/yt_objects.py | 66 +++-- pyproject.toml | 4 +- 6 files changed, 333 insertions(+), 133 deletions(-) create mode 100644 breadtube_bot/youtube_subscription.py diff --git a/breadtube_bot/config.py b/breadtube_bot/config.py index 9c4256d..692061f 100644 --- a/breadtube_bot/config.py +++ b/breadtube_bot/config.py @@ -11,6 +11,7 @@ class Config: bot_channel_init_retries: int = 3 bot_message_duration: float = 150. request_timeout: float = 3. + unmanaged_categories: str = '' youtube_channel_refresh_interval: float = 3600 youtube_channel_video_count: int = 10 @@ -28,12 +29,12 @@ class Config: if lines[0] != 'config': raise RuntimeError('Cannot load config: first line is not "config"') config_dict = {} - for line_number, line in enumerate(lines[1:]): + for line_number, line in enumerate(lines[1:], start=1): key, value = line.split('=', maxsplit=1) if key not in annotations: - raise RuntimeError(f'Invalid config: invalid key {key} at line {line_number + 1}') + raise RuntimeError(f'Invalid config: invalid key {key} at line {line_number}') if key in config_dict: - raise RuntimeError(f'Invalid config: duplicated key {key} at line {line_number + 1}') + raise RuntimeError(f'Invalid config: duplicated key {key} at line {line_number}') config_dict[key] = value for key, value in config_dict.items(): diff --git a/breadtube_bot/manager.py b/breadtube_bot/manager.py index cb396ca..3f8c26b 100644 --- a/breadtube_bot/manager.py +++ b/breadtube_bot/manager.py @@ -7,6 +7,7 @@ import operator from pathlib import Path import json import random +import re import time import tomllib from typing import Any @@ -14,7 +15,6 @@ import urllib.error import urllib.request import traceback -from breadtube_bot.youtube_api import YoutubeManager from .api import Api, ApiAction, ApiVersion from .config import Config @@ -22,6 +22,8 @@ from .logger import create_logger from .objects import ( Attachment, ChannelCategory, ChannelType, FileMime, HTTPHeaders, Message, MessageReference, MessageReferenceType, Overwrite, OverwriteType, Permissions, Role, TextChannel, User) +from .youtube_manager import YoutubeManager +from .youtube_subscription import SUBSCRIPTION_FILE_COLUMNS, SubscriptionHelper, Subscriptions class ApiEncoder(json.JSONEncoder): @@ -34,25 +36,21 @@ class ApiEncoder(json.JSONEncoder): class DiscordManager: - MAX_CONFIG_SIZE: int = 50_000 DEFAULT_MESSAGE_LIST_LIMIT = 50 INIT_MESSAGE = ('Bot initialized.\nThis is the current configuration used.\n' 'You can upload a new one to update the configuration.') + MAX_DOWNLOAD_SIZE: int = 50_000 + MIN_API_VERSION = 9 @dataclass class RateLimit: remaining: int next_reset: float - @dataclass - class YoutTubeChannel: - name: str - channel_id: str - last_update: float - class Task(Enum): - SCAN_BOT_CHANNEL = 1 - DELETE_MESSAGES = 2 + DELETE_MESSAGES = 1 + SCAN_BOT_CHANNEL = 2 + INIT_SUBS = 3 @staticmethod def _get_code_version() -> str: @@ -75,19 +73,37 @@ class DiscordManager: self.logger.info('Retrieving bot user') self.bot_user = self.get_current_user() self.logger.info('Retrieving guild roles before init') - self.guild_roles: list = self.list_roles() - self.bot_channel: TextChannel | None = None + self.guild_roles: list[Role] = self.list_roles() + bot_role: Role | None = None + everyone_role: Role | None = None + for role in self.guild_roles: + if role.name == self.config.bot_role: + bot_role = role + elif role.name == '@everyone': + everyone_role = role + if bot_role is None: + raise RuntimeError('No BreadTube role found') + if everyone_role is None: + raise RuntimeError('No everyone role found') + self.bot_role: Role = bot_role + self.everyone_role: Role = everyone_role + + categories, text_channel = self.list_channels() + self.guild_text_channels: list[TextChannel] = text_channel + self.guild_categories: list[ChannelCategory] = categories self.init_message: Message | None = None + bot_channel: TextChannel | None = None for _ in range(self.config.bot_channel_init_retries): - while not self.init_bot_channel(): - time.sleep(10) - break - else: - self.logger.info('Bot init OK') + bot_channel = self.init_bot_channel() + if bot_channel is not None: break + time.sleep(5) + if bot_channel is None: raise RuntimeError("Couldn't initialize bot channel/role/permission") + self.bot_channel: TextChannel = bot_channel + self._yt_subscriptions: Subscriptions = {} self._scan_bot_channel() self.tasks.append(( self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) @@ -112,13 +128,12 @@ class DiscordManager: def _send_request(self, api_action: ApiAction, endpoint: str, api_version: ApiVersion = ApiVersion.V10, data: bytes | None = None, upload_files: list[tuple[str, FileMime, bytes]] | None = None, expected_code: int = 200) -> tuple[HTTPHeaders, dict | list | None]: - min_api_version = 9 - - if api_version.value < min_api_version: + if api_version.value < self.MIN_API_VERSION: self.logger.warning( 'Warning: using deprecated API version %d (minimum non deprecated is %d)', - api_version, min_api_version) + api_version, self.MIN_API_VERSION) url = f'https://discord.com/api/v{api_version.value}{endpoint}' + self.logger.debug('Discord API Request: %s %s', api_action.value, url) boundary: str = '' if upload_files: @@ -174,51 +189,30 @@ class DiscordManager: except urllib.error.URLError as error: raise RuntimeError(f'URL error downloading attachment ({attachment}): {error}') from error - def init_bot_channel(self) -> bool: - _, text_channel = self.list_channels() - breadtube_role: Role | None = None - everyone_role: Role | None = None - for role in self.guild_roles: - if role.name == self.config.bot_role: - breadtube_role = role - elif role.name == '@everyone': - everyone_role = role - if breadtube_role is None: - self.logger.info('No BreadTube role found') - return False - if everyone_role is None: - self.logger.info('No everyone role found') - return False - - for channel in text_channel: + def init_bot_channel(self) -> TextChannel | None: + for channel in self.guild_text_channels: if channel.name == self.config.bot_channel: - self.bot_channel = channel self.logger.info('Found breadtube bot channel') - for perm in self.bot_channel.permission_overwrites: - if perm.id == breadtube_role.id: + for perm in channel.permission_overwrites: + if perm.id == self.bot_role.id: if not perm.allow | Permissions.VIEW_CHANNEL: self.logger.info('BreadTube bot cannot view BreadTube channel: permission missing') - return False + return None self.logger.info('BreadTube channel permission OK') break - break - else: - self.bot_channel = self.create_text_channel({ - 'name': self.config.bot_channel, - 'permission_overwrites': [ - Overwrite(everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, - deny=Permissions.VIEW_CHANNEL), - Overwrite(breadtube_role.id, OverwriteType.ROLE, allow=Permissions.VIEW_CHANNEL, - deny=Permissions.NONE)] - }) - self.logger.info('Created breadtube bot channel') - return True + return channel - def _scan_bot_channel(self): - if self.bot_channel is None: - self.logger.error('Cannot scan bot channel: bot channel is None') - return [] + self.logger.info('Creating breadtube bot channel') + return self.create_text_channel({ + 'name': self.config.bot_channel, + 'permission_overwrites': [ + Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, + deny=Permissions.VIEW_CHANNEL), + Overwrite(self.bot_role.id, OverwriteType.ROLE, allow=Permissions.VIEW_CHANNEL, + deny=Permissions.NONE)] + }) + def _get_bot_channel_messages(self) -> list[Message]: messages_id_delete_task: set[int] = set() for task_type, _, task_params in self.tasks: if task_type == self.Task.DELETE_MESSAGES: @@ -232,62 +226,138 @@ class DiscordManager: if len(message_batch) < self.DEFAULT_MESSAGE_LIST_LIMIT: break last_message_id = message_batch[-1].id + return messages - self.init_message = None + def _scan_bot_channel(self): # noqa: PLR0915 + messages = self._get_bot_channel_messages() + init_message_found = False new_config: Config | None = None + new_subscriptions: Subscriptions | None = None delayed_delete: dict[int, Message] = {} immediate_delete: dict[int, Message] = {} for message in messages: - if message.id in delayed_delete: - self.logger.debug('Skipping message already marked to be deleted') + if init_message_found: + self.logger.debug('Marking message for immediate deletion (init found): %s', message) + immediate_delete[message.id] = message continue - if self.init_message is None and new_config is None and len(message.attachments) == 1: - attachment = message.attachments[0] - if attachment.size < self.MAX_CONFIG_SIZE: + if len(message.attachments) <= 0: + self.logger.debug('Marking message for immediate deletion (no attachment): %s', message) + immediate_delete[message.id] = message + continue + + if message.author.id == self.bot_user.id: + self.logger.debug('Found init message') + # If same init message: nothing to do + if self.init_message is not None and message.id == self.init_message.id: + continue + # Loading init message content + has_error = False + for attachment in message.attachments: try: _, content = self._download_attachment(attachment) - if content.startswith(b'config'): + if new_config is None and content.startswith(b'config'): try: - config = Config.from_str(content.decode()) - if message.author.id == self.bot_user.id: # keep using current config - self.logger.debug('Found previous init message') - self.init_message = message - if config != self.config: # First scan qill need to load config - self.config = config - continue - if config != self.config: # New config to update to - new_config = config - self.logger.debug('Marking new config message for immediate deletion: %s', message) - immediate_delete[message.id] = message - continue + self.config = Config.from_str(content.decode()) except RuntimeError as error: - self.logger.info('Invalid config file: %s', error) - bot_message = self.create_message(self.bot_channel, { - 'content': str(error), - 'message_reference': MessageReference( - type=MessageReferenceType.DEFAULT, - message_id=message.id, - channel_id=self.bot_channel.id, - guild_id=None, - fail_if_not_exists=None)}) - delayed_delete[bot_message.id] = bot_message - delayed_delete[message.id] = message - continue + self.logger.error('Cannot load config from init message: %s', error) + has_error = True + if new_subscriptions is None and content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]): + try: + subscriptions = SubscriptionHelper.read_text(content) + if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()): + SubscriptionHelper.update_subscriptions( + new=subscriptions, previous=self._yt_subscriptions) + self._yt_subscriptions = subscriptions + self.tasks.append((DiscordManager.Task.INIT_SUBS, time.time() + 1, None)) + except RuntimeError as error: + self.logger.error('Invalid init subscriptions file: %s', error) + has_error = True except Exception as error: self.logger.error('Error downloading attachment: %s', error) - self.logger.debug('Marking message for immediate deletion: %s', message) - immediate_delete[message.id] = message + has_error = True + if not has_error: + self.init_message = message + init_message_found = True + continue + + self.logger.debug('Reading attachment') + attachment = message.attachments[0] + if attachment.size > self.MAX_DOWNLOAD_SIZE: + self.logger.debug('Marking message for immediate deletion (attachment too big): %s', message) + immediate_delete[message.id] = message + continue + try: + _, content = self._download_attachment(attachment) + if content.startswith(b'config') and new_config is None: + try: + config = Config.from_str(content.decode()) + if config != self.config: # New config to update to + new_config = config + self.logger.debug('Marking new config message for immediate deletion: %s', message) + immediate_delete[message.id] = message + continue + except RuntimeError as error: + self.logger.info('Invalid config file: %s', error) + bot_message = self.create_message(self.bot_channel, { + 'content': str(error), + 'message_reference': MessageReference( + type=MessageReferenceType.DEFAULT, + message_id=message.id, + channel_id=self.bot_channel.id, + guild_id=None, + fail_if_not_exists=None)}) + delayed_delete[bot_message.id] = bot_message + delayed_delete[message.id] = message + continue + if content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]): + try: + subscriptions = SubscriptionHelper.read_text(content) + if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()): + new_subscriptions = subscriptions + self.logger.debug('Marking new subscriptions message for immediate deletion: %s', message) + immediate_delete[message.id] = message + continue + except RuntimeError as error: + self.logger.info('Invalid subscriptions file: %s', error) + bot_message = self.create_message(self.bot_channel, { + 'content': str(error), + 'message_reference': MessageReference( + type=MessageReferenceType.DEFAULT, + message_id=message.id, + channel_id=self.bot_channel.id, + guild_id=None, + fail_if_not_exists=None)}) + delayed_delete[bot_message.id] = bot_message + delayed_delete[message.id] = message + continue + except Exception as error: + self.logger.error('Error downloading attachment: %s', error) if new_config is not None: self.logger.info('Loading config: %s', new_config) self.config = new_config + if new_subscriptions is not None: + self.logger.info('Loading subscriptions: %s', new_subscriptions) + SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions) + self._yt_subscriptions = new_subscriptions + self.tasks.append((DiscordManager.Task.INIT_SUBS, time.time() + 1, None)) + + # New init message is needed, previous need to be deleted + if (new_config is not None or new_subscriptions is not None) and self.init_message is not None: + immediate_delete[self.init_message.id] = self.init_message + self.init_message = None + + # Init message absent or deleted if self.init_message is None: assert self.config is not None self.init_message = self.create_message( self.bot_channel, {'content': self.INIT_MESSAGE}, - upload_files=[('config.txt', FileMime.TEXT_PLAIN, self.config.to_str().encode())]) + upload_files=[ + ('config.txt', FileMime.TEXT_PLAIN, self.config.to_str().encode()), + ('subscriptions.csv', FileMime.TEXT_CSV, SubscriptionHelper.generate_text(self._yt_subscriptions)) + ]) for message in immediate_delete.values(): try: @@ -301,6 +371,60 @@ class DiscordManager: time.time() + self.config.bot_message_duration, list(delayed_delete.values()))) + def _init_subs(self): + categories, text_channel = self.list_channels() + self.guild_text_channels = text_channel + self.guild_categories = categories + + channel_dict: dict[str, TextChannel] = {c.name or '': c for c in self.guild_text_channels} + unmanaged_categories: set[str] = set(self.config.unmanaged_categories.split(',')) + category_ranges: list[tuple[int, int, ChannelCategory]] = [] + for category in self.guild_categories: + if category.name in unmanaged_categories: + self.logger.debug('Skipping unmanaged category: %s', category.name) + continue + range_info = (category.name or '').split('-') + if len(range_info) != 2: # noqa: PLR2004 + self.logger.warning('Cannot compute range for category: %s', category.name) + continue + category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category)) + category_ranges = sorted(category_ranges, key=operator.itemgetter(0)) + + name_regex = r'([^a-z])' + for sub_info in self._yt_subscriptions.values(): + discord_name = sub_info.name.lower() + discord_name = re.sub(name_regex, '-', discord_name) + category_value = ord(discord_name[0]) + sub_channel: TextChannel | None = channel_dict.get(discord_name) + if sub_channel is None: + selected_category: ChannelCategory | None = None + for start_range, stop_range, category in category_ranges: + if start_range <= category_value <= stop_range: + selected_category = category + break + if selected_category is None: + selected_category = category_ranges[-1][2] + sub_channel = self.create_text_channel({ + 'name': discord_name, + 'parent_id': selected_category.id, + 'permission_overwrites': [ + Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, + deny=Permissions.SEND_MESSAGES), + Overwrite(self.bot_role.id, OverwriteType.ROLE, + allow=Permissions.VIEW_CHANNEL | Permissions.SEND_MESSAGES, + deny=Permissions.NONE)] + }) + if sub_info.channel_info is None: + _, channel_info = self.yt_manager.request_channel_info( + sub_info.channel_id, request_timeout=self.config.request_timeout) + if not channel_info.items: + self.logger.error('No channel info return from YouTube API for channel: %s', discord_name) + continue + sub_info.channel_info = channel_info.items[0].snippet + channel_url = f'https://www.youtube.com/{sub_info.channel_info.customUrl}' + _ = self.create_message(sub_channel, {'content': channel_url}) + sub_info.last_update = time.time() + def run(self): while True: if self.tasks: @@ -312,14 +436,6 @@ class DiscordManager: if sleep_time > 0: time.sleep(sleep_time) match task_type: - case DiscordManager.Task.SCAN_BOT_CHANNEL: - try: - self._scan_bot_channel() - except Exception as error: - self.logger.error('Error scanning bot channel: %s -> %s', - error, traceback.format_exc().replace('\n', ' | ')) - self.tasks.append(( - self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) case DiscordManager.Task.DELETE_MESSAGES: if not isinstance(task_params, list): self.logger.error('Wrong task params for DELETE_MESSAGES: %s', task_params) @@ -334,6 +450,20 @@ class DiscordManager: except Exception as error: self.logger.error('Error deleting message %s: %s -> %s', message, error, traceback.format_exc().replace('\n', ' | ')) + case DiscordManager.Task.SCAN_BOT_CHANNEL: + try: + self._scan_bot_channel() + except Exception as error: + self.logger.error('Error scanning bot channel: %s -> %s', + error, traceback.format_exc().replace('\n', ' | ')) + self.tasks.append(( + self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) + case DiscordManager.Task.INIT_SUBS: + try: + self._init_subs() + except Exception as error: + self.logger.error('Error initializing subscriptions : %s -> %s', + error, traceback.format_exc().replace('\n', ' | ')) time.sleep(1) def create_text_channel(self, params: Api.Guild.CreateTextChannelParams) -> TextChannel: @@ -394,8 +524,7 @@ class DiscordManager: params['before'] = before_id if after_id is not None: params['after'] = after_id - headers, messages_info = self._send_request( + _, messages_info = self._send_request( *Api.Message.list_by_channel(channel.id), data=json.dumps(params, cls=ApiEncoder).encode() if params else None) - self._update_rate_limit(headers) return [Message.from_dict(m) for m in messages_info or []] diff --git a/breadtube_bot/youtube_manager.py b/breadtube_bot/youtube_manager.py index 0f90403..9269ea4 100644 --- a/breadtube_bot/youtube_manager.py +++ b/breadtube_bot/youtube_manager.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING import urllib.error import urllib.request -from breadtube_bot.yt_objects import SearchResult +from .yt_objects import ChannelResult, SearchResult if TYPE_CHECKING: @@ -57,11 +57,11 @@ class YoutubeManager: raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error def request_channel_info(self, channel_id: str, request_timeout: float) -> tuple[ - HTTPHeaders, dict]: + HTTPHeaders, ChannelResult]: url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet' f'&id={channel_id}&key={self._api_key}') headers, info = self._request(url=url, request_timeout=request_timeout) - return headers, info + return headers, ChannelResult.from_dict(info) def request_channel_videos(self, channel_id: str, max_results: int, request_timeout: float) -> tuple[ HTTPHeaders, SearchResult]: diff --git a/breadtube_bot/youtube_subscription.py b/breadtube_bot/youtube_subscription.py new file mode 100644 index 0000000..eba5af5 --- /dev/null +++ b/breadtube_bot/youtube_subscription.py @@ -0,0 +1,50 @@ +from dataclasses import dataclass + +from .yt_objects import ChannelSnippet + + +@dataclass +class SubscriptionInfo: + name: str + channel_id: str + last_update: float + channel_info: ChannelSnippet | None = None + + +Subscriptions = dict[str, SubscriptionInfo] + + +SUBSCRIPTION_FILE_COLUMNS: list[bytes] = [b'Channel Name', b'Channel ID'] + + +class SubscriptionHelper: + @staticmethod + def read_text(content: bytes) -> Subscriptions: + content_lines = content.strip().splitlines() + if content_lines[0] != b';'.join(SUBSCRIPTION_FILE_COLUMNS): + raise RuntimeError(f"Unexpected header: {content_lines[0]} != {b','.join(SUBSCRIPTION_FILE_COLUMNS)}") + + subscriptions: Subscriptions = {} + for line_number, line in enumerate(content_lines[1:], start=1): + values = line.split(b';') + if len(values) != len(SUBSCRIPTION_FILE_COLUMNS): + raise RuntimeError(f'Unexpected value count (columns) at line {line_number}' + f', got {len(values)} but expected {len(SUBSCRIPTION_FILE_COLUMNS)}') + name = values[0].decode() + channel_id = values[1].decode() + subscriptions[channel_id] = SubscriptionInfo(name=name, channel_id=channel_id, last_update=0) + return subscriptions + + @staticmethod + def update_subscriptions(new: Subscriptions, previous: Subscriptions): + for channel_id, previous_info in previous.items(): + if channel_id in new: + new[channel_id].last_update = previous_info.last_update + new[channel_id].channel_info = previous_info.channel_info + + @staticmethod + def generate_text(subscriptions: Subscriptions) -> bytes: + content_lines: list[bytes] = [b';'.join(SUBSCRIPTION_FILE_COLUMNS)] + content_lines.extend([ + f"{sub_info.name.replace(';', ',')};{sub_info.channel_id}".encode() for sub_info in subscriptions.values()]) + return b'\n'.join(content_lines) diff --git a/breadtube_bot/yt_objects.py b/breadtube_bot/yt_objects.py index 54d74de..a70c593 100644 --- a/breadtube_bot/yt_objects.py +++ b/breadtube_bot/yt_objects.py @@ -33,16 +33,6 @@ class PageInfo(_Api): return PageInfo(totalResults=info['totalResults'], resultsPerPage=info['resultsPerPage']) -@dataclass -class SearchResultId(_Api): - kind: str - videoId: str - - @staticmethod - def from_dict(info: dict) -> SearchResultId: - return SearchResultId(kind=info['kind'], videoId=info['videoId']) - - @dataclass class ThumbnailInfo(_Api): url: str @@ -72,7 +62,7 @@ class Thumbnails(_Api): class Result(Generic[T_api]): kind: str etag: str - nextPageToken: str + nextPageToken: str | None pageInfo: PageInfo items: list[T_api] @@ -82,7 +72,7 @@ class Result(Generic[T_api]): return cls( kind=info['kind'], etag=info['etag'], - nextPageToken=info['nextPageToken'], + nextPageToken=info.get('nextPageToken'), pageInfo=PageInfo.from_dict(info['pageInfo']), items=[item_type.from_dict(i) for i in info['items']]) @@ -90,20 +80,40 @@ class Result(Generic[T_api]): # Channel Objects +@dataclass +class ChannelSnippet(_Api): + title: str + description: str + customUrl: str + publishedAt: datetime + thumbnails: Thumbnails + country: str + + @staticmethod + def from_dict(info: dict) -> ChannelSnippet: + return ChannelSnippet( + title=info['title'], + description=info['description'], + customUrl=info['customUrl'], + publishedAt=datetime.fromisoformat(info['publishedAt']), + thumbnails=Thumbnails.from_dict(info['thumbnails']), + country=info['country']) + + @dataclass class ChannelResultItem(_Api): kind: str etag: str - id: SearchResultId - snippet: SearchResultSnippet + id: str + snippet: ChannelSnippet @staticmethod - def from_dict(info: dict) -> SearchResultItem: - return SearchResultItem( + def from_dict(info: dict) -> ChannelResultItem: + return ChannelResultItem( kind=info['kind'], etag=info['etag'], - id=SearchResultId.from_dict(info['id']), - snippet=SearchResultSnippet.from_dict(info['snippet'])) + id=info['id'], + snippet=ChannelSnippet.from_dict(info['snippet'])) class ChannelResult(Result[ChannelResultItem]): @@ -114,7 +124,17 @@ class ChannelResult(Result[ChannelResultItem]): @dataclass -class SearchResultSnippet(_Api): +class SearchResultId(_Api): + kind: str + videoId: str + + @staticmethod + def from_dict(info: dict) -> SearchResultId: + return SearchResultId(kind=info['kind'], videoId=info['videoId']) + + +@dataclass +class SearchSnippet(_Api): publishedAt: datetime channelId: str title: str @@ -125,8 +145,8 @@ class SearchResultSnippet(_Api): publishTime: datetime @staticmethod - def from_dict(info: dict) -> SearchResultSnippet: - return SearchResultSnippet( + def from_dict(info: dict) -> SearchSnippet: + return SearchSnippet( publishedAt=datetime.fromisoformat(info['publishedAt']), channelId=info['channelId'], title=info['title'], @@ -142,7 +162,7 @@ class SearchResultItem(_Api): kind: str etag: str id: SearchResultId - snippet: SearchResultSnippet + snippet: SearchSnippet @staticmethod def from_dict(info: dict) -> SearchResultItem: @@ -150,7 +170,7 @@ class SearchResultItem(_Api): kind=info['kind'], etag=info['etag'], id=SearchResultId.from_dict(info['id']), - snippet=SearchResultSnippet.from_dict(info['snippet'])) + snippet=SearchSnippet.from_dict(info['snippet'])) class SearchResult(Result[SearchResultItem]): diff --git a/pyproject.toml b/pyproject.toml index 82e0ef0..b863de2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ inline-quotes = "single" [tool.ruff.lint.pylint] max-args=16 -max-branches=24 +max-branches=32 max-locals=16 max-nested-blocks=8 max-public-methods=16 @@ -55,7 +55,7 @@ max-returns=8 max-statements=96 [tool.ruff.lint.mccabe] -max-complexity = 24 +max-complexity = 32 [tool.coverage.run] command_line = "-m pytest"