Compare commits
10 commits
ef14b1cc07
...
4674cc926c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4674cc926c | ||
|
|
f7b0dd48a9 | ||
|
|
45eac39d30 | ||
|
|
3ddffe2774 | ||
|
|
f59982f745 | ||
|
|
0e6f2a8588 | ||
|
|
693564bb04 | ||
|
|
a1c07d2aac | ||
|
|
2759d5dc1f | ||
|
|
a9e0fd26d7 |
6 changed files with 173 additions and 99 deletions
|
|
@ -1,6 +1,9 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
import html
|
||||||
|
import http.client
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import operator
|
import operator
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
@ -11,7 +14,7 @@ from typing import Any, TYPE_CHECKING
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
from .config import Config
|
from .config import Config
|
||||||
from .discord_manager import DiscordManager
|
from .discord_manager import ApiEncoder, DiscordManager
|
||||||
from .logger import create_logger
|
from .logger import create_logger
|
||||||
from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite,
|
from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite,
|
||||||
OverwriteType, Permissions, Role, TextChannel)
|
OverwriteType, Permissions, Role, TextChannel)
|
||||||
|
|
@ -23,19 +26,20 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
|
|
||||||
class Bot:
|
class Bot:
|
||||||
DEFAULT_MESSAGE_LIST_LIMIT = 50
|
DEFAULT_MESSAGE_LIST_LIMIT: int = 50
|
||||||
DISCORD_NAME_REGEX = r'([^a-z])'
|
DISCORD_NAME_REGEX: str = r'([^a-z])'
|
||||||
INIT_MESSAGE = ('Bot initialized.\nThis is the current configuration used.\n'
|
INIT_MESSAGE: str = ('Bot initialized.\nThis is the current configuration used.\n'
|
||||||
'You can upload a new one to update the configuration.')
|
'You can upload a new one to update the configuration.')
|
||||||
MAX_DOWNLOAD_SIZE: int = 50_000
|
MAX_DOWNLOAD_SIZE: int = 50_000
|
||||||
SUBS_LIST_MIN_SIZE = 50
|
SUBS_LIST_MIN_SIZE: int = 50
|
||||||
SUBS_LIST_SHORTS_RATIO = 5
|
SUBS_LIST_SHORTS_RATIO: int = 5
|
||||||
SUBS_LIST_VIDEO_RATIO = 2
|
SUBS_LIST_VIDEO_RATIO: int = 2
|
||||||
|
SUBS_SAVE_PATH: Path = Path('/tmp/breadtube-bot_subs.json')
|
||||||
|
|
||||||
class Task(Enum):
|
class Task(Enum):
|
||||||
DELETE_MESSAGES = 1
|
DELETE_MESSAGES = 1
|
||||||
SCAN_BOT_CHANNEL = 2
|
SCAN_BOT_CHANNEL = 2
|
||||||
INIT_SUBS = 3
|
REFRESH_SUBS = 3
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_code_version() -> str:
|
def _get_code_version() -> str:
|
||||||
|
|
@ -89,12 +93,15 @@ class Bot:
|
||||||
raise RuntimeError("Couldn't initialize bot channel/role/permission")
|
raise RuntimeError("Couldn't initialize bot channel/role/permission")
|
||||||
self.bot_channel: TextChannel = bot_channel
|
self.bot_channel: TextChannel = bot_channel
|
||||||
|
|
||||||
self._yt_subscriptions: Subscriptions = {}
|
self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger)
|
||||||
|
self._yt_subscriptions: Subscriptions = {
|
||||||
|
name: SubscriptionInfo.from_dict(info) for name, info in json.loads(
|
||||||
|
self.SUBS_SAVE_PATH.read_text(encoding='utf-8')).items()} if self.SUBS_SAVE_PATH.exists() else {}
|
||||||
self._scan_bot_channel()
|
self._scan_bot_channel()
|
||||||
self.tasks.append((
|
self.tasks.append((
|
||||||
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
|
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
|
||||||
|
self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks))
|
||||||
self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger)
|
self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None))
|
||||||
self.logger.info('Bot initialized')
|
self.logger.info('Bot initialized')
|
||||||
|
|
||||||
def init_bot_channel(self) -> TextChannel | None:
|
def init_bot_channel(self) -> TextChannel | None:
|
||||||
|
|
@ -139,6 +146,7 @@ class Bot:
|
||||||
return messages
|
return messages
|
||||||
|
|
||||||
def _scan_bot_channel(self): # noqa: PLR0915
|
def _scan_bot_channel(self): # noqa: PLR0915
|
||||||
|
self.logger.debug('Starting scanning bot channel')
|
||||||
messages = self._get_all_channel_messages(self.bot_channel)
|
messages = self._get_all_channel_messages(self.bot_channel)
|
||||||
init_message_found = False
|
init_message_found = False
|
||||||
new_config: Config | None = None
|
new_config: Config | None = None
|
||||||
|
|
@ -170,6 +178,8 @@ class Bot:
|
||||||
if new_config is None and content.startswith(b'config'):
|
if new_config is None and content.startswith(b'config'):
|
||||||
try:
|
try:
|
||||||
self.config = Config.from_str(content.decode())
|
self.config = Config.from_str(content.decode())
|
||||||
|
if self.config.to_str() != content.decode():
|
||||||
|
new_config = self.config
|
||||||
except RuntimeError as error:
|
except RuntimeError as error:
|
||||||
self.logger.error('Cannot load config from init message: %s', error)
|
self.logger.error('Cannot load config from init message: %s', error)
|
||||||
has_error = True
|
has_error = True
|
||||||
|
|
@ -180,7 +190,8 @@ class Bot:
|
||||||
SubscriptionHelper.update_subscriptions(
|
SubscriptionHelper.update_subscriptions(
|
||||||
new=subscriptions, previous=self._yt_subscriptions)
|
new=subscriptions, previous=self._yt_subscriptions)
|
||||||
self._yt_subscriptions = subscriptions
|
self._yt_subscriptions = subscriptions
|
||||||
self.tasks.append((Bot.Task.INIT_SUBS, time.time() + 1, None))
|
self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks))
|
||||||
|
self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None))
|
||||||
except RuntimeError as error:
|
except RuntimeError as error:
|
||||||
self.logger.error('Invalid init subscriptions file: %s', error)
|
self.logger.error('Invalid init subscriptions file: %s', error)
|
||||||
has_error = True
|
has_error = True
|
||||||
|
|
@ -254,7 +265,8 @@ class Bot:
|
||||||
self.logger.info('Loading subscriptions')
|
self.logger.info('Loading subscriptions')
|
||||||
SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions)
|
SubscriptionHelper.update_subscriptions(new=new_subscriptions, previous=self._yt_subscriptions)
|
||||||
self._yt_subscriptions = new_subscriptions
|
self._yt_subscriptions = new_subscriptions
|
||||||
self.tasks.append((Bot.Task.INIT_SUBS, time.time() + 1, None))
|
self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks))
|
||||||
|
self.tasks.append((Bot.Task.REFRESH_SUBS, time.time() + 1, None))
|
||||||
|
|
||||||
# New init message is needed, previous need to be deleted
|
# New init message is needed, previous need to be deleted
|
||||||
if (new_config is not None or new_subscriptions is not None) and self.init_message is not None:
|
if (new_config is not None or new_subscriptions is not None) and self.init_message is not None:
|
||||||
|
|
@ -282,6 +294,7 @@ class Bot:
|
||||||
Bot.Task.DELETE_MESSAGES,
|
Bot.Task.DELETE_MESSAGES,
|
||||||
time.time() + self.config.bot_message_duration,
|
time.time() + self.config.bot_message_duration,
|
||||||
list(delayed_delete.values())))
|
list(delayed_delete.values())))
|
||||||
|
self.logger.debug('Bot channel scanned')
|
||||||
|
|
||||||
def _get_subscription_channel(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel],
|
def _get_subscription_channel(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel],
|
||||||
category_ranges: list[tuple[int, int, ChannelCategory]]) -> TextChannel:
|
category_ranges: list[tuple[int, int, ChannelCategory]]) -> TextChannel:
|
||||||
|
|
@ -315,11 +328,12 @@ class Bot:
|
||||||
max_results=self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count,
|
max_results=self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count,
|
||||||
request_timeout=self.config.request_timeout)
|
request_timeout=self.config.request_timeout)
|
||||||
video_ids = {v.id.videoId for v in subscription.shorts_list + subscription.video_list}
|
video_ids = {v.id.videoId for v in subscription.shorts_list + subscription.video_list}
|
||||||
|
yt_connection = http.client.HTTPSConnection('www.youtube.com', timeout=self.config.request_timeout)
|
||||||
for yt_info in yt_video_info.items:
|
for yt_info in yt_video_info.items:
|
||||||
if yt_info.id.videoId in video_ids:
|
if yt_info.id.videoId in video_ids:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if self.yt_manager.is_shorts(yt_info.id.videoId, request_timeout=self.config.request_timeout):
|
if self.yt_manager.is_shorts(yt_connection, yt_info.id.videoId):
|
||||||
subscription.shorts_list.append(yt_info)
|
subscription.shorts_list.append(yt_info)
|
||||||
else:
|
else:
|
||||||
subscription.video_list.append(yt_info)
|
subscription.video_list.append(yt_info)
|
||||||
|
|
@ -334,12 +348,82 @@ class Bot:
|
||||||
subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size]
|
subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size]
|
||||||
subscription.last_update = time.time()
|
subscription.last_update = time.time()
|
||||||
|
|
||||||
@staticmethod
|
def _video_message_content(self, video: SearchResultItem) -> str:
|
||||||
def _video_message_content(video: SearchResultItem) -> str:
|
return (self.config.youtube_channel_video_message
|
||||||
return f'https://www.youtube.com/video/{video.id.videoId}'
|
.replace('{{video_id}}', str(video.id.videoId))
|
||||||
|
.replace('{{video_title}}', str(html.unescape(video.snippet.title)))
|
||||||
|
.replace('{{video_description}}', str(video.snippet.description))
|
||||||
|
.replace('{{video_publish_time}}', video.snippet.publishTime.isoformat())
|
||||||
|
.replace('{{channel_id}}', str(video.snippet.channelId))
|
||||||
|
.replace('{{channel_title}}', str(video.snippet.channelTitle))
|
||||||
|
)
|
||||||
|
|
||||||
def _init_subs(self):
|
def _refresh_sub(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel],
|
||||||
self.logger.info('Initialize all subs')
|
category_ranges: list[tuple[int, int, ChannelCategory]]):
|
||||||
|
try:
|
||||||
|
sub_channel = self._get_subscription_channel(subscription, channel_dict, category_ranges)
|
||||||
|
except RuntimeError as error:
|
||||||
|
self.logger.error(error)
|
||||||
|
return
|
||||||
|
if subscription.channel_info is None:
|
||||||
|
_, channel_info = self.yt_manager.request_channel_info(
|
||||||
|
subscription.channel_id, request_timeout=self.config.request_timeout)
|
||||||
|
if not channel_info.items:
|
||||||
|
raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name)
|
||||||
|
subscription.channel_info = channel_info.items[0].snippet
|
||||||
|
|
||||||
|
self._refresh_subscription(subscription)
|
||||||
|
|
||||||
|
sub_init_message = f'https://www.youtube.com/{subscription.channel_info.customUrl}'
|
||||||
|
sub_messages = self._get_all_channel_messages(sub_channel)
|
||||||
|
if not sub_messages or sub_messages[-1].content != sub_init_message:
|
||||||
|
self.logger.debug('Clearing sub channel: %s', sub_channel.name)
|
||||||
|
for message in sub_messages:
|
||||||
|
self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
|
||||||
|
_ = self.discord_manager.create_message(
|
||||||
|
sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout)
|
||||||
|
else:
|
||||||
|
messages = list(reversed(sub_messages[:-1][:self.config.youtube_channel_video_count]))
|
||||||
|
yt_videos = list(reversed(subscription.video_list[:self.config.youtube_channel_video_count]))
|
||||||
|
immediate_delete: dict[int, Message] = {
|
||||||
|
m.id: m for m in sub_messages[self.config.youtube_channel_video_count:-1]}
|
||||||
|
last_matching_index = 0
|
||||||
|
stop_scan = False
|
||||||
|
for yt_video in yt_videos:
|
||||||
|
for index, message in enumerate(messages[last_matching_index:], start=last_matching_index):
|
||||||
|
if message.content != self._video_message_content(yt_video):
|
||||||
|
if last_matching_index != 0:
|
||||||
|
stop_scan = True
|
||||||
|
break
|
||||||
|
self.logger.debug('Unmatched video: %s', yt_video.id.videoId)
|
||||||
|
immediate_delete[message.id] = message
|
||||||
|
else:
|
||||||
|
self.logger.debug('Matched video: %s', yt_video.id.videoId)
|
||||||
|
last_matching_index = index + 1
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.logger.debug('All videos scanned')
|
||||||
|
break
|
||||||
|
if stop_scan:
|
||||||
|
break
|
||||||
|
for message in messages[last_matching_index:]:
|
||||||
|
immediate_delete[message.id] = message
|
||||||
|
|
||||||
|
for message in immediate_delete.values():
|
||||||
|
try:
|
||||||
|
self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
|
||||||
|
except RuntimeError as error:
|
||||||
|
self.logger.error('Error deleting message %s from channel %s : %s',
|
||||||
|
message.id, sub_channel.name, error)
|
||||||
|
for video in yt_videos[last_matching_index:]:
|
||||||
|
_ = self.discord_manager.create_message(
|
||||||
|
sub_channel, {'content': self._video_message_content(video)},
|
||||||
|
request_timeout=self.config.request_timeout)
|
||||||
|
|
||||||
|
subscription.last_update = time.time()
|
||||||
|
|
||||||
|
def _refresh_subs(self):
|
||||||
|
self.logger.info('Start refreshing subs')
|
||||||
categories, text_channel = self.discord_manager.list_channels(
|
categories, text_channel = self.discord_manager.list_channels(
|
||||||
self.guild_id, request_timeout=self.config.request_timeout)
|
self.guild_id, request_timeout=self.config.request_timeout)
|
||||||
self.guild_text_channels = text_channel
|
self.guild_text_channels = text_channel
|
||||||
|
|
@ -359,68 +443,16 @@ class Bot:
|
||||||
category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category))
|
category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category))
|
||||||
category_ranges = sorted(category_ranges, key=operator.itemgetter(0))
|
category_ranges = sorted(category_ranges, key=operator.itemgetter(0))
|
||||||
|
|
||||||
for sub_info in self._yt_subscriptions.values():
|
sorted_subs = sorted(self._yt_subscriptions.values(), key=lambda s: s.last_update)
|
||||||
|
for sub_info in sorted_subs:
|
||||||
try:
|
try:
|
||||||
sub_channel = self._get_subscription_channel(sub_info, channel_dict, category_ranges)
|
self._refresh_sub(sub_info, channel_dict, category_ranges)
|
||||||
except RuntimeError as error:
|
except RuntimeError as error:
|
||||||
self.logger.error(error)
|
self.logger.error('Refreshing subscription %s failed: %s', sub_info.channel_id, error)
|
||||||
continue
|
except TimeoutError as error:
|
||||||
if sub_info.channel_info is None:
|
self.logger.error('Timeout error refreshing subcription: %s', error)
|
||||||
_, channel_info = self.yt_manager.request_channel_info(
|
break
|
||||||
sub_info.channel_id, request_timeout=self.config.request_timeout)
|
self.logger.info('Subs refreshed')
|
||||||
if not channel_info.items:
|
|
||||||
raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name)
|
|
||||||
sub_info.channel_info = channel_info.items[0].snippet
|
|
||||||
|
|
||||||
self._refresh_subscription(sub_info)
|
|
||||||
|
|
||||||
sub_init_message = f'https://www.youtube.com/{sub_info.channel_info.customUrl}'
|
|
||||||
sub_messages = self._get_all_channel_messages(sub_channel)
|
|
||||||
if not sub_messages or sub_messages[-1].content != sub_init_message:
|
|
||||||
self.logger.debug('Clearing sub channel: %s', sub_channel.name)
|
|
||||||
for message in sub_messages:
|
|
||||||
self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
|
|
||||||
_ = self.discord_manager.create_message(
|
|
||||||
sub_channel, {'content': sub_init_message}, request_timeout=self.config.request_timeout)
|
|
||||||
else:
|
|
||||||
messages = list(reversed(sub_messages[:-1][:self.config.youtube_channel_video_count]))
|
|
||||||
yt_videos = list(reversed(sub_info.video_list[:self.config.youtube_channel_video_count]))
|
|
||||||
immediate_delete: dict[int, Message] = {
|
|
||||||
m.id: m for m in sub_messages[self.config.youtube_channel_video_count:-1]}
|
|
||||||
last_matching_index = 0
|
|
||||||
stop_scan = False
|
|
||||||
for yt_video in yt_videos:
|
|
||||||
for index, message in enumerate(messages[last_matching_index:], start=last_matching_index):
|
|
||||||
if message.content != self._video_message_content(yt_video):
|
|
||||||
if last_matching_index != 0:
|
|
||||||
stop_scan = True
|
|
||||||
break
|
|
||||||
self.logger.debug('Unmatched video: %s', yt_video.id.videoId)
|
|
||||||
immediate_delete[message.id] = message
|
|
||||||
else:
|
|
||||||
self.logger.debug('Matched video: %s', yt_video.id.videoId)
|
|
||||||
last_matching_index = index + 1
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.logger.debug('All videos scanned')
|
|
||||||
break
|
|
||||||
if stop_scan:
|
|
||||||
break
|
|
||||||
for message in messages[last_matching_index:]:
|
|
||||||
immediate_delete[message.id] = message
|
|
||||||
|
|
||||||
for message in immediate_delete.values():
|
|
||||||
try:
|
|
||||||
self.discord_manager.delete_message(message, request_timeout=self.config.request_timeout)
|
|
||||||
except RuntimeError as error:
|
|
||||||
self.logger.error('Error deleting message %s from channel %s : %s',
|
|
||||||
message.id, sub_channel.name, error)
|
|
||||||
for video in yt_videos[last_matching_index:]:
|
|
||||||
_ = self.discord_manager.create_message(
|
|
||||||
sub_channel, {'content': self._video_message_content(video)},
|
|
||||||
request_timeout=self.config.request_timeout)
|
|
||||||
|
|
||||||
sub_info.last_update = time.time()
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while self.tasks:
|
while self.tasks:
|
||||||
|
|
@ -453,13 +485,17 @@ class Bot:
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
self.logger.error('Error scanning bot channel: %s -> %s',
|
self.logger.error('Error scanning bot channel: %s -> %s',
|
||||||
error, traceback.format_exc().replace('\n', ' | '))
|
error, traceback.format_exc().replace('\n', ' | '))
|
||||||
|
self.tasks = list(filter(lambda t: t[0] != Bot.Task.SCAN_BOT_CHANNEL, self.tasks))
|
||||||
self.tasks.append((
|
self.tasks.append((
|
||||||
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
|
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
|
||||||
case Bot.Task.INIT_SUBS:
|
case Bot.Task.REFRESH_SUBS:
|
||||||
try:
|
try:
|
||||||
self._init_subs()
|
self._refresh_subs()
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
self.logger.error('Error initializing subscriptions : %s -> %s',
|
self.logger.error('Error initializing subscriptions : %s -> %s',
|
||||||
error, traceback.format_exc().replace('\n', ' | '))
|
error, traceback.format_exc().replace('\n', ' | '))
|
||||||
|
self.SUBS_SAVE_PATH.write_text(
|
||||||
|
json.dumps(self._yt_subscriptions, cls=ApiEncoder, ensure_ascii=False), encoding='utf-8')
|
||||||
|
self.tasks = list(filter(lambda t: t[0] != Bot.Task.REFRESH_SUBS, self.tasks))
|
||||||
self.tasks.append((
|
self.tasks.append((
|
||||||
self.Task.INIT_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None))
|
self.Task.REFRESH_SUBS, time.time() + self.config.youtube_channel_refresh_interval, None))
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ class Config:
|
||||||
unmanaged_categories: str = ''
|
unmanaged_categories: str = ''
|
||||||
youtube_channel_refresh_interval: float = 3600
|
youtube_channel_refresh_interval: float = 3600
|
||||||
youtube_channel_video_count: int = 10
|
youtube_channel_video_count: int = 10
|
||||||
|
youtube_channel_video_message: str = '[{{video_title}}](https://www.youtube.com/video/{{video_id}})'
|
||||||
|
|
||||||
def to_str(self) -> str:
|
def to_str(self) -> str:
|
||||||
return '\n'.join(['config', *[f'{k}={v}' for k, v in asdict(self).items()]])
|
return '\n'.join(['config', *[f'{k}={v}' for k, v in asdict(self).items()]])
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from dataclasses import asdict, dataclass, is_dataclass
|
from dataclasses import asdict, dataclass, is_dataclass
|
||||||
|
from datetime import datetime
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
import json
|
import json
|
||||||
import random
|
import random
|
||||||
|
|
@ -22,11 +23,14 @@ class ApiEncoder(json.JSONEncoder):
|
||||||
return asdict(o) # type: ignore
|
return asdict(o) # type: ignore
|
||||||
if isinstance(o, Enum):
|
if isinstance(o, Enum):
|
||||||
return o.value
|
return o.value
|
||||||
|
if isinstance(o, datetime):
|
||||||
|
return o.isoformat()
|
||||||
return super().default(o)
|
return super().default(o)
|
||||||
|
|
||||||
|
|
||||||
class DiscordManager:
|
class DiscordManager:
|
||||||
MIN_API_VERSION = 9
|
MIN_API_VERSION = 9
|
||||||
|
TOO_MANY_REQUEST_STATUS = 429
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RateLimit:
|
class RateLimit:
|
||||||
|
|
@ -85,15 +89,31 @@ class DiscordManager:
|
||||||
else:
|
else:
|
||||||
request.add_header('Content-Type', 'application/json')
|
request.add_header('Content-Type', 'application/json')
|
||||||
request.add_header('Authorization', f'Bot {self._bot_token}')
|
request.add_header('Authorization', f'Bot {self._bot_token}')
|
||||||
try:
|
|
||||||
|
def _request() -> tuple[int, dict, bytes | None]:
|
||||||
|
nonlocal request, request_timeout
|
||||||
with urllib.request.urlopen(request, timeout=request_timeout) as response:
|
with urllib.request.urlopen(request, timeout=request_timeout) as response:
|
||||||
if response.status != expected_code:
|
|
||||||
raise RuntimeError(
|
|
||||||
f'Unexpected code {response.status} (expected: {expected_code}) -> {response.read().decode()}')
|
|
||||||
body = response.read()
|
|
||||||
headers = dict(response.getheaders())
|
headers = dict(response.getheaders())
|
||||||
|
return response.status, headers, response.read()
|
||||||
|
|
||||||
|
try:
|
||||||
|
body = b''
|
||||||
|
try:
|
||||||
|
status, headers, body = _request()
|
||||||
|
except urllib.error.HTTPError as error:
|
||||||
|
if error.status != self.TOO_MANY_REQUEST_STATUS:
|
||||||
|
raise error
|
||||||
|
status = error.status
|
||||||
|
headers = dict(error.headers)
|
||||||
|
|
||||||
|
self._update_rate_limit(headers)
|
||||||
|
if status == self.TOO_MANY_REQUEST_STATUS:
|
||||||
|
self._logger.warning('Warning: too many request -> retrying')
|
||||||
|
status, headers, body = _request()
|
||||||
self._update_rate_limit(headers)
|
self._update_rate_limit(headers)
|
||||||
return headers, json.loads(body.decode()) if body else None
|
if status != expected_code:
|
||||||
|
raise RuntimeError(f'Unexpected code {status} (expected: {expected_code}) -> {body}')
|
||||||
|
return headers, json.loads(body.decode()) if body else None
|
||||||
except urllib.error.HTTPError as error:
|
except urllib.error.HTTPError as error:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f'HTTP error calling API ({url}): {error}:\nHeaders:\n{error.headers}Body:\n{error.read()}') from error
|
f'HTTP error calling API ({url}): {error}:\nHeaders:\n{error.headers}Body:\n{error.read()}') from error
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
import http.client
|
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
@ -12,12 +11,13 @@ from .youtube_objects import ChannelResult, SearchResult
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
import http.client
|
||||||
import logging
|
import logging
|
||||||
from .objects import HTTPHeaders
|
from .objects import HTTPHeaders
|
||||||
|
|
||||||
|
|
||||||
class YoutubeManager:
|
class YoutubeManager:
|
||||||
DEFAULT_DAILY_REQUESTS = 10_000
|
DEFAULT_DAILY_POINTS = 10_000
|
||||||
SHORTS_CHECK_STATUS = 303
|
SHORTS_CHECK_STATUS = 303
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
@ -28,12 +28,12 @@ class YoutubeManager:
|
||||||
def __init__(self, api_key: str, logger: logging.Logger):
|
def __init__(self, api_key: str, logger: logging.Logger):
|
||||||
self._api_key = api_key
|
self._api_key = api_key
|
||||||
self._logger = logger
|
self._logger = logger
|
||||||
self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_REQUESTS, next_reset=time.time() + 24 * 3600)
|
self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_POINTS, next_reset=time.time() + 24 * 3600)
|
||||||
|
|
||||||
def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, dict]:
|
def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, dict]:
|
||||||
if time.time() >= self.rate_limit.next_reset:
|
if time.time() >= self.rate_limit.next_reset:
|
||||||
self.rate_limit.next_reset = time.time() + 24 * 3600
|
self.rate_limit.next_reset = time.time() + 24 * 3600
|
||||||
self.rate_limit.remaining = self.DEFAULT_DAILY_REQUESTS
|
self.rate_limit.remaining = self.DEFAULT_DAILY_POINTS
|
||||||
elif self.rate_limit.remaining <= 0:
|
elif self.rate_limit.remaining <= 0:
|
||||||
sleep_time = time.time() - self.rate_limit.next_reset
|
sleep_time = time.time() - self.rate_limit.next_reset
|
||||||
self._logger.debug('No more remaining in Youtube RateLimit : sleeping for %.03fs', sleep_time)
|
self._logger.debug('No more remaining in Youtube RateLimit : sleeping for %.03fs', sleep_time)
|
||||||
|
|
@ -58,11 +58,13 @@ class YoutubeManager:
|
||||||
except TimeoutError as error:
|
except TimeoutError as error:
|
||||||
raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error
|
raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error
|
||||||
|
|
||||||
def is_shorts(self, video_id: str, request_timeout: float) -> bool:
|
def is_shorts(self, connection: http.client.HTTPConnection, video_id: str) -> bool:
|
||||||
try:
|
try:
|
||||||
connection = http.client.HTTPSConnection('www.youtube.com', timeout=request_timeout)
|
endpoint = f'/shorts/{video_id}'
|
||||||
connection.request('GET', f'/shorts/{video_id}')
|
self._logger.debug('YoutubeManager: Checking for shorts: %s', endpoint)
|
||||||
|
connection.request('GET', endpoint)
|
||||||
response = connection.getresponse()
|
response = connection.getresponse()
|
||||||
|
response.read()
|
||||||
return response.status != self.SHORTS_CHECK_STATUS
|
return response.status != self.SHORTS_CHECK_STATUS
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error
|
raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error
|
||||||
|
|
@ -71,6 +73,7 @@ class YoutubeManager:
|
||||||
HTTPHeaders, ChannelResult]:
|
HTTPHeaders, ChannelResult]:
|
||||||
url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet'
|
url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet'
|
||||||
f'&id={channel_id}&key={self._api_key}')
|
f'&id={channel_id}&key={self._api_key}')
|
||||||
|
self._logger.debug('YoutubeManager: request channel info for channel %s', channel_id)
|
||||||
headers, info = self._request(url=url, request_timeout=request_timeout)
|
headers, info = self._request(url=url, request_timeout=request_timeout)
|
||||||
return headers, ChannelResult.from_dict(info)
|
return headers, ChannelResult.from_dict(info)
|
||||||
|
|
||||||
|
|
@ -78,5 +81,6 @@ class YoutubeManager:
|
||||||
HTTPHeaders, SearchResult]:
|
HTTPHeaders, SearchResult]:
|
||||||
url = (f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}'
|
url = (f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}'
|
||||||
f'&maxResults={max_results}&order=date&type=video&key={self._api_key}')
|
f'&maxResults={max_results}&order=date&type=video&key={self._api_key}')
|
||||||
|
self._logger.debug('YoutubeManager: request channel videos for channel %s', channel_id)
|
||||||
headers, info = self._request(url=url, request_timeout=request_timeout)
|
headers, info = self._request(url=url, request_timeout=request_timeout)
|
||||||
return headers, SearchResult.from_dict(info)
|
return headers, SearchResult.from_dict(info)
|
||||||
|
|
|
||||||
|
|
@ -74,7 +74,7 @@ class Result(Generic[T_api]):
|
||||||
etag=info['etag'],
|
etag=info['etag'],
|
||||||
nextPageToken=info.get('nextPageToken'),
|
nextPageToken=info.get('nextPageToken'),
|
||||||
pageInfo=PageInfo.from_dict(info['pageInfo']),
|
pageInfo=PageInfo.from_dict(info['pageInfo']),
|
||||||
items=[item_type.from_dict(i) for i in info['items']])
|
items=[item_type.from_dict(i) for i in info.get('items', [])])
|
||||||
|
|
||||||
|
|
||||||
# Channel Objects
|
# Channel Objects
|
||||||
|
|
@ -87,7 +87,7 @@ class ChannelSnippet(_Api):
|
||||||
customUrl: str
|
customUrl: str
|
||||||
publishedAt: datetime
|
publishedAt: datetime
|
||||||
thumbnails: Thumbnails
|
thumbnails: Thumbnails
|
||||||
country: str
|
country: str | None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_dict(info: dict) -> ChannelSnippet:
|
def from_dict(info: dict) -> ChannelSnippet:
|
||||||
|
|
@ -97,7 +97,7 @@ class ChannelSnippet(_Api):
|
||||||
customUrl=info['customUrl'],
|
customUrl=info['customUrl'],
|
||||||
publishedAt=datetime.fromisoformat(info['publishedAt']),
|
publishedAt=datetime.fromisoformat(info['publishedAt']),
|
||||||
thumbnails=Thumbnails.from_dict(info['thumbnails']),
|
thumbnails=Thumbnails.from_dict(info['thumbnails']),
|
||||||
country=info['country'])
|
country=info.get('country'))
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
from .youtube_objects import ChannelSnippet, SearchResultItem
|
from .youtube_objects import ChannelSnippet, SearchResultItem
|
||||||
|
|
@ -12,6 +14,17 @@ class SubscriptionInfo:
|
||||||
shorts_list: list[SearchResultItem] = field(default_factory=list)
|
shorts_list: list[SearchResultItem] = field(default_factory=list)
|
||||||
video_list: list[SearchResultItem] = field(default_factory=list)
|
video_list: list[SearchResultItem] = field(default_factory=list)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_dict(info: dict) -> SubscriptionInfo:
|
||||||
|
channel_info: dict | None = info.get('channel_info')
|
||||||
|
return SubscriptionInfo(
|
||||||
|
name=info['name'],
|
||||||
|
channel_id=info['channel_id'],
|
||||||
|
last_update=info['last_update'],
|
||||||
|
channel_info=ChannelSnippet.from_dict(channel_info) if channel_info is not None else None,
|
||||||
|
shorts_list=[SearchResultItem.from_dict(s) for s in info['shorts_list']],
|
||||||
|
video_list=[SearchResultItem.from_dict(s) for s in info['video_list']])
|
||||||
|
|
||||||
|
|
||||||
Subscriptions = dict[str, SubscriptionInfo]
|
Subscriptions = dict[str, SubscriptionInfo]
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue