132 lines
6.1 KiB
Python
132 lines
6.1 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
import json
|
|
import time
|
|
from typing import TYPE_CHECKING
|
|
import urllib.error
|
|
import urllib.request
|
|
from xml.etree import ElementTree as ET
|
|
|
|
from .youtube_subscription import ChannelInfo, ThumbnailInfo, VideoInfo
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
import http.client
|
|
import logging
|
|
from .objects import HTTPHeaders
|
|
|
|
|
|
class YoutubeManager:
|
|
DEFAULT_DAILY_POINTS = 10_000
|
|
SHORTS_CHECK_STATUS = 303
|
|
|
|
@dataclass
|
|
class RateLimit:
|
|
remaining: int
|
|
next_reset: float
|
|
|
|
def __init__(self, api_key: str, logger: logging.Logger):
|
|
self._api_key = api_key
|
|
self._logger = logger
|
|
self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_POINTS, next_reset=time.time() + 24 * 3600)
|
|
|
|
def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, dict]:
|
|
if time.time() >= self.rate_limit.next_reset:
|
|
self.rate_limit.next_reset = time.time() + 24 * 3600
|
|
self.rate_limit.remaining = self.DEFAULT_DAILY_POINTS
|
|
elif self.rate_limit.remaining <= 0:
|
|
sleep_time = time.time() - self.rate_limit.next_reset
|
|
self._logger.debug('No more remaining in Youtube RateLimit : sleeping for %.03fs', sleep_time)
|
|
time.sleep(sleep_time)
|
|
self.rate_limit.remaining -= 1
|
|
|
|
request = urllib.request.Request(url)
|
|
request.add_header('Accept', 'application/json')
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=request_timeout) as response:
|
|
if response.status != expected_status:
|
|
raise RuntimeError(
|
|
f'Unexpected YT status {response.status} (expected: {expected_status})'
|
|
f' -> {response.read().decode()}')
|
|
return dict(response.getheaders()), json.loads(response.read().decode())
|
|
except urllib.error.HTTPError as error:
|
|
raise RuntimeError(
|
|
f'HTTP error calling API ({url}): {error}:\n'
|
|
f'Headers:\n{error.headers}Body:\n{error.read()}') from error
|
|
except urllib.error.URLError as error:
|
|
raise RuntimeError(f'URL error calling YT API ({url}): {error}') from error
|
|
except TimeoutError as error:
|
|
raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error
|
|
|
|
def is_shorts(self, connection: http.client.HTTPConnection, video_id: str) -> bool:
|
|
try:
|
|
endpoint = f'/shorts/{video_id}'
|
|
self._logger.debug('YoutubeManager: Checking for shorts: %s', endpoint)
|
|
connection.request('GET', endpoint)
|
|
response = connection.getresponse()
|
|
response.read()
|
|
return response.status != self.SHORTS_CHECK_STATUS
|
|
except Exception as error:
|
|
raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error
|
|
|
|
def request_channel_id(self, channel_name: str, request_timeout: float) -> str:
|
|
url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet'
|
|
f'&forHandle=%40{channel_name}&key={self._api_key}')
|
|
self._logger.debug('YoutubeManager: request channel id for channel %s', channel_name)
|
|
_, info = self._request(url=url, request_timeout=request_timeout)
|
|
return info['items'][0]['id']
|
|
|
|
@staticmethod
|
|
def _parse_rss_data(data) -> tuple[ChannelInfo, list[VideoInfo]]:
|
|
videos: list[VideoInfo] = []
|
|
root = ET.parse(data)
|
|
author = root.find('{*}author')
|
|
channel_info = ChannelInfo(
|
|
channel_id=root.find('{*}channelId').text, # type: ignore
|
|
title=author.find('{*}name').text, # type: ignore
|
|
url=author.find('{*}uri').text) # type: ignore
|
|
for entry in root.findall('{*}entry'):
|
|
media = entry.find('{*}group') # type: ignore
|
|
thumbnail = media.find('{*}thumbnail') # type: ignore
|
|
videos.append(VideoInfo(
|
|
video_id=entry.find('{*}videoId').text, # type: ignore
|
|
title=entry.find('{*}title').text, # type: ignore
|
|
description=media.find('{*}description').text, # type: ignore
|
|
url=entry.find('{*}link').get('href'), # type: ignore
|
|
thumbnail=ThumbnailInfo(
|
|
url=thumbnail.get('url'), # type: ignore
|
|
width=thumbnail.get('width'), # type: ignore
|
|
height=thumbnail.get('height')), # type: ignore
|
|
published=datetime.fromisoformat(entry.find('{*}published').text), # type: ignore
|
|
updated=datetime.fromisoformat(entry.find('{*}updated').text) # type: ignore
|
|
))
|
|
return channel_info, videos
|
|
|
|
def request_channel_videos(self, connection: http.client.HTTPConnection, channel_id: str,
|
|
expected_status: int = 200) -> tuple[HTTPHeaders, ChannelInfo, list[VideoInfo]]:
|
|
url = '/feeds/videos.xml?playlist_id='
|
|
url += f'UULF{channel_id[2:]}' if channel_id.startswith('UC') else f'{channel_id}'
|
|
self._logger.debug('YoutubeManager: request channel videos for channel %s', channel_id)
|
|
try:
|
|
connection.request('GET', url)
|
|
response = connection.getresponse()
|
|
headers = dict(response.getheaders())
|
|
except urllib.error.HTTPError as error:
|
|
raise RuntimeError(
|
|
f'HTTP error calling {url}: {error}:\n'
|
|
f'Headers:\n{error.headers}Body:\n{error.read()}') from error
|
|
except urllib.error.URLError as error:
|
|
raise RuntimeError(f'URL error calling {url}: {error}') from error
|
|
except TimeoutError as error:
|
|
raise RuntimeError(f'Timeout calling {url}: {error}') from error
|
|
except Exception as error:
|
|
raise RuntimeError(f'Unexecpted error calling {url}: {error}') from error
|
|
|
|
if response.status != expected_status:
|
|
raise RuntimeError(
|
|
f'Unexpected YT status {response.status} (expected: {expected_status}) for {url}'
|
|
f' -> {response.read().decode()}')
|
|
|
|
return headers, *self._parse_rss_data(response)
|