Implement YouTube manager

This commit is contained in:
BreadTube 2025-09-26 20:59:38 +09:00 committed by Corentin
commit b958f4fe55
8 changed files with 253 additions and 38 deletions

View file

@ -11,6 +11,8 @@ class Config:
bot_channel_init_retries: int = 3
bot_message_duration: float = 150.
request_timeout: float = 3.
youtube_channel_refresh_interval: float = 3600
youtube_channel_video_count: int = 10
def to_str(self) -> str:
return '\n'.join(['config', *[f'{k}={v}' for k, v in asdict(self).items()]])

View file

@ -14,15 +14,14 @@ import urllib.error
import urllib.request
import traceback
from breadtube_bot.youtube_api import YoutubeManager
from .api import Api, ApiAction, ApiVersion
from .config import Config
from .logger import create_logger
from .objects import (
Attachment, ChannelCategory, ChannelType, FileMime, Message, MessageReference, MessageReferenceType, Overwrite,
OverwriteType, Permissions, Role, TextChannel, User)
HTTPHeaders = dict[str, str]
Attachment, ChannelCategory, ChannelType, FileMime, HTTPHeaders, Message, MessageReference, MessageReferenceType,
Overwrite, OverwriteType, Permissions, Role, TextChannel, User)
class ApiEncoder(json.JSONEncoder):
@ -45,6 +44,12 @@ class DiscordManager:
remaining: int
next_reset: float
@dataclass
class YoutTubeChannel:
name: str
channel_id: str
last_update: float
class Task(Enum):
SCAN_BOT_CHANNEL = 1
DELETE_MESSAGES = 2
@ -56,7 +61,7 @@ class DiscordManager:
raise RuntimeError('Cannot current bot version')
return tomllib.loads(pyproject_path.read_text(encoding='utf-8'))['project']['version']
def __init__(self, bot_token: str, guild_id: int, config: Config | None = None,
def __init__(self, bot_token: str, guild_id: int, yt_api_key: str, config: Config | None = None,
log_level: int = logging.INFO) -> None:
self.config = config or Config()
self.guild_id = guild_id
@ -73,6 +78,7 @@ class DiscordManager:
self.guild_roles: list = self.list_roles()
self.bot_channel: TextChannel | None = None
self.init_message: Message | None = None
for _ in range(self.config.bot_channel_init_retries):
while not self.init_bot_channel():
time.sleep(10)
@ -85,6 +91,8 @@ class DiscordManager:
self._scan_bot_channel()
self.tasks.append((
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger)
self.logger.info('Bot initialized')
def _update_rate_limit(self, headers: HTTPHeaders):

View file

@ -5,6 +5,9 @@ from datetime import datetime
from enum import Enum, IntFlag
HTTPHeaders = dict[str, str]
class FileMime(Enum):
AUDIO_OGG = 'audio/ogg'
IMAGE_JPEG = 'image/jpeg'

View file

@ -0,0 +1,71 @@
from __future__ import annotations
from dataclasses import dataclass
import json
import time
from typing import TYPE_CHECKING
import urllib.error
import urllib.request
from breadtube_bot.yt_objects import SearchResult
if TYPE_CHECKING:
import logging
from .objects import HTTPHeaders
class YoutubeManager:
DEFAULT_DAILY_REQUESTS = 10_000
@dataclass
class RateLimit:
remaining: int
next_reset: float
def __init__(self, api_key: str, logger: logging.Logger):
self._api_key = api_key
self._logger = logger
self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_REQUESTS, next_reset=time.time() + 24 * 3600)
def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, dict]:
if time.time() >= self.rate_limit.next_reset:
self.rate_limit.next_reset = time.time() + 24 * 3600
self.rate_limit.remaining = self.DEFAULT_DAILY_REQUESTS
elif self.rate_limit.remaining <= 0:
sleep_time = time.time() - self.rate_limit.next_reset
self._logger.debug('No more remaining in Youtube RateLimit : sleeping for %.03fs', sleep_time)
time.sleep(sleep_time)
self.rate_limit.remaining -= 1
request = urllib.request.Request(url)
request.add_header('Accept', 'application/json')
try:
with urllib.request.urlopen(request, timeout=request_timeout) as response:
if response.status != expected_status:
raise RuntimeError(
f'Unexpected YT status {response.status} (expected: {expected_status})'
f' -> {response.read().decode()}')
return dict(response.getheaders()), json.loads(response.read().decode())
except urllib.error.HTTPError as error:
raise RuntimeError(
f'HTTP error calling API ({url}): {error}:\n'
f'Headers:\n{error.headers}Body:\n{error.read()}') from error
except urllib.error.URLError as error:
raise RuntimeError(f'URL error calling YT API ({url}): {error}') from error
except TimeoutError as error:
raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error
def request_channel_info(self, channel_id: str, request_timeout: float) -> tuple[
HTTPHeaders, dict]:
url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet'
f'&id={channel_id}&key={self._api_key}')
headers, info = self._request(url=url, request_timeout=request_timeout)
return headers, info
def request_channel_videos(self, channel_id: str, max_results: int, request_timeout: float) -> tuple[
HTTPHeaders, SearchResult]:
url = ('https://www.googleapis.com/youtube/v3/search?part=snippet'
f'&channelId={channel_id}&maxResults={max_results}&order=date&key={self._api_key}')
headers, info = self._request(url=url, request_timeout=request_timeout)
return headers, SearchResult.from_dict(info)

