breadtube-bot/breadtube_bot/youtube_manager.py
2026-01-09 00:25:10 +09:00

123 lines
5.7 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
import time
from typing import TYPE_CHECKING
import urllib.error
import urllib.request
from xml.etree import ElementTree as ET
from .youtube_subscription import ChannelInfo, ThumbnailInfo, VideoInfo
if TYPE_CHECKING:
import http.client
import logging
from .objects import HTTPHeaders
class YoutubeManager:
    """Helper around YouTube HTTP endpoints.

    Provides a rate-limited generic GET (`_request`), a shorts check (`is_shorts`) and channel feed
    retrieval/parsing (`request_channel_videos`). Only `_request` consumes the request budget tracked
    in `rate_limit`; the methods working on a caller-provided connection do not touch it.
    """

    DEFAULT_DAILY_POINTS = 10_000
    # Status which `is_shorts` interprets as "this video is not a short".
    SHORTS_CHECK_STATUS = 303
    # Length of one rate-limit window, in seconds.
    _RATE_LIMIT_WINDOW_SECONDS = 24 * 3600

    @dataclass
    class RateLimit:
        """Request budget of the current window."""

        remaining: int  # requests still allowed before `next_reset`
        next_reset: float  # `time.time()` timestamp at which the budget is replenished

    def __init__(self, logger: logging.Logger):
        """Start with a full budget and a window ending 24 hours from now.

        Args:
            logger: Logger receiving the debug messages of this manager.
        """
        self._logger = logger
        self.rate_limit = self.RateLimit(
            remaining=self.DEFAULT_DAILY_POINTS, next_reset=time.time() + self._RATE_LIMIT_WINDOW_SECONDS)

    def _reset_rate_limit(self) -> None:
        """Open a new rate-limit window starting now, with a full budget."""
        self.rate_limit.next_reset = time.time() + self._RATE_LIMIT_WINDOW_SECONDS
        self.rate_limit.remaining = self.DEFAULT_DAILY_POINTS

    def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, str]:
        """Perform a rate-limited GET on `url` and return the response headers and decoded body.

        When the budget of the current window is exhausted, this call blocks until the window ends.

        Args:
            url: Full URL to fetch.
            request_timeout: Timeout in seconds handed to `urllib.request.urlopen`.
            expected_status: Status the response must have.

        Returns:
            The response headers as a dict and the body decoded as text.

        Raises:
            RuntimeError: On an unexpected status, an HTTP/URL error or a timeout.
        """
        if time.time() >= self.rate_limit.next_reset:
            self._reset_rate_limit()
        elif self.rate_limit.remaining <= 0:
            # Wait for the *remaining* part of the window. The difference must be taken in this
            # order: a negative value would make time.sleep raise ValueError.
            sleep_time = max(0.0, self.rate_limit.next_reset - time.time())
            self._logger.debug('No more remaining in Youtube RateLimit : sleeping for %.03fs', sleep_time)
            time.sleep(sleep_time)
            # The window we waited for is over: start a fresh one before spending a point.
            self._reset_rate_limit()
        self.rate_limit.remaining -= 1

        request = urllib.request.Request(url)
        try:
            with urllib.request.urlopen(request, timeout=request_timeout) as response:
                if response.status != expected_status:
                    raise RuntimeError(
                        f'Unexpected YT status {response.status} (expected: {expected_status})'
                        f' -> {response.read().decode()}')
                return dict(response.getheaders()), response.read().decode()
        except urllib.error.HTTPError as error:
            raise RuntimeError(
                f'HTTP error calling API ({url}): {error}:\n'
                f'Headers:\n{error.headers}Body:\n{error.read()}') from error
        except urllib.error.URLError as error:
            raise RuntimeError(f'URL error calling YT API ({url}): {error}') from error
        except TimeoutError as error:
            raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error

    def is_shorts(self, connection: http.client.HTTPConnection, video_id: str) -> bool:
        """Tell whether `video_id` is a short by requesting `/shorts/<video_id>` on `connection`.

        Args:
            connection: Open connection on which the GET request is sent.
            video_id: Identifier of the video to check.

        Returns:
            True when the response status differs from `SHORTS_CHECK_STATUS`, False otherwise.
            NOTE(review): any other status (e.g. an error status) therefore counts as a short;
            presumably the server answers regular videos with that redirect status - confirm.

        Raises:
            RuntimeError: If sending the request or reading the response fails.
        """
        try:
            endpoint = f'/shorts/{video_id}'
            self._logger.debug('YoutubeManager: Checking for shorts: %s', endpoint)
            connection.request('GET', endpoint)
            response = connection.getresponse()
            # Drain the body so the connection can be reused for the next request.
            response.read()
            return response.status != self.SHORTS_CHECK_STATUS
        except Exception as error:
            raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error

    @staticmethod
    def _parse_rss_data(data) -> tuple[ChannelInfo, list[VideoInfo]]:
        """Parse a channel feed document into channel information and its video entries.

        Args:
            data: Either the XML document as `bytes`, or a file name / file object as accepted
                by `xml.etree.ElementTree.parse`.

        Returns:
            The channel information and one `VideoInfo` per `entry` element, in document order.

        Raises:
            xml.etree.ElementTree.ParseError: If the document is not well-formed XML.
            AttributeError: If an expected element is missing from the document.
            ValueError: If a `published`/`updated` timestamp is not in ISO format.
        """
        root = ET.fromstring(data) if isinstance(data, bytes) else ET.parse(data).getroot()

        # '{*}' matches the tag in any namespace.
        author = root.find('{*}author')
        channel_info = ChannelInfo(
            channel_id=root.find('{*}channelId').text,  # type: ignore
            title=author.find('{*}name').text,  # type: ignore
            url=author.find('{*}uri').text)  # type: ignore

        videos: list[VideoInfo] = []
        for entry in root.findall('{*}entry'):
            media = entry.find('{*}group')  # type: ignore
            thumbnail = media.find('{*}thumbnail')  # type: ignore
            videos.append(VideoInfo(
                video_id=entry.find('{*}videoId').text,  # type: ignore
                title=entry.find('{*}title').text,  # type: ignore
                description=media.find('{*}description').text,  # type: ignore
                url=entry.find('{*}link').get('href'),  # type: ignore
                thumbnail=ThumbnailInfo(
                    url=thumbnail.get('url'),  # type: ignore
                    width=thumbnail.get('width'),  # type: ignore
                    height=thumbnail.get('height')),  # type: ignore
                published=datetime.fromisoformat(entry.find('{*}published').text),  # type: ignore
                updated=datetime.fromisoformat(entry.find('{*}updated').text)  # type: ignore
            ))
        return channel_info, videos

    def request_channel_videos(self, connection: http.client.HTTPConnection, channel_id: str,
                               expected_status: int = 200) -> tuple[HTTPHeaders, ChannelInfo, list[VideoInfo]]:
        """Fetch and parse the video feed of a channel over `connection`.

        Channel ids starting with 'UC' are rewritten to the matching 'UULF' playlist id; any other
        id is used verbatim as the playlist id.

        Args:
            connection: Open connection on which the GET request is sent.
            channel_id: Channel (or playlist) identifier.
            expected_status: Status the response must have.

        Returns:
            The response headers as a dict, the channel information and the list of videos.

        Raises:
            RuntimeError: If the request fails, times out or answers with an unexpected status.
            Exception: Parsing errors of `_parse_rss_data` are propagated unchanged.
        """
        url = '/feeds/videos.xml?playlist_id='
        url += f'UULF{channel_id[2:]}' if channel_id.startswith('UC') else f'{channel_id}'
        self._logger.debug('YoutubeManager: request channel videos for channel %s', channel_id)
        try:
            connection.request('GET', url)
            response = connection.getresponse()
            headers = dict(response.getheaders())
            # Read the whole body here: network failures while reading are reported like any
            # other request failure, and the connection stays reusable even if parsing fails.
            body = response.read()
        except TimeoutError as error:
            raise RuntimeError(f'Timeout calling {url}: {error}') from error
        except Exception as error:
            raise RuntimeError(f'Unexpected error calling {url}: {error}') from error
        if response.status != expected_status:
            raise RuntimeError(
                f'Unexpected YT status {response.status} (expected: {expected_status}) for {url}'
                f' -> {body.decode(errors="replace")}')
        return headers, *self._parse_rss_data(body)