Implement subscription from RSS feed

This commit is contained in:
BreadTube 2026-01-09 00:25:10 +09:00
commit aacceacd96
11 changed files with 185 additions and 271 deletions

1
.gitattributes vendored Normal file
View file

@ -0,0 +1 @@
tests/data/* filter=lfs diff=lfs merge=lfs -text

2
.gitignore vendored
View file

@ -1,3 +1,3 @@
*.pyc
data
/data

View file

@ -10,7 +10,7 @@ from pathlib import Path
import re
import time
import tomllib
from typing import Any, TYPE_CHECKING
from typing import Any
import traceback
from .config import Config
@ -19,10 +19,8 @@ from .logger import create_logger
from .objects import (ChannelCategory, FileMime, Message, MessageReference, MessageReferenceType, Overwrite,
OverwriteType, Permissions, Role, TextChannel)
from .youtube_manager import YoutubeManager
from .youtube_subscription import SUBSCRIPTION_FILE_COLUMNS, SubscriptionHelper, SubscriptionInfo, Subscriptions
if TYPE_CHECKING:
from breadtube_bot.youtube_objects import SearchResultItem
from .youtube_subscription import (
SUBSCRIPTION_FILE_COLUMNS, SubscriptionHelper, SubscriptionInfo, Subscriptions, VideoInfo)
class Bot:
@ -31,9 +29,6 @@ class Bot:
INIT_MESSAGE: str = ('Bot initialized.\nThis is the current configuration used.\n'
'You can upload a new one to update the configuration.')
MAX_DOWNLOAD_SIZE: int = 50_000
SUBS_LIST_MIN_SIZE: int = 50
SUBS_LIST_SHORTS_RATIO: int = 5
SUBS_LIST_VIDEO_RATIO: int = 2
SUBS_SAVE_PATH: Path = Path('/tmp/breadtube-bot_subs.json')
class Task(Enum):
@ -48,7 +43,7 @@ class Bot:
raise RuntimeError('Cannot current bot version')
return tomllib.loads(pyproject_path.read_text(encoding='utf-8'))['project']['version']
def __init__(self, bot_token: str, guild_id: int, yt_api_key: str, config: Config | None = None,
def __init__(self, bot_token: str, guild_id: int, config: Config | None = None,
log_level: int = logging.INFO):
self.config: Config = config or Config()
self.guild_id = guild_id
@ -93,10 +88,16 @@ class Bot:
raise RuntimeError("Couldn't initialize bot channel/role/permission")
self.bot_channel: TextChannel = bot_channel
self.yt_manager = YoutubeManager(api_key=yt_api_key, logger=self.logger)
self._yt_subscriptions: Subscriptions = {
name: SubscriptionInfo.from_dict(info) for name, info in json.loads(
self.SUBS_SAVE_PATH.read_text(encoding='utf-8')).items()} if self.SUBS_SAVE_PATH.exists() else {}
self.yt_manager = YoutubeManager(logger=self.logger)
self._yt_subscriptions: Subscriptions = {}
if self.SUBS_SAVE_PATH.exists():
try:
self._yt_subscriptions = {
name: SubscriptionInfo.from_dict(info) for name, info in json.loads(
self.SUBS_SAVE_PATH.read_text(encoding='utf-8')).items()}
except Exception:
self.logger.error('Cannot load saved subscriptions at path "%s" -> deleting', self.SUBS_SAVE_PATH)
self.SUBS_SAVE_PATH.unlink()
self._scan_bot_channel()
self.tasks.append((
self.Task.SCAN_BOT_CHANNEL, time.time() + self.config.bot_channel_scan_interval, None))
@ -322,59 +323,42 @@ class Bot:
request_timeout=self.config.request_timeout)
return sub_channel
# Refresh one subscription from the items returned by
# yt_manager.request_channel_videos, filing every item not stored yet into
# subscription.shorts_list or subscription.video_list.
# NOTE(review): leading indentation is not visible in this rendering, so the
# nesting described below is inferred from syntax only - confirm against the file.
def _refresh_subscription(self, subscription: SubscriptionInfo):
# Request SUBS_LIST_SHORTS_RATIO * youtube_channel_video_count results; the
# first element of the returned pair is discarded.
_, yt_video_info = self.yt_manager.request_channel_videos(
channel_id=subscription.channel_id,
max_results=self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count,
request_timeout=self.config.request_timeout)
# Ids already held in either list; used to skip items seen on a previous refresh.
video_ids = {v.id.videoId for v in subscription.shorts_list + subscription.video_list}
# Connection handed to yt_manager.is_shorts for every unseen item.
# NOTE(review): no close() on this connection appears in these lines.
yt_connection = http.client.HTTPSConnection('www.youtube.com', timeout=self.config.request_timeout)
for yt_info in yt_video_info.items:
if yt_info.id.videoId in video_ids:
continue
if self.yt_manager.is_shorts(yt_connection, yt_info.id.videoId):
subscription.shorts_list.append(yt_info)
else:
subscription.video_list.append(yt_info)
video_ids.add(yt_info.id.videoId)
# Keep the newest entries of each list (sorted by snippet.publishTime, descending).
# The limit is min(SUBS_LIST_MIN_SIZE, ratio * youtube_channel_video_count), i.e.
# SUBS_LIST_MIN_SIZE acts as an upper bound here despite its name.
internal_size = min(self.SUBS_LIST_MIN_SIZE,
self.SUBS_LIST_SHORTS_RATIO * self.config.youtube_channel_video_count)
subscription.shorts_list = sorted(
subscription.shorts_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size]
internal_size = min(self.SUBS_LIST_MIN_SIZE,
self.SUBS_LIST_VIDEO_RATIO * self.config.youtube_channel_video_count)
subscription.video_list = sorted(
subscription.video_list, key=lambda x: x.snippet.publishTime, reverse=True)[:internal_size]
# Fetch the channel's videos through the given connection and merge the ones
# that are not stored yet into subscription.video_list.
def _refresh_subscription(self, connection: http.client.HTTPSConnection, subscription: SubscriptionInfo):
# request_channel_videos returns (headers, channel info, videos); the headers are not used here.
_, yt_channel_info, yt_video_info = self.yt_manager.request_channel_videos(
connection=connection, channel_id=subscription.channel_id)
# Channel info is only filled in when missing; an existing value is left untouched.
if subscription.channel_info is None:
subscription.channel_info = yt_channel_info
# Ids of the videos already stored, so only unseen ones are added.
video_ids: set[str] = {v.video_id for v in subscription.video_list}
new_videos = [video for video in yt_video_info if video.video_id not in video_ids]
if new_videos:
# Newest first (by `published`), truncated to config.youtube_channel_video_count entries.
subscription.video_list = sorted(
subscription.video_list + new_videos, key=lambda x: x.published,
reverse=True)[:self.config.youtube_channel_video_count]
# NOTE(review): indentation is not visible in this rendering, so it cannot be told
# here whether last_update is refreshed on every call or only when new videos
# were found - confirm against the file.
subscription.last_update = time.time()
def _video_message_content(self, video: SearchResultItem) -> str:
def _video_message_content(self, video: VideoInfo, subscription: SubscriptionInfo) -> str:
return (self.config.youtube_channel_video_message
.replace('{{video_id}}', str(video.id.videoId))
.replace('{{video_title}}', str(html.unescape(video.snippet.title)))
.replace('{{video_description}}', str(video.snippet.description))
.replace('{{video_publish_time}}', video.snippet.publishTime.isoformat())
.replace('{{channel_id}}', str(video.snippet.channelId))
.replace('{{channel_title}}', str(video.snippet.channelTitle))
)
.replace('{{video_id}}', str(video.video_id))
.replace('{{video_title}}', str(html.unescape(video.title)))
.replace('{{video_description}}', str(video.description))
.replace('{{video_publish_time}}', video.published.isoformat())
.replace('{{channel_id}}', str(subscription.channel_info.channel_id)
if subscription.channel_info is not None else 'NO_CHANNEL_ID')
.replace('{{channel_title}}', str(subscription.channel_info.title
if subscription.channel_info is not None else 'NO_CHANNEL_TITLE')))
def _refresh_sub(self, subscription: SubscriptionInfo, channel_dict: dict[str, TextChannel],
category_ranges: list[tuple[int, int, ChannelCategory]]):
def _refresh_sub(self, connection: http.client.HTTPSConnection, subscription: SubscriptionInfo,
channel_dict: dict[str, TextChannel], category_ranges: list[tuple[int, int, ChannelCategory]]):
try:
sub_channel = self._get_subscription_channel(subscription, channel_dict, category_ranges)
except RuntimeError as error:
self.logger.error(error)
return
self._refresh_subscription(connection, subscription)
if subscription.channel_info is None:
_, channel_info = self.yt_manager.request_channel_info(
subscription.channel_id, request_timeout=self.config.request_timeout)
if not channel_info.items:
raise RuntimeError('No channel info return from YouTube API for channel: %s', sub_channel.name)
subscription.channel_info = channel_info.items[0].snippet
self._refresh_subscription(subscription)
sub_init_message = f'https://www.youtube.com/{subscription.channel_info.customUrl}'
raise RuntimeError('No channel info after refreshing subscription')
sub_init_message = f'https://www.youtube.com/{subscription.channel_info.url}'
sub_messages = self._get_all_channel_messages(sub_channel)
if not sub_messages or sub_messages[-1].content != sub_init_message:
self.logger.debug('Clearing sub channel: %s', sub_channel.name)
@ -391,14 +375,14 @@ class Bot:
stop_scan = False
for yt_video in yt_videos:
for index, message in enumerate(messages[last_matching_index:], start=last_matching_index):
if message.content != self._video_message_content(yt_video):
if message.content != self._video_message_content(yt_video, subscription):
if last_matching_index != 0:
stop_scan = True
break
self.logger.debug('Unmatched video: %s', yt_video.id.videoId)
self.logger.debug('Unmatched video: %s', yt_video.video_id)
immediate_delete[message.id] = message
else:
self.logger.debug('Matched video: %s', yt_video.id.videoId)
self.logger.debug('Matched video: %s', yt_video.video_id)
last_matching_index = index + 1
break
else:
@ -417,7 +401,7 @@ class Bot:
message.id, sub_channel.name, error)
for video in yt_videos[last_matching_index:]:
_ = self.discord_manager.create_message(
sub_channel, {'content': self._video_message_content(video)},
sub_channel, {'content': self._video_message_content(video, subscription)},
request_timeout=self.config.request_timeout)
subscription.last_update = time.time()
@ -443,15 +427,20 @@ class Bot:
category_ranges.append((ord(range_info[0].lower()), ord(range_info[1].lower()), category))
category_ranges = sorted(category_ranges, key=operator.itemgetter(0))
yt_connection = http.client.HTTPSConnection('www.youtube.com', timeout=self.config.request_timeout)
sorted_subs = sorted(self._yt_subscriptions.values(), key=lambda s: s.last_update)
for sub_info in sorted_subs:
try:
self._refresh_sub(sub_info, channel_dict, category_ranges)
self._refresh_sub(yt_connection, sub_info, channel_dict, category_ranges)
except RuntimeError as error:
self.logger.error('Refreshing subscription %s failed: %s', sub_info.channel_id, error)
except TimeoutError as error:
self.logger.error('Timeout error refreshing subcription: %s', error)
break
except Exception as error:
self.logger.error('Refreshing subscription %s unexpectedly failed: %s', sub_info.channel_id, error)
break
yt_connection.close()
self.logger.info('Subs refreshed')
def run(self):

View file

@ -12,7 +12,7 @@ class Config:
bot_message_duration: float = 150.
request_timeout: float = 3.
unmanaged_categories: str = ''
youtube_channel_refresh_interval: float = 3600
youtube_channel_refresh_interval: float = 600
youtube_channel_video_count: int = 10
youtube_channel_video_message: str = '[{{video_title}}](https://www.youtube.com/video/{{video_id}})'

View file

@ -1,13 +1,14 @@
from __future__ import annotations
from dataclasses import dataclass
import json
from datetime import datetime
import time
from typing import TYPE_CHECKING
import urllib.error
import urllib.request
from xml.etree import ElementTree as ET
from .youtube_objects import ChannelResult, SearchResult
from .youtube_subscription import ChannelInfo, ThumbnailInfo, VideoInfo
if TYPE_CHECKING:
@ -25,12 +26,11 @@ class YoutubeManager:
remaining: int
next_reset: float
def __init__(self, api_key: str, logger: logging.Logger):
self._api_key = api_key
def __init__(self, logger: logging.Logger):
self._logger = logger
self.rate_limit = self.RateLimit(remaining=self.DEFAULT_DAILY_POINTS, next_reset=time.time() + 24 * 3600)
def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, dict]:
def _request(self, url: str, request_timeout: float, expected_status: int = 200) -> tuple[HTTPHeaders, str]:
if time.time() >= self.rate_limit.next_reset:
self.rate_limit.next_reset = time.time() + 24 * 3600
self.rate_limit.remaining = self.DEFAULT_DAILY_POINTS
@ -41,14 +41,14 @@ class YoutubeManager:
self.rate_limit.remaining -= 1
request = urllib.request.Request(url)
request.add_header('Accept', 'application/json')
# request.add_header('Accept', 'application/json')
try:
with urllib.request.urlopen(request, timeout=request_timeout) as response:
if response.status != expected_status:
raise RuntimeError(
f'Unexpected YT status {response.status} (expected: {expected_status})'
f' -> {response.read().decode()}')
return dict(response.getheaders()), json.loads(response.read().decode())
return dict(response.getheaders()), response.read().decode()
except urllib.error.HTTPError as error:
raise RuntimeError(
f'HTTP error calling API ({url}): {error}:\n'
@ -69,18 +69,55 @@ class YoutubeManager:
except Exception as error:
raise RuntimeError(f'Exception calling YouTube shorts ({video_id}): {error}') from error
def request_channel_info(self, channel_id: str, request_timeout: float) -> tuple[
        HTTPHeaders, ChannelResult]:
    """Query the ``channels`` endpoint (``part=snippet``) for one channel id.

    Args:
        channel_id: Value sent as the ``id`` query parameter.
        request_timeout: Timeout forwarded to ``self._request``.

    Returns:
        The headers returned by ``self._request`` and the payload parsed by
        ``ChannelResult.from_dict``.
    """
    endpoint_url = ('https://www.googleapis.com/youtube/v3/channels?part=snippet'
                    f'&id={channel_id}&key={self._api_key}')
    self._logger.debug('YoutubeManager: request channel info for channel %s', channel_id)
    response_headers, payload = self._request(url=endpoint_url, request_timeout=request_timeout)
    channel_result = ChannelResult.from_dict(payload)
    return response_headers, channel_result
@staticmethod
def _parse_rss_data(data) -> tuple[ChannelInfo, list[VideoInfo]]:
    """Parse a channel video feed into channel metadata and its video entries.

    Args:
        data: The feed, either as the XML document itself (``str`` or ``bytes``
            starting with markup) or as anything ``ElementTree.parse`` accepts
            (a path or a binary file-like object such as an HTTP response).

    Returns:
        The ``ChannelInfo`` built from the feed's ``channelId`` and ``author``
        elements, and one ``VideoInfo`` per ``entry`` element in feed order.

    Raises:
        ValueError: a required element is missing from the feed.
        xml.etree.ElementTree.ParseError: the document is not well-formed XML.
    """
    def required(parent: ET.Element, tag: str) -> ET.Element:
        # Name the missing tag instead of failing later with an AttributeError on None.
        found = parent.find(tag)
        if found is None:
            raise ValueError(f'RSS feed: no {tag!r} element under {parent.tag!r}')
        return found

    # ET.parse() would treat a str/bytes value as a file name, so a value that
    # starts with markup (optionally after a BOM/whitespace) is parsed as the
    # document itself; paths and file objects keep going through ET.parse().
    if isinstance(data, str):
        is_document = data.lstrip('\ufeff \t\r\n').startswith('<')
    elif isinstance(data, bytes):
        is_document = data.lstrip(b'\xef\xbb\xbf \t\r\n').startswith(b'<')
    else:
        is_document = False
    root = ET.fromstring(data) if is_document else ET.parse(data).getroot()

    author = required(root, '{*}author')
    channel_info = ChannelInfo(
        channel_id=required(root, '{*}channelId').text,  # type: ignore
        title=required(author, '{*}name').text,  # type: ignore
        url=required(author, '{*}uri').text)  # type: ignore

    videos: list[VideoInfo] = []
    for entry in root.findall('{*}entry'):
        media = required(entry, '{*}group')
        thumbnail = required(media, '{*}thumbnail')
        videos.append(VideoInfo(
            video_id=required(entry, '{*}videoId').text,  # type: ignore
            title=required(entry, '{*}title').text,  # type: ignore
            # An empty <description/> has text None; store '' so the field stays a str.
            description=required(media, '{*}description').text or '',
            url=required(entry, '{*}link').get('href'),  # type: ignore
            thumbnail=ThumbnailInfo(
                url=thumbnail.get('url'),  # type: ignore
                # XML attributes are strings; ThumbnailInfo declares ints (0 when absent).
                width=int(thumbnail.get('width', 0)),
                height=int(thumbnail.get('height', 0))),
            published=datetime.fromisoformat(required(entry, '{*}published').text),  # type: ignore
            updated=datetime.fromisoformat(required(entry, '{*}updated').text)))  # type: ignore
    return channel_info, videos
def request_channel_videos(self, channel_id: str, max_results: int, request_timeout: float) -> tuple[
HTTPHeaders, SearchResult]:
url = (f'https://www.googleapis.com/youtube/v3/search?part=snippet&channelId={channel_id}'
f'&maxResults={max_results}&order=date&type=video&key={self._api_key}')
def request_channel_videos(self, connection: http.client.HTTPConnection, channel_id: str,
expected_status: int = 200) -> tuple[HTTPHeaders, ChannelInfo, list[VideoInfo]]:
url = '/feeds/videos.xml?playlist_id='
url += f'UULF{channel_id[2:]}' if channel_id.startswith('UC') else f'{channel_id}'
self._logger.debug('YoutubeManager: request channel videos for channel %s', channel_id)
headers, info = self._request(url=url, request_timeout=request_timeout)
return headers, SearchResult.from_dict(info)
try:
connection.request('GET', url)
response = connection.getresponse()
headers = dict(response.getheaders())
except urllib.error.HTTPError as error:
raise RuntimeError(
f'HTTP error calling {url}: {error}:\n'
f'Headers:\n{error.headers}Body:\n{error.read()}') from error
except urllib.error.URLError as error:
raise RuntimeError(f'URL error calling {url}: {error}') from error
except TimeoutError as error:
raise RuntimeError(f'Timeout calling {url}: {error}') from error
except Exception as error:
raise RuntimeError(f'Unexecpted error calling {url}: {error}') from error
if response.status != expected_status:
raise RuntimeError(
f'Unexpected YT status {response.status} (expected: {expected_status}) for {url}'
f' -> {response.read().decode()}')
return headers, *self._parse_rss_data(response)

View file

@ -1,177 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from types import get_original_bases
from typing import Generic, Self, TypeVar, get_args
T = TypeVar('T')
class _Api(Generic[T], ABC):
    """Abstract base for objects that are built from a decoded response mapping."""

    @staticmethod
    @abstractmethod
    def from_dict(info: dict) -> T:
        """Build an instance from ``info``; every concrete subclass overrides this."""
        raise NotImplementedError()
T_api = TypeVar('T_api', bound=_Api)
# Generic Objects
@dataclass
class PageInfo(_Api):
    """Result counters of a list response (its ``pageInfo`` object)."""
    totalResults: int
    resultsPerPage: int

    @staticmethod
    def from_dict(info: dict) -> PageInfo:
        """Build a ``PageInfo``; both keys are mandatory (``KeyError`` otherwise)."""
        total_results = info['totalResults']
        results_per_page = info['resultsPerPage']
        return PageInfo(totalResults=total_results, resultsPerPage=results_per_page)
@dataclass
class ThumbnailInfo(_Api):
    """Address and size of a single thumbnail image."""
    url: str
    width: int
    height: int

    @staticmethod
    def from_dict(info: dict) -> ThumbnailInfo:
        """Build a ``ThumbnailInfo`` from the ``url``, ``width`` and ``height`` keys."""
        return ThumbnailInfo(**{key: info[key] for key in ('url', 'width', 'height')})
@dataclass
class Thumbnails(_Api):
    """The three thumbnail variants (``default``, ``medium``, ``high``) of an item."""
    default: ThumbnailInfo
    medium: ThumbnailInfo
    high: ThumbnailInfo

    @staticmethod
    def from_dict(info: dict) -> Thumbnails:
        """Parse each variant with ``ThumbnailInfo.from_dict``; all three are mandatory."""
        default, medium, high = (
            ThumbnailInfo.from_dict(info[variant]) for variant in ('default', 'medium', 'high'))
        return Thumbnails(default=default, medium=medium, high=high)
@dataclass
class Result(Generic[T_api]):
    """Generic list-response envelope; subclasses fix the item type via ``Result[Item]``."""
    kind: str
    etag: str
    nextPageToken: str | None
    pageInfo: PageInfo
    items: list[T_api]

    @classmethod
    def from_dict(cls, info: dict) -> Self:
        """Build the envelope and parse every entry of ``items`` with the item type.

        ``nextPageToken`` and ``items`` are optional in ``info``; the other keys
        are mandatory.
        """
        # The item class is the type argument of the first original base, e.g.
        # ``ChannelResultItem`` for ``class ChannelResult(Result[ChannelResultItem])``.
        parametrized_base = get_original_bases(cls)[0]
        item_type = get_args(parametrized_base)[0]
        kind, etag = info['kind'], info['etag']
        next_page_token = info.get('nextPageToken')
        page_info = PageInfo.from_dict(info['pageInfo'])
        parsed_items = [item_type.from_dict(raw_item) for raw_item in info.get('items', [])]
        return cls(kind=kind, etag=etag, nextPageToken=next_page_token,
                   pageInfo=page_info, items=parsed_items)
# Channel Objects
@dataclass
class ChannelSnippet(_Api):
    """The ``snippet`` part of a channel item."""
    title: str
    description: str
    customUrl: str
    publishedAt: datetime
    thumbnails: Thumbnails
    country: str | None

    @staticmethod
    def from_dict(info: dict) -> ChannelSnippet:
        """Build a ``ChannelSnippet``; only ``country`` may be absent from ``info``."""
        fields = {
            'title': info['title'],
            'description': info['description'],
            'customUrl': info['customUrl'],
            'publishedAt': datetime.fromisoformat(info['publishedAt']),
            'thumbnails': Thumbnails.from_dict(info['thumbnails']),
            'country': info.get('country'),
        }
        return ChannelSnippet(**fields)
@dataclass
class ChannelResultItem(_Api):
    """One entry of a channel list response."""
    kind: str
    etag: str
    id: str
    snippet: ChannelSnippet

    @staticmethod
    def from_dict(info: dict) -> ChannelResultItem:
        """Build the item, parsing ``snippet`` with ``ChannelSnippet.from_dict``."""
        kind, etag, item_id = info['kind'], info['etag'], info['id']
        snippet = ChannelSnippet.from_dict(info['snippet'])
        return ChannelResultItem(kind=kind, etag=etag, id=item_id, snippet=snippet)
class ChannelResult(Result[ChannelResultItem]):
    """``Result`` envelope whose items are ``ChannelResultItem`` objects."""
# Search Objects
@dataclass
class SearchResultId(_Api):
    """The ``id`` object of a search item: resource kind plus video id."""
    kind: str
    videoId: str

    @staticmethod
    def from_dict(info: dict) -> SearchResultId:
        """Build a ``SearchResultId`` from the mandatory ``kind`` and ``videoId`` keys."""
        return SearchResultId(**{key: info[key] for key in ('kind', 'videoId')})
@dataclass
class SearchSnippet(_Api):
    """The ``snippet`` part of a search item."""
    publishedAt: datetime
    channelId: str
    title: str
    description: str
    thumbnails: Thumbnails
    channelTitle: str
    liveBroadcastContent: str
    publishTime: datetime

    @staticmethod
    def from_dict(info: dict) -> SearchSnippet:
        """Build a ``SearchSnippet``; every key is mandatory, timestamps are ISO strings."""
        published_at = datetime.fromisoformat(info['publishedAt'])
        channel_id, title, description = info['channelId'], info['title'], info['description']
        thumbnails = Thumbnails.from_dict(info['thumbnails'])
        channel_title = info['channelTitle']
        live_broadcast_content = info['liveBroadcastContent']
        publish_time = datetime.fromisoformat(info['publishTime'])
        return SearchSnippet(
            publishedAt=published_at,
            channelId=channel_id,
            title=title,
            description=description,
            thumbnails=thumbnails,
            channelTitle=channel_title,
            liveBroadcastContent=live_broadcast_content,
            publishTime=publish_time)
@dataclass
class SearchResultItem(_Api):
    """One entry of a search list response."""
    kind: str
    etag: str
    id: SearchResultId
    snippet: SearchSnippet

    @staticmethod
    def from_dict(info: dict) -> SearchResultItem:
        """Build the item, parsing ``id`` and ``snippet`` with their own ``from_dict``."""
        kind, etag = info['kind'], info['etag']
        result_id = SearchResultId.from_dict(info['id'])
        snippet = SearchSnippet.from_dict(info['snippet'])
        return SearchResultItem(kind=kind, etag=etag, id=result_id, snippet=snippet)
class SearchResult(Result[SearchResultItem]):
    """``Result`` envelope whose items are ``SearchResultItem`` objects."""

View file

@ -1,8 +1,54 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from .youtube_objects import ChannelSnippet, SearchResultItem
@dataclass
class ThumbnailInfo:
    """Address and size of a video thumbnail."""
    url: str
    width: int
    height: int

    @staticmethod
    def from_dict(info: dict) -> ThumbnailInfo:
        """Rebuild a ``ThumbnailInfo`` from a mapping holding ``url``, ``width`` and ``height``."""
        url = info['url']
        width = info['width']
        height = info['height']
        return ThumbnailInfo(url=url, width=width, height=height)
@dataclass
class VideoInfo:
    """One video of a subscribed channel, as kept in ``SubscriptionInfo.video_list``."""
    video_id: str
    title: str
    description: str
    url: str
    thumbnail: ThumbnailInfo
    published: datetime
    updated: datetime

    @staticmethod
    def from_dict(info: dict) -> VideoInfo:
        """Rebuild a ``VideoInfo`` from its dictionary form.

        ``published`` and ``updated`` are read as ISO 8601 strings and
        ``thumbnail`` as the mapping understood by ``ThumbnailInfo.from_dict``.

        Raises:
            KeyError: a field is missing from ``info``.
        """
        return VideoInfo(
            # Read the video's own id: the former 'channel_id' lookup used a key that
            # belongs to the channel/subscription mappings, not to a video.
            video_id=info['video_id'],
            title=info['title'],
            description=info['description'],
            url=info['url'],
            thumbnail=ThumbnailInfo.from_dict(info['thumbnail']),
            published=datetime.fromisoformat(info['published']),
            updated=datetime.fromisoformat(info['updated']))
@dataclass
class ChannelInfo:
    """Identity of a channel: its id, display title and URL."""
    channel_id: str
    title: str
    url: str

    @staticmethod
    def from_dict(info: dict) -> ChannelInfo:
        """Rebuild a ``ChannelInfo`` from a mapping holding ``channel_id``, ``title`` and ``url``."""
        return ChannelInfo(**{key: info[key] for key in ('channel_id', 'title', 'url')})
@dataclass
@ -10,9 +56,8 @@ class SubscriptionInfo:
name: str
channel_id: str
last_update: float
channel_info: ChannelSnippet | None = None
shorts_list: list[SearchResultItem] = field(default_factory=list)
video_list: list[SearchResultItem] = field(default_factory=list)
channel_info: ChannelInfo | None = None
video_list: list[VideoInfo] = field(default_factory=list)
@staticmethod
def from_dict(info: dict) -> SubscriptionInfo:
@ -21,9 +66,8 @@ class SubscriptionInfo:
name=info['name'],
channel_id=info['channel_id'],
last_update=info['last_update'],
channel_info=ChannelSnippet.from_dict(channel_info) if channel_info is not None else None,
shorts_list=[SearchResultItem.from_dict(s) for s in info['shorts_list']],
video_list=[SearchResultItem.from_dict(s) for s in info['video_list']])
channel_info=ChannelInfo.from_dict(channel_info) if channel_info is not None else None,
video_list=[VideoInfo.from_dict(s) for s in info['video_list']])
Subscriptions = dict[str, SubscriptionInfo]

View file

@ -40,7 +40,7 @@ select = ["A", "ARG", "B", "C", "E", "F", "FURB", "G", "I","ICN", "ISC", "PERF",
ignore = ["E275", "FURB140", "I001", "PERF203", "RET502", "RET503", "SIM105"]
[tool.ruff.lint.per-file-ignores]
"tests/*" = ["SLF001"]
"tests/*" = ["SLF001", "PLR2004"]
[tool.ruff.lint.flake8-quotes]
inline-quotes = "single"

View file

@ -16,9 +16,7 @@ def main():
del arguments
bot_token = Path('data/discord_bot_token.txt').read_text(encoding='utf-8').strip()
yt_api_key = Path('data/google_api_key.txt').read_text(encoding='utf-8').strip()
manager = Bot(bot_token=bot_token, guild_id=guild_id, yt_api_key=yt_api_key,
log_level=logging.DEBUG if debug_mode else logging.INFO)
manager = Bot(bot_token=bot_token, guild_id=guild_id, log_level=logging.DEBUG if debug_mode else logging.INFO)
try:
manager.run()
except KeyboardInterrupt:

BIN
tests/data/rss_feed_sample.xml (Stored with Git LFS) Normal file

Binary file not shown.

View file

@ -0,0 +1,19 @@
import logging
from pathlib import Path
from breadtube_bot.youtube_manager import YoutubeManager
from breadtube_bot.youtube_subscription import ChannelInfo
def test_rss_parsing():
    """Parse the stored sample feed and check its channel header and the 15 video ids."""
    sample_path = Path('tests/data/rss_feed_sample.xml')
    manager = YoutubeManager(logger=logging.getLogger('breadtube-bot-test'))

    channel_info, videos = manager._parse_rss_data(sample_path.read_text(encoding='utf-8'))

    expected_channel = ChannelInfo(
        channel_id='UCFemKOoYVrTGUhuVzuNPt4A',
        title='Actu Réfractaire',
        url='https://www.youtube.com/channel/UCFemKOoYVrTGUhuVzuNPt4A')
    expected_video_ids = {
        'RZfVeU_iK0I', 'sLTFoRQHq3o', 'BcJ-ATQOQps', 'bOF1Pbtdg7U', '8yai5Maa1Wc',
        'j1wU7JSUhe0', 'agAf1SdyK_Y', 'a4Kj_vUULfI', 'Sl2ukhsD7w0', 'wGSpwg0MC98',
        'JNWkTB-7Zyk', '2I9rY7zSLPs', 'yR98Ur1BUJ8', 'HHBZ75L_vvY', 'tmBt6RCr6gQ',
    }

    assert channel_info == expected_channel
    assert len(videos) == 15
    assert {video.video_id for video in videos} == expected_video_ids