Compare commits

..

No commits in common. "4674cc926c5dcf4c4d9ffcf8942e613631e8f4ad" and "ef14b1cc07a6e2b2d0d60223d8a019da532078de" have entirely different histories.

6 changed files with 100 additions and 174 deletions

View file

@ -1,9 +1,6 @@
from __future__ import annotations from __future__ import annotations
from enum import Enum from enum import Enum
import html
import http.client
import json
import logging import logging
import operator import operator
from pathlib import Path from pathlib import Path
@ -14,7 +11,7 @@ from typing import Any, TYPE_CHECKING
import traceback import traceback
from .config import Config from .config import Config
from .discord_manager import ApiEncoder, DiscordManager from .discord_manager import DiscordManager
from .logger import create_logger from .logger import create_logger
from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite, from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite,
OverwriteType, Permissions, Role, TextChannel) OverwriteType, Permissions, Role, TextChannel)
@ -26,20 +23,19 @@ if TYPE_CHECKING:
class Bot: class Bot:
DEFAULT_MESSAGE_LIST_LIMIT: int = 50 DEFAULT_MESSAGE_LIST_LIMIT = 50
DISCORD_NAME_REGEX: str = r'([^a-z])' DISCORD_NAME_REGEX = r'([^a-z])'
INIT_MESSAGE: str = ('Bot initialized.\nThis is the current configuration used.\n' INIT_MESSAGE = ('Bot initialized.\nThis is the current configuration used.\n'
'You can upload a new one to update the configuration.') 'You can upload a new one to update the configuration.')
MAX_DOWNLOAD_SIZE: int = 50_000 MAX_DOWNLOAD_SIZE: int = 50_000
SUBS_LIST_MIN_SIZE: int = 50 SUBS_LIST_MIN_SIZE = 50
SUBS_LIST_SHORTS_RATIO: int = 5 SUBS_LIST_SHORTS_RATIO = 5
SUBS_LIST_VIDEO_RATIO: int = 2 SUBS_LIST_VIDEO_RATIO = 2
SUBS_SAVE_PATH: Path = Path('/tmp/breadtube-bot_subs.json')
class Task(Enum): class Task(Enum):
DELETE_MESSAGES = 1 DELETE_MESSAGES = 1
SCAN_BOT_CHANNEL = 2 SCAN_BOT_CHANNEL = 2
REFRESH_SUBS = 3 INIT_SUBS = 3
@staticmethod @staticmethod
def _get_code_version() -> str: def _get_code_version() -> str:
@ -93,15 +89,12 @@ class Bot:
raise RuntimeError("Couldn't initialize bot channel/role/permission") raise RuntimeError("Couldn't initialize bot channel/role/permission")
self.bot_channel: TextChannel = bot_channel self.bot_channel: TextChannel = bot_channel
self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger) self._yt_subscriptions: Subscriptions = {}
self._yt_subscriptions: Subscriptions = {
name: SubscriptionInfo.from_dict(info) for name, info in json.loads(
self.SUBS_SAVE_PATH.read_text(encoding='utf-8')).items()} if self.SUBS_SAVE_PATH.exists() else {}
self._scan_bot_channel() self._scan_bot_channel()
self.tasks.append(( self.tasks.append((
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks))
self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None)) self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger)
self.logger.info('Bot initialized') self.logger.info('Bot initialized')
def init_bot_channel(self) -> TextChannel | None: def init_bot_channel(self) -> TextChannel | None:
@ -146,7 +139,6 @@ class Bot:
return messages return messages
def _scan_bot_channel(self): # noqa: PLR0915 def _scan_bot_channel(self): # noqa: PLR0915
self.logger.debug('Starting scanning bot channel')
messages = self._get_all_channel_messages(self.bot_channel) messages = self._get_all_channel_messages(self.bot_channel)
init_message_found = False init_message_found = False
new_config: Config | None = None new_config: Config | None = None
@ -178,8 +170,6 @@ class Bot:
if new_config is None and content.startswith(b'config'): if new_config is None and content.startswith(b'config'):
try: try:
self.config = Config.from_str(content.decode()) self.config = Config.from_str(content.decode())
if self.config.to_str() != content.decode():
new_config = self.config
except RuntimeError as error: except RuntimeError as error:
self.logger.error('Cannot load config from init message: %s', error) self.logger.error('Cannot load config from init message: %s', error)
has_error = True has_error = True
@ -190,8 +180,7 @@ class Bot:
SubscriptionHelper.update_subscriptions( SubscriptionHelper.update_subscriptions(
new=subscriptions, previous=self._yt_subscriptions) new=subscriptions, previous=self._yt_subscriptions)
self._yt_subscriptions = subscriptions self._yt_subscriptions = subscriptions
self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) self.tasks.append((Bot.Task.INIT_SUBS, time.time() + 1, None))
self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None))
except RuntimeError as error: except RuntimeError as error:
self.logger.error('Invalid init subscriptions file: %s', error) self.logger.error('Invalid init subscriptions file: %s', error)
has_error = True has_error = True
@ -265,8 +254,7 @@ class Bot:
self.logger.info('Loading subscriptions') self.logger.info('Loading subscriptions')
SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions) SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions)
self._yt_subscriptions = new_subscriptions self._yt_subscriptions = new_subscriptions
self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks)) self.tasks.append((Bot.Task.INIT_SUBS, time.time() + 1, None))
self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None))
# New init message is needed, previous need to be deleted # New init message is needed, previous need to be deleted
if (new_config is not None or new_subscriptions is not None) and self.init_message is not None: if (new_config is not None or new_subscriptions is not None) and self.init_message is not None:
@ -294,7 +282,6 @@ class Bot:
Bot.Task.DELETE_MESSAGES, Bot.Task.DELETE_MESSAGES,
time.time() + self.config.bot_message_duration, time.time() + self.config.bot_message_duration,
list(delayed_delete.values()))) list(delayed_delete.values())))
self.logger.debug('Bot channel scanned')
def _get_subscription_channel(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel], def _get_subscription_channel(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel],
category_ranges: list[tuple[int, int, ChannelCategory]]) -> TextChannel: category_ranges: list[tuple[int, int, ChannelCategory]]) -> TextChannel:
@ -328,12 +315,11 @@ class Bot:
max_results=self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count, max_results=self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count,
request_timeout=self.config.request_timeout) request_timeout=self.config.request_timeout)
video_ids = {v.id.videoId for v in subscription.shorts_list + subscription.video_list} video_ids = {v.id.videoId for v in subscription.shorts_list + subscription.video_list}
yt_connection = http.client.HTTPSConnection('www.youtube.com', timeout=self.config.request_timeout)
for yt_info in yt_video_info.items: for yt_info in yt_video_info.items:
if yt_info.id.videoId in video_ids: if yt_info.id.videoId in video_ids:
continue continue
if self.yt_manager.is_shorts(yt_connection, yt_info.id.videoId): if self.yt_manager.is_shorts(yt_info.id.videoId, request_timeout=self.config.request_timeout):
subscription.shorts_list.append(yt_info) subscription.shorts_list.append(yt_info)
else: else:
subscription.video_list.append(yt_info) subscription.video_list.append(yt_info)
@ -348,82 +334,12 @@ class Bot:
subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size] subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size]
subscription.last_update = time.time() subscription.last_update = time.time()
def _video_message_content(self, video: SearchResultItem) -> str: @staticmethod
return (self.config.youtube_channel_video_message def _video_message_content(video: SearchResultItem) -> str:
.replace('{{video_id}}', str(video.id.videoId)) return f'https://www.youtube.com/video/{video.id.videoId}'
.replace('{{video_title}}', str(html.unescape(video.snippet.title)))
.replace('{{video_description}}', str(video.snippet.description))
.replace('{{video_publish_time}}', video.snippet.publishTime.isoformat())
.replace('{{channel_id}}', str(video.snippet.channelId))
.replace('{{channel_title}}', str(video.snippet.channelTitle))
)
def _refresh_sub(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel], def _init_subs(self):
category_ranges: list[tuple[int, int, ChannelCategory]]): self.logger.info('Initialize all subs')
try:
sub_channel = self._get_subscription_channel(subscription, channel_dict, category_ranges)
except RuntimeError as error:
self.logger.error(error)
return
if subscription.channel_info is None:
_, channel_info = self.yt_manager.request_channel_info(
subscription.channel_id, request_timeout=self.config.request_timeout)
if not channel_info.items:
raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name)
subscription.channel_info = channel_info.items[0].snippet
self._refresh_subscription(subscription)
sub_init_message = f'https://www.youtube.com/{subscription.channel_info.customUrl}'
sub_messages = self._get_all_channel_messages(sub_channel)
if not sub_messages or sub_messages[-1].content != sub_init_message:
self.logger.debug('Clearing sub channel: %s', sub_channel.name)
for message in sub_messages:
self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
_ = self.discord_manager.create_message(
sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout)
else:
messages = list(reversed(sub_messages[:-1][:self.config.youtube_channel_video_count]))
yt_videos = list(reversed(subscription.video_list[:self.config.youtube_channel_video_count]))
immediate_delete: dict[int, Message] = {
m.id: m for m in sub_messages[self.config.youtube_channel_video_count:-1]}
last_matching_index = 0
stop_scan = False
for yt_video in yt_videos:
for index, message in enumerate(messages[last_matching_index:], start=last_matching_index):
if message.content != self._video_message_content(yt_video):
if last_matching_index != 0:
stop_scan = True
break
self.logger.debug('Unmatched video: %s', yt_video.id.videoId)
immediate_delete[message.id] = message
else:
self.logger.debug('Matched video: %s', yt_video.id.videoId)
last_matching_index = index + 1
break
else:
self.logger.debug('All videos scanned')
break
if stop_scan:
break
for message in messages[last_matching_index:]:
immediate_delete[message.id] = message
for message in immediate_delete.values():
try:
self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
except RuntimeError as error:
self.logger.error('Error deleting message %s from channel %s : %s',
message.id, sub_channel.name, error)
for video in yt_videos[last_matching_index:]:
_ = self.discord_manager.create_message(
sub_channel, {'content': self._video_message_content(video)},
request_timeout=self.config.request_timeout)
subscription.last_update = time.time()
def _refresh_subs(self):
self.logger.info('Start refreshing subs')
categories, text_channel = self.discord_manager.list_channels( categories, text_channel = self.discord_manager.list_channels(
self.guild_id, request_timeout=self.config.request_timeout) self.guild_id, request_timeout=self.config.request_timeout)
self.guild_text_channels = text_channel self.guild_text_channels = text_channel
@ -443,16 +359,68 @@ class Bot:
category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category)) category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category))
category_ranges = sorted(category_ranges, key=operator.itemgetter(0)) category_ranges = sorted(category_ranges, key=operator.itemgetter(0))
sorted_subs = sorted(self._yt_subscriptions.values(), key=lambda s: s.last_update) for sub_info in self._yt_subscriptions.values():
for sub_info in sorted_subs:
try: try:
self._refresh_sub(sub_info, channel_dict, category_ranges) sub_channel = self._get_subscription_channel(sub_info, channel_dict, category_ranges)
except RuntimeError as error: except RuntimeError as error:
self.logger.error('Refreshing subscription %s failed: %s', sub_info.channel_id, error) self.logger.error(error)
except TimeoutError as error: continue
self.logger.error('Timeout error refreshing subcription: %s', error) if sub_info.channel_info is None:
break _, channel_info = self.yt_manager.request_channel_info(
self.logger.info('Subs refreshed') sub_info.channel_id, request_timeout=self.config.request_timeout)
if not channel_info.items:
raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name)
sub_info.channel_info = channel_info.items[0].snippet
self._refresh_subscription(sub_info)
sub_init_message = f'https://www.youtube.com/{sub_info.channel_info.customUrl}'
sub_messages = self._get_all_channel_messages(sub_channel)
if not sub_messages or sub_messages[-1].content != sub_init_message:
self.logger.debug('Clearing sub channel: %s', sub_channel.name)
for message in sub_messages:
self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
_ = self.discord_manager.create_message(
sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout)
else:
messages = list(reversed(sub_messages[:-1][:self.config.youtube_channel_video_count]))
yt_videos = list(reversed(sub_info.video_list[:self.config.youtube_channel_video_count]))
immediate_delete: dict[int, Message] = {
m.id: m for m in sub_messages[self.config.youtube_channel_video_count:-1]}
last_matching_index = 0
stop_scan = False
for yt_video in yt_videos:
for index, message in enumerate(messages[last_matching_index:], start=last_matching_index):
if message.content != self._video_message_content(yt_video):
if last_matching_index != 0:
stop_scan = True
break
self.logger.debug('Unmatched video: %s', yt_video.id.videoId)
immediate_delete[message.id] = message
else:
self.logger.debug('Matched video: %s', yt_video.id.videoId)
last_matching_index = index + 1
break
else:
self.logger.debug('All videos scanned')
break
if stop_scan:
break
for message in messages[last_matching_index:]:
immediate_delete[message.id] = message
for message in immediate_delete.values():
try:
self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
except RuntimeError as error:
self.logger.error('Error deleting message %s from channel %s : %s',
message.id, sub_channel.name, error)
for video in yt_videos[last_matching_index:]:
_ = self.discord_manager.create_message(
sub_channel, {'content': self._video_message_content(video)},
request_timeout=self.config.request_timeout)
sub_info.last_update = time.time()
def run(self): def run(self):
while self.tasks: while self.tasks:
@ -485,17 +453,13 @@ class Bot:
except Exception as error: except Exception as error:
self.logger.error('Error scanning bot channel: %s -> %s', self.logger.error('Error scanning bot channel: %s -> %s',
error, traceback.format_exc().replace('\n', ' | ')) error, traceback.format_exc().replace('\n', ' | '))
self.tasks = list(filter(lambda t: t[0] != Bot.Task.SCAN_BOT_CHANNEL, self.tasks))
self.tasks.append(( self.tasks.append((
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None)) self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
case Bot.Task.REFRESH_SUBS: case Bot.Task.INIT_SUBS:
try: try:
self._refresh_subs() self._init_subs()
except Exception as error: except Exception as error:
self.logger.error('Error initializing subscriptions : %s -> %s', self.logger.error('Error initializing subscriptions : %s -> %s',
error, traceback.format_exc().replace('\n', ' | ')) error, traceback.format_exc().replace('\n', ' | '))
self.SUBS_SAVE_PATH.write_text(
json.dumps(self._yt_subscriptions, cls=ApiEncoder, ensure_ascii=False), encoding='utf-8')
self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks))
self.tasks.append(( self.tasks.append((
self.Task.REFRESH_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None)) self.Task.INIT_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None))

