Youtube Subscription Implementation (WIP)

This commit is contained in:
BreadTube 2025-09-26 23:41:51 +09:00 committed by Corentin
commit b80e4f7745
6 changed files with 333 additions and 133 deletions

View file

@ -11,6 +11,7 @@ class Config:
bot_channel_init_retries: int = 3 bot_channel_init_retries: int = 3
bot_message_duration: float = 150. bot_message_duration: float = 150.
request_timeout: float = 3. request_timeout: float = 3.
unmanaged_categories: str = ''
youtube_channel_refresh_interval: float = 3600 youtube_channel_refresh_interval: float = 3600
youtube_channel_video_count: int = 10 youtube_channel_video_count: int = 10
@ -28,12 +29,12 @@ class Config:
if lines[0] != 'config': if lines[0] != 'config':
raise RuntimeError('Cannot load config: first line is not "config"') raise RuntimeError('Cannot load config: first line is not "config"')
config_dict = {} config_dict = {}
for line_number, line in enumerate(lines[1:]): for line_number, line in enumerate(lines[1:], start=1):
key, value = line.split('=', maxsplit=1) key, value = line.split('=', maxsplit=1)
if key not in annotations: if key not in annotations:
raise RuntimeError(f'Invalid config: invalid key {key} at line {line_number + 1}') raise RuntimeError(f'Invalid config: invalid key {key} at line {line_number}')
if key in config_dict: if key in config_dict:
raise RuntimeError(f'Invalid config: duplicated key {key} at line {line_number + 1}') raise RuntimeError(f'Invalid config: duplicated key {key} at line {line_number}')
config_dict[key] = value config_dict[key] = value
for key, value in config_dict.items(): for key, value in config_dict.items():

View file

