Subscription update

This commit is contained in:
BreadTube 2025-09-29 23:34:47 +09:00 committed by Corentin
commit f510cc5aef
4 changed files with 182 additions and 81 deletions

View file

@ -1,13 +1,14 @@
from __future__ import annotations
from dataclasses import dataclass
import http.client
import json
import time
from typing import TYPE_CHECKING
import urllib.error
import urllib.request
from .yt_objects import ChannelResult, SearchResult
from .youtube_objects import ChannelResult, SearchResult
if TYPE_CHECKING:
@ -17,6 +18,7 @@ if TYPE_CHECKING:
class YoutubeManager:
DEFAULT_DAILY_REQUESTS = 10_000
SHORTS_CHECK_STATUS = 303
@dataclass
class RateLimit:
@ -56,6 +58,15 @@ class YoutubeManager:
except TimeoutError as error:
raise RuntimeError(f'Timeout calling YT API ({url}): {error}') from error
def is_shorts(self, video_id: str, request_timeout: float) -> bool:
try:
connection = http.client.HTTPSConnection('www.youtube.com', timeout=request_timeout)
connection.request('GET', f'/shorts/{video_id}')
response = connection.getresponse()
return response.status != self.SHORTS_CHECK_STATUS
except Exception as error:
raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error
def request_channel_info(self, channel_id: str, request_timeout: float) -> tuple[
HTTPHeaders, ChannelResult]:
url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet'
@ -65,7 +76,7 @@ class YoutubeManager:
def request_channel_videos(self, channel_id: str, max_results: int, request_timeout: float) -> tuple[
HTTPHeaders, SearchResult]:
url = ('https://www.googleapis.com/youtube/v3/search?part=snippet'
f'&channelId={channel_id}&maxResults={max_results}&order=date&key={self._api_key}')
url = (f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}'
f'&maxResults={max_results}&order=date&type=video&key={self._api_key}')
headers, info = self._request(url=url, request_timeout=request_timeout)
return headers, SearchResult.from_dict(info)