View file

@ -14,7 +14,6 @@ class Config:
unmanaged_categories: str = '' unmanaged_categories: str = ''
youtube_channel_refresh_interval: float = 3600 youtube_channel_refresh_interval: float = 3600
youtube_channel_video_count: int = 10 youtube_channel_video_count: int = 10
youtube_channel_video_message: str = '[{{video_title}}](https://www.youtube.com/video/{{video_id}})'
def to_str(self) -> str: def to_str(self) -> str:
return '\n'.join(['config', *[f'{k}={v}' for k, v in asdict(self).items()]]) return '\n'.join(['config', *[f'{k}={v}' for k, v in asdict(self).items()]])

View file

@ -1,7 +1,6 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import asdict, dataclass, is_dataclass from dataclasses import asdict, dataclass, is_dataclass
from datetime import datetime
from enum import Enum from enum import Enum
import json import json
import random import random
@ -23,14 +22,11 @@ class ApiEncoder(json.JSONEncoder):
return asdict(o) # type: ignore return asdict(o) # type: ignore
if isinstance(o, Enum): if isinstance(o, Enum):
return o.value return o.value
if isinstance(o, datetime):
return o.isoformat()
return super().default(o) return super().default(o)
class DiscordManager: class DiscordManager:
MIN_API_VERSION = 9 MIN_API_VERSION = 9
TOO_MANY_REQUEST_STATUS = 429
@dataclass @dataclass
class RateLimit: class RateLimit:
@ -89,31 +85,15 @@ class DiscordManager:
else: else:
request.add_header('Content-Type', 'application/json') request.add_header('Content-Type', 'application/json')
request.add_header('Authorization', f'Bot {self._bot_token}') request.add_header('Authorization', f'Bot {self._bot_token}')
def _request() -> tuple[int, dict, bytes | None]:
nonlocal request, request_timeout
with urllib.request.urlopen(request, timeout=request_timeout) as response:
headers = dict(response.getheaders())
return response.status, headers, response.read()
try: try:
body = b'' with urllib.request.urlopen(request, timeout=request_timeout) as response:
try: if response.status != expected_code:
status, headers, body = _request() raise RuntimeError(
except urllib.error.HTTPError as error: f'Unexpected code {response.status} (expected: {expected_code}) -> {response.read().decode()}')
if error.status != self.TOO_MANY_REQUEST_STATUS: body = response.read()
raise error headers = dict(response.getheaders())
status = error.status
headers = dict(error.headers)
self._update_rate_limit(headers)
if status == self.TOO_MANY_REQUEST_STATUS:
self._logger.warning('Warning: too many request -> retrying')
status, headers, body = _request()
self._update_rate_limit(headers) self._update_rate_limit(headers)
if status != expected_code: return headers, json.loads(body.decode()) if body else None
raise RuntimeError(f'Unexpected code {status} (expected: {expected_code}) -> {body}')
return headers, json.loads(body.decode()) if body else None
except urllib.error.HTTPError as error: except urllib.error.HTTPError as error:
raise RuntimeError( raise RuntimeError(
f'HTTP error calling API ({url}): {error}:\nHeaders:\n{error.headers}Body:\n{error.read()}') from error f'HTTP error calling API ({url}): {error}:\nHeaders:\n{error.headers}Body:\n{error.read()}') from error

