Implement video message configuration

* Config is update when new key is added (always fully explicit)
This commit is contained in:
BreadTube 2025-10-29 21:32:45 +09:00
commit f59982f745
3 changed files with 36 additions and 9 deletions

View file

@ -11,6 +11,7 @@ import time
import tomllib import tomllib
from typing import Any, TYPE_CHECKING from typing import Any, TYPE_CHECKING
import traceback import traceback
import urllib.parse
from .config import Config from .config import Config
from .discord_manager import ApiEncoder, DiscordManager from .discord_manager import ApiEncoder, DiscordManager
@ -177,6 +178,8 @@ class Bot:
if new_config is None and content.startswith(b'config'): if new_config is None and content.startswith(b'config'):
try: try:
self.config = Config.from_str(content.decode()) self.config = Config.from_str(content.decode())
if self.config.to_str() != content.decode():
new_config = self.config
except RuntimeError as error: except RuntimeError as error:
self.logger.error('Cannot load config from init message: %s', error) self.logger.error('Cannot load config from init message: %s', error)
has_error = True has_error = True
@ -345,9 +348,15 @@ class Bot:
subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size] subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size]
subscription.last_update = time.time() subscription.last_update = time.time()
@staticmethod def _video_message_content(self, video: SearchResultItem) -> str:
def _video_message_content(video: SearchResultItem) -> str: return (self.config.youtube_channel_video_message
return f'https://www.youtube.com/video/{video.id.videoId}' .replace('{{video_id}}', str(video.id.videoId))
.replace('{{video_title}}', str(urllib.parse.unquote(video.snippet.title)))
.replace('{{video_description}}', str(video.snippet.description))
.replace('{{video_publish_time}}', video.snippet.publishTime.isoformat())
.replace('{{channel_id}}', str(video.snippet.channelId))
.replace('{{channel_title}}', str(video.snippet.channelTitle))
)
def _refresh_sub(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel], def _refresh_sub(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel],
category_ranges: list[tuple[int, int, ChannelCategory]]): category_ranges: list[tuple[int, int, ChannelCategory]]):

View file

@ -14,6 +14,7 @@ class Config:
unmanaged_categories: str = '' unmanaged_categories: str = ''
youtube_channel_refresh_interval: float = 3600 youtube_channel_refresh_interval: float = 3600
youtube_channel_video_count: int = 10 youtube_channel_video_count: int = 10
youtube_channel_video_message: str = '[{{video_title}}](https://www.youtube.com/video/{{video_id}})'
def to_str(self) -> str: def to_str(self) -> str:
return '\n'.join(['config', *[f'{k}={v}' for k, v in asdict(self).items()]]) return '\n'.join(['config', *[f'{k}={v}' for k, v in asdict(self).items()]])

View file

@ -30,6 +30,7 @@ class ApiEncoder(json.JSONEncoder):
class DiscordManager: class DiscordManager:
MIN_API_VERSION = 9 MIN_API_VERSION = 9
TOO_MANY_REQUEST_STATUS = 429
@dataclass @dataclass
class RateLimit: class RateLimit:
@ -88,14 +89,30 @@ class DiscordManager:
else: else:
request.add_header('Content-Type', 'application/json') request.add_header('Content-Type', 'application/json')
request.add_header('Authorization', f'Bot {self._bot_token}') request.add_header('Authorization', f'Bot {self._bot_token}')
try:
def _request() -> tuple[int, dict, bytes | None]:
nonlocal request, request_timeout
with urllib.request.urlopen(request, timeout=request_timeout) as response: with urllib.request.urlopen(request, timeout=request_timeout) as response:
if response.status != expected_code:
raise RuntimeError(
f'Unexpected code {response.status} (expected: {expected_code}) -> {response.read().decode()}')
body = response.read()
headers = dict(response.getheaders()) headers = dict(response.getheaders())
return response.status, headers, response.read()
try:
body = b''
try:
status, headers, body = _request()
except urllib.error.HTTPError as error:
if error.status != self.TOO_MANY_REQUEST_STATUS:
raise error
status = error.status
headers = dict(error.headers)
self._update_rate_limit(headers) self._update_rate_limit(headers)
if status == self.TOO_MANY_REQUEST_STATUS:
self._logger.warning('Warning: too many request -> retrying')
status, headers, body = _request()
self._update_rate_limit(headers)
if status != expected_code:
raise RuntimeError(f'Unexpected code {status} (expected: {expected_code}) -> {body}')
return headers, json.loads(body.decode()) if body else None return headers, json.loads(body.decode()) if body else None
except urllib.error.HTTPError as error: except urllib.error.HTTPError as error:
raise RuntimeError( raise RuntimeError(