@ -7,6 +7,7 @@ import operator
from pathlib import Path from pathlib import Path
import json import json
import random import random
import re
import time import time
import tomllib import tomllib
from typing import Any from typing import Any
@ -14,7 +15,6 @@ import urllib.error
import urllib.request import urllib.request
import traceback import traceback
from breadtube_bot.youtube_api import YoutubeManager
from .api import Api, ApiAction, ApiVersion from .api import Api, ApiAction, ApiVersion
from .config import Config from .config import Config
@ -22,6 +22,8 @@ from .logger import create_logger
from .objects import ( from .objects import (
Attachment, ChannelCategory, ChannelType, FileMime, HTTPHeaders, Message, MessageReference, MessageReferenceType, Attachment, ChannelCategory, ChannelType, FileMime, HTTPHeaders, Message, MessageReference, MessageReferenceType,
Overwrite, OverwriteType, Permissions, Role, TextChannel, User) Overwrite, OverwriteType, Permissions, Role, TextChannel, User)
from .youtube_manager import YoutubeManager
from .youtube_subscription import SUBSCRIPTION_FILE_COLUMNS, SubscriptionHelper, Subscriptions
class ApiEncoder(json.JSONEncoder): class ApiEncoder(json.JSONEncoder):
@ -34,25 +36,21 @@ class ApiEncoder(json.JSONEncoder):
class DiscordManager: class DiscordManager:
MAX_CONFIG_SIZE: int = 50_000
DEFAULT_MESSAGE_LIST_LIMIT = 50 DEFAULT_MESSAGE_LIST_LIMIT = 50
INIT_MESSAGE = ('Bot initialized.\nThis is the current configuration used.\n' INIT_MESSAGE = ('Bot initialized.\nThis is the current configuration used.\n'
'You can upload a new one to update the configuration.') 'You can upload a new one to update the configuration.')
MAX_DOWNLOAD_SIZE: int = 50_000
MIN_API_VERSION = 9
@dataclass @dataclass
class RateLimit: class RateLimit:
remaining: int remaining: int
next_reset: float next_reset: float
@dataclass
class YoutTubeChannel:
name: str
channel_id: str
last_update: float
class Task(Enum): class Task(Enum):
SCAN_BOT_CHANNEL = 1 DELETE_MESSAGES = 1
DELETE_MESSAGES = 2 SCAN_BOT_CHANNEL = 2
INIT_SUBS = 3
@staticmethod @staticmethod
def _get_code_version() -> str: def _get_code_version() -> str:
@ -75,19 +73,37 @@ class DiscordManager:
self.logger.info('Retrieving bot user') self.logger.info('Retrieving bot user')
self.bot_user = self.get_current_user() self.bot_user = self.get_current_user()
self.logger.info('Retrieving guild roles before init') self.logger.info('Retrieving guild roles before init')
self.guild_roles: list = self.list_roles() self.guild_roles: list[Role] = self.list_roles()
self.bot_channel: TextChannel | None = None bot_role: Role | None = None
everyone_role: Role | None = None
for role in self.guild_roles:
if role.name == self.config.bot_role:
bot_role = role
elif role.name == '@everyone':
everyone_role = role
if bot_role is None:
raise RuntimeError('No BreadTube role found')
if everyone_role is None:
raise RuntimeError('No everyone role found')
self.bot_role: Role = bot_role
self.everyone_role: Role = everyone_role
categories, text_channel = self.list_channels()
self.guild_text_channels: list[TextChannel] = text_channel
self.guild_categories: list[ChannelCategory] = categories
self.init_message: Message | None = None self.init_message: Message | None = None
bot_channel: TextChannel | None = None
for _ in range(self.config.bot_channel_init_retries): for _ in range(self.config.bot_channel_init_retries):
while not self.init_bot_channel(): bot_channel = self.init_bot_channel()
time.sleep(10) if bot_channel is not None:
break
else:
self.logger.info('Bot init OK')
break break
time.sleep(5)
if bot_channel is None:
raise RuntimeError("Couldn't initialize bot channel/role/permission") raise RuntimeError("Couldn't initialize bot channel/role/permission")
self.bot_channel: TextChannel = bot_channel
self._yt_subscriptions: Subscriptions = {}
self._scan_bot_channel() self._scan_bot_channel()
self.tasks.append(( self.tasks.append((
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
@ -112,13 +128,12 @@ class DiscordManager:
def _send_request(self, api_action: ApiAction, endpoint: str, api_version: ApiVersion = ApiVersion.V10, def _send_request(self, api_action: ApiAction, endpoint: str, api_version: ApiVersion = ApiVersion.V10,
data: bytes | None = None, upload_files: list[tuple[str, FileMime, bytes]] | None = None, data: bytes | None = None, upload_files: list[tuple[str, FileMime, bytes]] | None = None,
expected_code: int = 200) -> tuple[HTTPHeaders, dict | list | None]: expected_code: int = 200) -> tuple[HTTPHeaders, dict | list | None]:
min_api_version = 9 if api_version.value < self.MIN_API_VERSION:
if api_version.value < min_api_version:
self.logger.warning( self.logger.warning(
'Warning: using deprecated API version %d (minimum non deprecated is %d)', 'Warning: using deprecated API version %d (minimum non deprecated is %d)',
api_version, min_api_version) api_version, self.MIN_API_VERSION)
url = f'https://discord.com/api/v{api_version.value}{endpoint}' url = f'https://discord.com/api/v{api_version.value}{endpoint}'
self.logger.debug('Discord API Request: %s %s', api_action.value, url)
boundary: str = '' boundary: str = ''
if upload_files: if upload_files:
@ -174,51 +189,30 @@ class DiscordManager:
except urllib.error.URLError as error: except urllib.error.URLError as error:
raise RuntimeError(f'URL error downloading attachment ({attachment}): {error}') from error raise RuntimeError(f'URL error downloading attachment ({attachment}): {error}') from error
def init_bot_channel(self) -> bool: def init_bot_channel(self) -> TextChannel | None:
_, text_channel = self.list_channels() for channel in self.guild_text_channels:
breadtube_role: Role | None = None
everyone_role: Role | None = None
for role in self.guild_roles:
if role.name == self.config.bot_role:
breadtube_role = role
elif role.name == '@everyone':
everyone_role = role
if breadtube_role is None:
self.logger.info('No BreadTube role found')
return False
if everyone_role is None:
self.logger.info('No everyone role found')
return False
for channel in text_channel:
if channel.name == self.config.bot_channel: if channel.name == self.config.bot_channel:
self.bot_channel = channel
self.logger.info('Found breadtube bot channel') self.logger.info('Found breadtube bot channel')
for perm in self.bot_channel.permission_overwrites: for perm in channel.permission_overwrites:
if perm.id == breadtube_role.id: if perm.id == self.bot_role.id:
if not perm.allow | Permissions.VIEW_CHANNEL: if not perm.allow | Permissions.VIEW_CHANNEL:
self.logger.info('BreadTube bot cannot view BreadTube channel: permission missing') self.logger.info('BreadTube bot cannot view BreadTube channel: permission missing')
return False return None
self.logger.info('BreadTube channel permission OK') self.logger.info('BreadTube channel permission OK')
break break
break return channel
else:
self.bot_channel = self.create_text_channel({ self.logger.info('Creating breadtube bot channel')
return self.create_text_channel({
'name': self.config.bot_channel, 'name': self.config.bot_channel,
'permission_overwrites': [ 'permission_overwrites': [
Overwrite(everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE, Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE,
deny=Permissions.VIEW_CHANNEL), deny=Permissions.VIEW_CHANNEL),
Overwrite(breadtube_role.id, OverwriteType.ROLE, allow=Permissions.VIEW_CHANNEL, Overwrite(self.bot_role.id, OverwriteType.ROLE, allow=Permissions.VIEW_CHANNEL,
deny=Permissions.NONE)] deny=Permissions.NONE)]
}) })
self.logger.info('Created breadtube bot channel')
return True
def _scan_bot_channel(self):
if self.bot_channel is None:
self.logger.error('Cannot scan bot channel: bot channel is None')
return []
def _get_bot_channel_messages(self) -> list[Message]:
messages_id_delete_task: set[int] = set() messages_id_delete_task: set[int] = set()
for task_type, _, task_params in self.tasks: for task_type, _, task_params in self.tasks:
if task_type == self.Task.DELETE_MESSAGES: if task_type == self.Task.DELETE_MESSAGES:
@ -232,30 +226,72 @@ class DiscordManager:
if len(message_batch) < self.DEFAULT_MESSAGE_LIST_LIMIT: if len(message_batch) < self.DEFAULT_MESSAGE_LIST_LIMIT:
break break
last_message_id = message_batch[-1].id last_message_id = message_batch[-1].id
return messages
self.init_message = None def _scan_bot_channel(self): # noqa: PLR0915
messages = self._get_bot_channel_messages()
init_message_found = False
new_config: Config | None = None new_config: Config | None = None
new_subscriptions: Subscriptions | None = None
delayed_delete: dict[int, Message] = {} delayed_delete: dict[int, Message] = {}
immediate_delete: dict[int, Message] = {} immediate_delete: dict[int, Message] = {}
for message in messages: for message in messages:
if message.id in delayed_delete: if init_message_found:
self.logger.debug('Skipping message already marked to be deleted') self.logger.debug('Marking message for immediate deletion (init found): %s', message)
immediate_delete[message.id] = message
continue continue
if self.init_message is None and new_config is None and len(message.attachments) == 1: if len(message.attachments) <= 0:
attachment = message.attachments[0] self.logger.debug('Marking message for immediate deletion (no attachment): %s', message)
if attachment.size < self.MAX_CONFIG_SIZE: immediate_delete[message.id] = message
continue
if message.author.id == self.bot_user.id:
self.logger.debug('Found init message')
# If same init message: nothing to do
if self.init_message is not None and message.id == self.init_message.id:
continue
# Loading init message content
has_error = False
for attachment in message.attachments:
try: try:
_, content = self._download_attachment(attachment) _, content = self._download_attachment(attachment)
if content.startswith(b'config'): if new_config is None and content.startswith(b'config'):
try:
self.config = Config.from_str(content.decode())
except RuntimeError as error:
self.logger.error('Cannot load config from init message: %s', error)
has_error = True
if new_subscriptions is None and content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]):
try:
subscriptions = SubscriptionHelper.read_text(content)
if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()):
SubscriptionHelper.update_subscriptions(
new=subscriptions, previous=self._yt_subscriptions)
self._yt_subscriptions = subscriptions
self.tasks.append((DiscordManager.Task.INIT_SUBS, time.time() + 1, None))
except RuntimeError as error:
self.logger.error('Invalid init subscriptions file: %s', error)
has_error = True
except Exception as error:
self.logger.error('Error downloading attachment: %s', error)
has_error = True
if not has_error:
self.init_message = message
init_message_found = True
continue
self.logger.debug('Reading attachment')
attachment = message.attachments[0]
if attachment.size > self.MAX_DOWNLOAD_SIZE:
self.logger.debug('Marking message for immediate deletion (attachment too big): %s', message)
immediate_delete[message.id] = message
continue
try:
_, content = self._download_attachment(attachment)
if content.startswith(b'config') and new_config is None:
try: try:
config = Config.from_str(content.decode()) config = Config.from_str(content.decode())
if message.author.id == self.bot_user.id: # keep using current config
self.logger.debug('Found previous init message')
self.init_message = message
if config != self.config: # First scan qill need to load config
self.config = config
continue
if config != self.config: # New config to update to if config != self.config: # New config to update to
new_config = config new_config = config
self.logger.debug('Marking new config message for immediate deletion: %s', message) self.logger.debug('Marking new config message for immediate deletion: %s', message)
@ -274,20 +310,54 @@ class DiscordManager:
delayed_delete[bot_message.id] = bot_message delayed_delete[bot_message.id] = bot_message
delayed_delete[message.id] = message delayed_delete[message.id] = message
continue continue
if content.startswith(SUBSCRIPTION_FILE_COLUMNS[0]):
try:
subscriptions = SubscriptionHelper.read_text(content)
if set(subscriptions.keys()) != set(self._yt_subscriptions.keys()):
new_subscriptions = subscriptions
self.logger.debug('Marking new subscriptions message for immediate deletion: %s', message)
immediate_delete[message.id] = message
continue
except RuntimeError as error:
self.logger.info('Invalid subscriptions file: %s', error)
bot_message = self.create_message(self.bot_channel, {
'content': str(error),
'message_reference': MessageReference(
type=MessageReferenceType.DEFAULT,
message_id=message.id,
channel_id=self.bot_channel.id,
guild_id=None,
fail_if_not_exists=None)})
delayed_delete[bot_message.id] = bot_message
delayed_delete[message.id] = message
continue
except Exception as error: except Exception as error:
self.logger.error('Error downloading attachment: %s', error) self.logger.error('Error downloading attachment: %s', error)
self.logger.debug('Marking message for immediate deletion: %s', message)
immediate_delete[message.id] = message
if new_config is not None: if new_config is not None:
self.logger.info('Loading config: %s', new_config) self.logger.info('Loading config: %s', new_config)
self.config = new_config self.config = new_config
if new_subscriptions is not None:
self.logger.info('Loading subscriptions: %s', new_subscriptions)
SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions)
self._yt_subscriptions = new_subscriptions
self.tasks.append((DiscordManager.Task.INIT_SUBS, time.time() + 1, None))
# New init message is needed, previous need to be deleted
if (new_config is not None or new_subscriptions is not None) and self.init_message is not None:
immediate_delete[self.init_message.id] = self.init_message
self.init_message = None
# Init message absent or deleted
if self.init_message is None: if self.init_message is None:
assert self.config is not None assert self.config is not None
self.init_message = self.create_message( self.init_message = self.create_message(
self.bot_channel, {'content': self.INIT_MESSAGE}, self.bot_channel, {'content': self.INIT_MESSAGE},
upload_files=[('config.txt', FileMime.TEXT_PLAIN, self.config.to_str().encode())]) upload_files=[
('config.txt', FileMime.TEXT_PLAIN, self.config.to_str().encode()),
('subscriptions.csv', FileMime.TEXT_CSV, SubscriptionHelper.generate_text(self._yt_subscriptions))
])
for message in immediate_delete.values(): for message in immediate_delete.values():
try: try:
@ -301,6 +371,60 @@ class DiscordManager:
time.time() + self.config.bot_message_duration, time.time() + self.config.bot_message_duration,
list(delayed_delete.values()))) list(delayed_delete.values())))
def _init_subs(self):
categories, text_channel = self.list_channels()
self.guild_text_channels = text_channel
self.guild_categories = categories
channel_dict: dict[str, TextChannel] = {c.name or '': c for c in self.guild_text_channels}
unmanaged_categories: set[str] = set(self.config.unmanaged_categories.split(','))
category_ranges: list[tuple[int, int, ChannelCategory]] = []
for category in self.guild_categories:
if category.name in unmanaged_categories:
self.logger.debug('Skipping unmanaged category: %s', category.name)
continue
range_info = (category.name or '').split('-')
if len(range_info) != 2: # noqa: PLR2004
self.logger.warning('Cannot compute range for category: %s', category.name)
continue
category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category))
category_ranges = sorted(category_ranges, key=operator.itemgetter(0))
name_regex = r'([^a-z])'
for sub_info in self._yt_subscriptions.values():
discord_name = sub_info.name.lower()
discord_name = re.sub(name_regex, '-', discord_name)
category_value = ord(discord_name[0])
sub_channel: TextChannel | None = channel_dict.get(discord_name)
if sub_channel is None:
selected_category: ChannelCategory | None = None
for start_range, stop_range, category in category_ranges:
if start_range <= category_value <= stop_range:
selected_category = category
break
if selected_category is None:
selected_category = category_ranges[-1][2]
sub_channel = self.create_text_channel({
'name': discord_name,
'parent_id': selected_category.id,
'permission_overwrites': [
Overwrite(self.everyone_role.id, OverwriteType.ROLE, allow=Permissions.NONE,
deny=Permissions.SEND_MESSAGES),
Overwrite(self.bot_role.id, OverwriteType.ROLE,
allow=Permissions.VIEW_CHANNEL | Permissions.SEND_MESSAGES,
deny=Permissions.NONE)]
})
if sub_info.channel_info is None:
_, channel_info = self.yt_manager.request_channel_info(
sub_info.channel_id, request_timeout=self.config.request_timeout)
if not channel_info.items:
self.logger.error('No channel info return from YouTube API for channel: %s', discord_name)
continue
sub_info.channel_info = channel_info.items[0].snippet
channel_url = f'https://www.youtube.com/{sub_info.channel_info.customUrl}'
_ = self.create_message(sub_channel, {'content': channel_url})
sub_info.last_update = time.time()
def run(self): def run(self):
while True: while True:
if self.tasks: if self.tasks:
@ -312,14 +436,6 @@ class DiscordManager:
if sleep_time > 0: if sleep_time > 0:
time.sleep(sleep_time) time.sleep(sleep_time)
match task_type: match task_type:
case DiscordManager.Task.SCAN_BOT_CHANNEL:
try:
self._scan_bot_channel()
except Exception as error:
self.logger.error('Error scanning bot channel: %s -> %s',
error, traceback.format_exc().replace('\n', ' | '))
self.tasks.append((
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
case DiscordManager.Task.DELETE_MESSAGES: case DiscordManager.Task.DELETE_MESSAGES:
if not isinstance(task_params, list): if not isinstance(task_params, list):
self.logger.error('Wrong task params for DELETE_MESSAGES: %s', task_params) self.logger.error('Wrong task params for DELETE_MESSAGES: %s', task_params)
@ -334,6 +450,20 @@ class DiscordManager:
except Exception as error: except Exception as error:
self.logger.error('Error deleting message %s: %s -> %s', self.logger.error('Error deleting message %s: %s -> %s',
message, error, traceback.format_exc().replace('\n', ' | ')) message, error, traceback.format_exc().replace('\n', ' | '))
case DiscordManager.Task.SCAN_BOT_CHANNEL:
try:
self._scan_bot_channel()
except Exception as error:
self.logger.error('Error scanning bot channel: %s -> %s',
error, traceback.format_exc().replace('\n', ' | '))
self.tasks.append((
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
case DiscordManager.Task.INIT_SUBS:
try:
self._init_subs()
except Exception as error:
self.logger.error('Error initializing subscriptions : %s -> %s',
error, traceback.format_exc().replace('\n', ' | '))
time.sleep(1) time.sleep(1)
def create_text_channel(self, params: Api.Guild.CreateTextChannelParams) -> TextChannel: def create_text_channel(self, params: Api.Guild.CreateTextChannelParams) -> TextChannel:
@ -394,8 +524,7 @@ class DiscordManager:
params['before'] = before_id params['before'] = before_id
if after_id is not None: if after_id is not None:
params['after'] = after_id params['after'] = after_id
headers, messages_info = self._send_request( _, messages_info = self._send_request(
*Api.Message.list_by_channel(channel.id), *Api.Message.list_by_channel(channel.id),
data=json.dumps(params, cls=ApiEncoder).encode() if params else None) data=json.dumps(params, cls=ApiEncoder).encode() if params else None)
self._update_rate_limit(headers)
return [Message.from_dict(m) for m in messages_info or []] return [Message.from_dict(m) for m in messages_info or []]

View file

@ -7,7 +7,7 @@ from typing import TYPE_CHECKING
import urllib.error import urllib.error
import urllib.request import urllib.request
from breadtube_bot.yt_objects import SearchResult from .yt_objects import ChannelResult, SearchResult
if TYPE_CHECKING: if TYPE_CHECKING:
@ -57,11 +57,11 @@ class YoutubeManager:
raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error
def request_channel_info(self, channel_id: str, request_timeout: float) -> tuple[ def request_channel_info(self, channel_id: str, request_timeout: float) -> tuple[
HTTPHeaders, dict]: HTTPHeaders, ChannelResult]:
url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet' url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet'
f'&id={channel_id}&key={self._api_key}') f'&id={channel_id}&key={self._api_key}')
headers, info = self._request(url=url, request_timeout=request_timeout) headers, info = self._request(url=url, request_timeout=request_timeout)
return headers, info return headers, ChannelResult.from_dict(info)
def request_channel_videos(self, channel_id: str, max_results: int, request_timeout: float) -> tuple[ def request_channel_videos(self, channel_id: str, max_results: int, request_timeout: float) -> tuple[
HTTPHeaders, SearchResult]: HTTPHeaders, SearchResult]:

View file

@ -0,0 +1,50 @@
from dataclasses import dataclass
from .yt_objects import ChannelSnippet
@dataclass
class SubscriptionInfo:
name: str
channel_id: str
last_update: float
channel_info: ChannelSnippet | None = None
Subscriptions = dict[str, SubscriptionInfo]
SUBSCRIPTION_FILE_COLUMNS: list[bytes] = [b'Channel Name', b'Channel ID']
class SubscriptionHelper:
@staticmethod
def read_text(content: bytes) -> Subscriptions:
content_lines = content.strip().splitlines()
if content_lines[0] != b';'.join(SUBSCRIPTION_FILE_COLUMNS):
raise RuntimeError(f"Unexpected header: {content_lines[0]} != {b','.join(SUBSCRIPTION_FILE_COLUMNS)}")
subscriptions: Subscriptions = {}
for line_number, line in enumerate(content_lines[1:], start=1):
values = line.split(b';')
if len(values) != len(SUBSCRIPTION_FILE_COLUMNS):
raise RuntimeError(f'Unexpected value count (columns) at line {line_number}'
f', got {len(values)} but expected {len(SUBSCRIPTION_FILE_COLUMNS)}')
name = values[0].decode()
channel_id = values[1].decode()
subscriptions[channel_id] = SubscriptionInfo(name=name, channel_id=channel_id, last_update=0)
return subscriptions
@staticmethod
def update_subscriptions(new: Subscriptions, previous: Subscriptions):
for channel_id, previous_info in previous.items():
if channel_id in new:
new[channel_id].last_update = previous_info.last_update
new[channel_id].channel_info = previous_info.channel_info
@staticmethod
def generate_text(subscriptions: Subscriptions) -> bytes:
content_lines: list[bytes] = [b';'.join(SUBSCRIPTION_FILE_COLUMNS)]
content_lines.extend([
f"{sub_info.name.replace(';', ',')};{sub_info.channel_id}".encode() for sub_info in subscriptions.values()])
return b'\n'.join(content_lines)

View file

@ -33,16 +33,6 @@ class PageInfo(_Api):
return PageInfo(totalResults=info['totalResults'], resultsPerPage=info['resultsPerPage']) return PageInfo(totalResults=info['totalResults'], resultsPerPage=info['resultsPerPage'])
@dataclass
class SearchResultId(_Api):
kind: str
videoId: str
@staticmethod
def from_dict(info: dict) -> SearchResultId:
return SearchResultId(kind=info['kind'], videoId=info['videoId'])
@dataclass @dataclass
class ThumbnailInfo(_Api): class ThumbnailInfo(_Api):
url: str url: str
@ -72,7 +62,7 @@ class Thumbnails(_Api):
class Result(Generic[T_api]): class Result(Generic[T_api]):
kind: str kind: str
etag: str etag: str
nextPageToken: str nextPageToken: str | None
pageInfo: PageInfo pageInfo: PageInfo
items: list[T_api] items: list[T_api]
@ -82,7 +72,7 @@ class Result(Generic[T_api]):
return cls( return cls(
kind=info['kind'], kind=info['kind'],
etag=info['etag'], etag=info['etag'],
nextPageToken=info['nextPageToken'], nextPageToken=info.get('nextPageToken'),
pageInfo=PageInfo.from_dict(info['pageInfo']), pageInfo=PageInfo.from_dict(info['pageInfo']),
items=[item_type.from_dict(i) for i in info['items']]) items=[item_type.from_dict(i) for i in info['items']])
@ -90,20 +80,40 @@ class Result(Generic[T_api]):
# Channel Objects # Channel Objects
@dataclass
class ChannelSnippet(_Api):
title: str
description: str
customUrl: str
publishedAt: datetime
thumbnails: Thumbnails
country: str
@staticmethod
def from_dict(info: dict) -> ChannelSnippet:
return ChannelSnippet(
title=info['title'],
description=info['description'],
customUrl=info['customUrl'],
publishedAt=datetime.fromisoformat(info['publishedAt']),
thumbnails=Thumbnails.from_dict(info['thumbnails']),
country=info['country'])
@dataclass @dataclass
class ChannelResultItem(_Api): class ChannelResultItem(_Api):
kind: str kind: str
etag: str etag: str
id: SearchResultId id: str
snippet: SearchResultSnippet snippet: ChannelSnippet
@staticmethod @staticmethod
def from_dict(info: dict) -> SearchResultItem: def from_dict(info: dict) -> ChannelResultItem:
return SearchResultItem( return ChannelResultItem(
kind=info['kind'], kind=info['kind'],
etag=info['etag'], etag=info['etag'],
id=SearchResultId.from_dict(info['id']), id=info['id'],
snippet=SearchResultSnippet.from_dict(info['snippet'])) snippet=ChannelSnippet.from_dict(info['snippet']))
class ChannelResult(Result[ChannelResultItem]): class ChannelResult(Result[ChannelResultItem]):
@ -114,7 +124,17 @@ class ChannelResult(Result[ChannelResultItem]):
@dataclass @dataclass
class SearchResultSnippet(_Api): class SearchResultId(_Api):
kind: str
videoId: str
@staticmethod
def from_dict(info: dict) -> SearchResultId:
return SearchResultId(kind=info['kind'], videoId=info['videoId'])
@dataclass
class SearchSnippet(_Api):
publishedAt: datetime publishedAt: datetime
channelId: str channelId: str
title: str title: str
@ -125,8 +145,8 @@ class SearchResultSnippet(_Api):
publishTime: datetime publishTime: datetime
@staticmethod @staticmethod
def from_dict(info: dict) -> SearchResultSnippet: def from_dict(info: dict) -> SearchSnippet:
return SearchResultSnippet( return SearchSnippet(
publishedAt=datetime.fromisoformat(info['publishedAt']), publishedAt=datetime.fromisoformat(info['publishedAt']),
channelId=info['channelId'], channelId=info['channelId'],
title=info['title'], title=info['title'],
@ -142,7 +162,7 @@ class SearchResultItem(_Api):
kind: str kind: str
etag: str etag: str
id: SearchResultId id: SearchResultId
snippet: SearchResultSnippet snippet: SearchSnippet
@staticmethod @staticmethod
def from_dict(info: dict) -> SearchResultItem: def from_dict(info: dict) -> SearchResultItem:
@ -150,7 +170,7 @@ class SearchResultItem(_Api):
kind=info['kind'], kind=info['kind'],
etag=info['etag'], etag=info['etag'],
id=SearchResultId.from_dict(info['id']), id=SearchResultId.from_dict(info['id']),
snippet=SearchResultSnippet.from_dict(info['snippet'])) snippet=SearchSnippet.from_dict(info['snippet']))
class SearchResult(Result[SearchResultItem]): class SearchResult(Result[SearchResultItem]):

View file

@ -47,7 +47,7 @@ inline-quotes = "single"
[tool.ruff.lint.pylint] [tool.ruff.lint.pylint]
max-args=16 max-args=16
max-branches=24 max-branches=32
max-locals=16 max-locals=16
max-nested-blocks=8 max-nested-blocks=8
max-public-methods=16 max-public-methods=16
@ -55,7 +55,7 @@ max-returns=8
max-statements=96 max-statements=96
[tool.ruff.lint.mccabe] [tool.ruff.lint.mccabe]
max-complexity = 24 max-complexity = 32
[tool.coverage.run] [tool.coverage.run]
command_line = "-m pytest" command_line = "-m pytest"