157
breadtube_bot/yt_objects.py Normal file
View file

@ -0,0 +1,157 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from types import get_original_bases
from typing import Generic, Self, TypeVar, get_args
T = TypeVar('T')
class _Api(Generic[T], ABC):
@staticmethod
@abstractmethod
def from_dict(info: dict) -> T:
raise NotImplementedError
T_api = TypeVar('T_api', bound=_Api)
# Generic Objects
@dataclass
class PageInfo(_Api):
totalResults: int
resultsPerPage: int
@staticmethod
def from_dict(info: dict) -> PageInfo:
return PageInfo(totalResults=info['totalResults'], resultsPerPage=info['resultsPerPage'])
@dataclass
class SearchResultId(_Api):
kind: str
videoId: str
@staticmethod
def from_dict(info: dict) -> SearchResultId:
return SearchResultId(kind=info['kind'], videoId=info['videoId'])
@dataclass
class ThumbnailInfo(_Api):
url: str
width: int
height: int
@staticmethod
def from_dict(info: dict) -> ThumbnailInfo:
return ThumbnailInfo(url=info['url'], width=info['width'], height=info['height'])
@dataclass
class Thumbnails(_Api):
default: ThumbnailInfo
medium: ThumbnailInfo
high: ThumbnailInfo
@staticmethod
def from_dict(info: dict) -> Thumbnails:
return Thumbnails(
default=ThumbnailInfo.from_dict(info['default']),
medium=ThumbnailInfo.from_dict(info['medium']),
high=ThumbnailInfo.from_dict(info['high']))
@dataclass
class Result(Generic[T_api]):
kind: str
etag: str
nextPageToken: str
pageInfo: PageInfo
items: list[T_api]
@classmethod
def from_dict(cls, info: dict) -> Self:
item_type = get_args(get_original_bases(cls)[0])[0]
return cls(
kind=info['kind'],
etag=info['etag'],
nextPageToken=info['nextPageToken'],
pageInfo=PageInfo.from_dict(info['pageInfo']),
items=[item_type.from_dict(i) for i in info['items']])
# Channel Objects
@dataclass
class ChannelResultItem(_Api):
kind: str
etag: str
id: SearchResultId
snippet: SearchResultSnippet
@staticmethod
def from_dict(info: dict) -> SearchResultItem:
return SearchResultItem(
kind=info['kind'],
etag=info['etag'],
id=SearchResultId.from_dict(info['id']),
snippet=SearchResultSnippet.from_dict(info['snippet']))
class ChannelResult(Result[ChannelResultItem]):
pass
# Search Objects
@dataclass
class SearchResultSnippet(_Api):
publishedAt: datetime
channelId: str
title: str
description: str
thumbnails: Thumbnails
channelTitle: str
liveBroadcastContent: str
publishTime: datetime
@staticmethod
def from_dict(info: dict) -> SearchResultSnippet:
return SearchResultSnippet(
publishedAt=datetime.fromisoformat(info['publishedAt']),
channelId=info['channelId'],
title=info['title'],
description=info['description'],
thumbnails=Thumbnails.from_dict(info['thumbnails']),
channelTitle=info['channelTitle'],
liveBroadcastContent=info['liveBroadcastContent'],
publishTime=datetime.fromisoformat(info['publishTime']))
@dataclass
class SearchResultItem(_Api):
kind: str
etag: str
id: SearchResultId
snippet: SearchResultSnippet
@staticmethod
def from_dict(info: dict) -> SearchResultItem:
return SearchResultItem(
kind=info['kind'],
etag=info['etag'],
id=SearchResultId.from_dict(info['id']),
snippet=SearchResultSnippet.from_dict(info['snippet']))
class SearchResult(Result[SearchResultItem]):
pass