from __future__ import annotations from dataclasses import dataclass from datetime import datetime import json import time from typing import TYPE_CHECKING import urllib.error import urllib.request from xml.etree import ElementTree as ET from .youtube_subscription import ChannelInfo, ThumbnailInfo, VideoInfo if TYPE_CHECKING: import http.client import logging from .objects import HTTPHeaders class YoutubeManager: DEFAULT_DAILY_POINTS = 10_000 SHORTS_CHECK_STATUS = 303 @dataclass class RateLimit: remaining: int next_reset: float def __init__(self, api_key: str, logger: logging.Logger): self._api_key = api_key self._logger = logger self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_POINTS, next_reset=time.time() + 24 * 3600) def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, dict]: if time.time() >= self.rate_limit.next_reset: self.rate_limit.next_reset = time.time() + 24 * 3600 self.rate_limit.remaining = self.DEFAULT_DAILY_POINTS elif self.rate_limit.remaining <= 0: sleep_time = time.time() - self.rate_limit.next_reset self._logger.debug('No more remaining in Youtube RateLimit : sleeping for %.03fs', sleep_time) time.sleep(sleep_time) self.rate_limit.remaining -= 1 request = urllib.request.Request(url) request.add_header('Accept', 'application/json') try: with urllib.request.urlopen(request, timeout=request_timeout) as response: if response.status != expected_status: raise RuntimeError( f'Unexpected YT status {response.status} (expected: {expected_status})' f' -> {response.read().decode()}') return dict(response.getheaders()), json.loads(response.read().decode()) except urllib.error.HTTPError as error: raise RuntimeError( f'HTTP error calling API ({url}): {error}:\n' f'Headers:\n{error.headers}Body:\n{error.read()}') from error except urllib.error.URLError as error: raise RuntimeError(f'URL error calling YT API ({url}): {error}') from error except TimeoutError as error: raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error def is_shorts(self, connection: http.client.HTTPConnection, video_id: str) -> bool: try: endpoint = f'/shorts/{video_id}' self._logger.debug('YoutubeManager: Checking for shorts: %s', endpoint) connection.request('GET', endpoint) response = connection.getresponse() response.read() return response.status != self.SHORTS_CHECK_STATUS except Exception as error: raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error def request_channel_id(self, channel_name: str, request_timeout: float) -> str: url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet' f'&forHandle%40={channel_name}&key={self._api_key}') self._logger.debug('YoutubeManager: request channel id for channel %s', channel_name) _, info = self._request(url=url, request_timeout=request_timeout) return info['items'][0]['id'] @staticmethod def _parse_rss_data(data) -> tuple[ChannelInfo, list[VideoInfo]]: videos: list[VideoInfo] = [] root = ET.parse(data) author = root.find('{*}author') channel_info = ChannelInfo( channel_id=root.find('{*}channelId').text, # type: ignore title=author.find('{*}name').text, # type: ignore url=author.find('{*}uri').text) # type: ignore for entry in root.findall('{*}entry'): media = entry.find('{*}group') # type: ignore thumbnail = media.find('{*}thumbnail') # type: ignore videos.append(VideoInfo( video_id=entry.find('{*}videoId').text, # type: ignore title=entry.find('{*}title').text, # type: ignore description=media.find('{*}description').text, # type: ignore url=entry.find('{*}link').get('href'), # type: ignore thumbnail=ThumbnailInfo( url=thumbnail.get('url'), # type: ignore width=thumbnail.get('width'), # type: ignore height=thumbnail.get('height')), # type: ignore published=datetime.fromisoformat(entry.find('{*}published').text), # type: ignore updated=datetime.fromisoformat(entry.find('{*}updated').text) # type: ignore )) return channel_info, videos def request_channel_videos(self, connection: http.client.HTTPConnection, channel_id: str, expected_status: int = 200) -> tuple[HTTPHeaders, ChannelInfo, list[VideoInfo]]: url = '/feeds/videos.xml?playlist_id=' url += f'UULF{channel_id[2:]}' if channel_id.startswith('UC') else f'{channel_id}' self._logger.debug('YoutubeManager: request channel videos for channel %s', channel_id) try: connection.request('GET', url) response = connection.getresponse() headers = dict(response.getheaders()) except urllib.error.HTTPError as error: raise RuntimeError( f'HTTP error calling {url}: {error}:\n' f'Headers:\n{error.headers}Body:\n{error.read()}') from error except urllib.error.URLError as error: raise RuntimeError(f'URL error calling {url}: {error}') from error except TimeoutError as error: raise RuntimeError(f'Timeout calling {url}: {error}') from error except Exception as error: raise RuntimeError(f'Unexecpted error calling {url}: {error}') from error if response.status != expected_status: raise RuntimeError( f'Unexpected YT status {response.status} (expected: {expected_status}) for {url}' f' -> {response.read().decode()}') return headers, *self._parse_rss_data(response)