View file

@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
import http.client
import json import json
import time import time
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@ -11,13 +12,12 @@ from .youtube_objects import ChannelResult, SearchResult
if TYPE_CHECKING: if TYPE_CHECKING:
import http.client
import logging import logging
from .objects import HTTPHeaders from .objects import HTTPHeaders
class YoutubeManager: class YoutubeManager:
DEFAULT_DAILY_POINTS = 10_000 DEFAULT_DAILY_REQUESTS = 10_000
SHORTS_CHECK_STATUS = 303 SHORTS_CHECK_STATUS = 303
@dataclass @dataclass
@ -28,12 +28,12 @@ class YoutubeManager:
def __init__(self, api_key: str, logger: logging.Logger): def __init__(self, api_key: str, logger: logging.Logger):
self._api_key = api_key self._api_key = api_key
self._logger = logger self._logger = logger
self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_POINTS, next_reset=time.time() + 24 * 3600) self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_REQUESTS, next_reset=time.time() + 24 * 3600)
def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, dict]: def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, dict]:
if time.time() >= self.rate_limit.next_reset: if time.time() >= self.rate_limit.next_reset:
self.rate_limit.next_reset = time.time() + 24 * 3600 self.rate_limit.next_reset = time.time() + 24 * 3600
self.rate_limit.remaining = self.DEFAULT_DAILY_POINTS self.rate_limit.remaining = self.DEFAULT_DAILY_REQUESTS
elif self.rate_limit.remaining <= 0: elif self.rate_limit.remaining <= 0:
sleep_time = time.time() - self.rate_limit.next_reset sleep_time = time.time() - self.rate_limit.next_reset
self._logger.debug('No more remaining in Youtube RateLimit : sleeping for %.03fs', sleep_time) self._logger.debug('No more remaining in Youtube RateLimit : sleeping for %.03fs', sleep_time)
@ -58,13 +58,11 @@ class YoutubeManager:
except TimeoutError as error: except TimeoutError as error:
raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error
def is_shorts(self, connection: http.client.HTTPConnection, video_id: str) -> bool: def is_shorts(self, video_id: str, request_timeout: float) -> bool:
try: try:
endpoint = f'/shorts/{video_id}' connection = http.client.HTTPSConnection('www.youtube.com', timeout=request_timeout)
self._logger.debug('YoutubeManager: Checking for shorts: %s', endpoint) connection.request('GET', f'/shorts/{video_id}')
connection.request('GET', endpoint)
response = connection.getresponse() response = connection.getresponse()
response.read()
return response.status != self.SHORTS_CHECK_STATUS return response.status != self.SHORTS_CHECK_STATUS
except Exception as error: except Exception as error:
raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error
@ -73,7 +71,6 @@ class YoutubeManager:
HTTPHeaders, ChannelResult]: HTTPHeaders, ChannelResult]:
url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet' url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet'
f'&id={channel_id}&key={self._api_key}') f'&id={channel_id}&key={self._api_key}')
self._logger.debug('YoutubeManager: request channel info for channel %s', channel_id)
headers, info = self._request(url=url, request_timeout=request_timeout) headers, info = self._request(url=url, request_timeout=request_timeout)
return headers, ChannelResult.from_dict(info) return headers, ChannelResult.from_dict(info)
@ -81,6 +78,5 @@ class YoutubeManager:
HTTPHeaders, SearchResult]: HTTPHeaders, SearchResult]:
url = (f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}' url = (f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}'
f'&maxResults={max_results}&order=date&type=video&key={self._api_key}') f'&maxResults={max_results}&order=date&type=video&key={self._api_key}')
self._logger.debug('YoutubeManager: request channel videos for channel %s', channel_id)
headers, info = self._request(url=url, request_timeout=request_timeout) headers, info = self._request(url=url, request_timeout=request_timeout)
return headers, SearchResult.from_dict(info) return headers, SearchResult.from_dict(info)

View file

@ -74,7 +74,7 @@ class Result(Generic[T_api]):
etag=info['etag'], etag=info['etag'],
nextPageToken=info.get('nextPageToken'), nextPageToken=info.get('nextPageToken'),
pageInfo=PageInfo.from_dict(info['pageInfo']), pageInfo=PageInfo.from_dict(info['pageInfo']),
items=[item_type.from_dict(i) for i in info.get('items', [])]) items=[item_type.from_dict(i) for i in info['items']])
# Channel Objects # Channel Objects
@ -87,7 +87,7 @@ class ChannelSnippet(_Api):
customUrl: str customUrl: str
publishedAt: datetime publishedAt: datetime
thumbnails: Thumbnails thumbnails: Thumbnails
country: str | None country: str
@staticmethod @staticmethod
def from_dict(info: dict) -> ChannelSnippet: def from_dict(info: dict) -> ChannelSnippet:
@ -97,7 +97,7 @@ class ChannelSnippet(_Api):
customUrl=info['customUrl'], customUrl=info['customUrl'],
publishedAt=datetime.fromisoformat(info['publishedAt']), publishedAt=datetime.fromisoformat(info['publishedAt']),
thumbnails=Thumbnails.from_dict(info['thumbnails']), thumbnails=Thumbnails.from_dict(info['thumbnails']),
country=info.get('country')) country=info['country'])
@dataclass @dataclass

View file

@ -1,5 +1,3 @@
from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from .youtube_objects import ChannelSnippet, SearchResultItem from .youtube_objects import ChannelSnippet, SearchResultItem
@ -14,17 +12,6 @@ class SubscriptionInfo:
shorts_list: list[SearchResultItem] = field(default_factory=list) shorts_list: list[SearchResultItem] = field(default_factory=list)
video_list: list[SearchResultItem] = field(default_factory=list) video_list: list[SearchResultItem] = field(default_factory=list)
@staticmethod
def from_dict(info: dict) -> SubscriptionInfo:
channel_info: dict | None = info.get('channel_info')
return SubscriptionInfo(
name=info['name'],
channel_id=info['channel_id'],
last_update=info['last_update'],
channel_info=ChannelSnippet.from_dict(channel_info) if channel_info is not None else None,
shorts_list=[SearchResultItem.from_dict(s) for s in info['shorts_list']],
video_list=[SearchResultItem.from_dict(s) for s in info['video_list']])
Subscriptions = dict[str, SubscriptionInfo] Subscriptions = dict[str, SubscriptionInfo]