from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class ThumbnailInfo:
    """URL and dimensions of a thumbnail image."""

    url: str
    width: int
    height: int

    @staticmethod
    def from_dict(info: dict) -> ThumbnailInfo:
        """Build a ThumbnailInfo from a mapping.

        Args:
            info: Mapping with the keys 'url', 'width' and 'height'.

        Raises:
            KeyError: If one of the keys is missing.
        """
        return ThumbnailInfo(
            url=info['url'],
            width=info['width'],
            height=info['height'],
        )


@dataclass
class VideoInfo:
    """Metadata of a single video."""

    video_id: str
    title: str
    description: str
    url: str
    thumbnail: ThumbnailInfo
    published: datetime
    updated: datetime

    @staticmethod
    def from_dict(info: dict) -> VideoInfo:
        """Build a VideoInfo from a mapping.

        Args:
            info: Mapping with the keys 'title', 'description', 'url',
                'thumbnail' (itself a mapping accepted by
                ThumbnailInfo.from_dict), 'published' and 'updated' (ISO 8601
                strings), plus the video identifier under 'video_id' or,
                failing that, under 'channel_id'.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If 'published' or 'updated' is not a valid ISO string.
        """
        # NOTE(review): earlier code read the video id from 'channel_id' only,
        # which looks like a copy-paste slip. 'video_id' is preferred now and
        # 'channel_id' is kept as a fallback so payloads that were accepted
        # before still load the same way - confirm the producer's schema.
        video_id = info['video_id'] if 'video_id' in info else info['channel_id']
        return VideoInfo(
            video_id=video_id,
            title=info['title'],
            description=info['description'],
            url=info['url'],
            thumbnail=ThumbnailInfo.from_dict(info['thumbnail']),
            published=datetime.fromisoformat(info['published']),
            updated=datetime.fromisoformat(info['updated']),
        )


@dataclass
class ChannelInfo:
    """Identifier, title and URL of a channel."""

    channel_id: str
    title: str
    url: str

    @staticmethod
    def from_dict(info: dict) -> ChannelInfo:
        """Build a ChannelInfo from a mapping.

        Args:
            info: Mapping with the keys 'channel_id', 'title' and 'url'.

        Raises:
            KeyError: If one of the keys is missing.
        """
        return ChannelInfo(
            channel_id=info['channel_id'],
            title=info['title'],
            url=info['url'],
        )


@dataclass
class SubscriptionInfo:
    """One subscribed channel together with its optional cached details."""

    name: str
    channel_id: str
    # NOTE(review): presumably a timestamp; its unit is not established here.
    last_update: float
    channel_info: ChannelInfo | None = None
    video_list: list[VideoInfo] = field(default_factory=list)

    @staticmethod
    def from_dict(info: dict) -> SubscriptionInfo:
        """Build a SubscriptionInfo from a mapping.

        Args:
            info: Mapping with the keys 'name', 'channel_id', 'last_update'
                and 'video_list' (an iterable of mappings accepted by
                VideoInfo.from_dict). 'channel_info' is optional; when it is
                absent or None the resulting channel_info is None.

        Raises:
            KeyError: If a required key is missing.
        """
        raw_channel_info: dict | None = info.get('channel_info')
        channel_info = None
        if raw_channel_info is not None:
            channel_info = ChannelInfo.from_dict(raw_channel_info)
        return SubscriptionInfo(
            name=info['name'],
            channel_id=info['channel_id'],
            last_update=info['last_update'],
            channel_info=channel_info,
            video_list=[VideoInfo.from_dict(video) for video in info['video_list']],
        )


# Maps a channel ID to the subscription stored for it.
Subscriptions = dict[str, SubscriptionInfo]

# Column titles of the subscription file, in file order.
SUBSCRIPTION_FILE_COLUMNS: list[bytes] = [b'Channel Name', b'Channel ID']

# Field separator of the subscription file.
_SEPARATOR = b';'


class SubscriptionHelper:
    """Read, merge and write the semicolon-separated subscription file."""

    @staticmethod
    def read_text(content: bytes) -> Subscriptions:
        """Parse the raw bytes of a subscription file.

        The first line (after stripping surrounding whitespace from the whole
        content) must be the SUBSCRIPTION_FILE_COLUMNS joined by ';'. Every
        following line must hold exactly one value per column. When a channel
        ID occurs more than once, the last row wins.

        Args:
            content: Raw file content.

        Returns:
            Mapping from channel ID to a SubscriptionInfo whose last_update
            is 0 and whose remaining optional fields keep their defaults.

        Raises:
            RuntimeError: If the header is missing or different from the
                expected one, or if a row has the wrong number of values.
            UnicodeDecodeError: If a value cannot be decoded with the default
                codec of bytes.decode().
        """
        expected_header = _SEPARATOR.join(SUBSCRIPTION_FILE_COLUMNS)
        content_lines = content.strip().splitlines()
        # Empty or whitespace-only content has no header at all; report it the
        # same way as a wrong header instead of failing with an IndexError.
        header = content_lines[0] if content_lines else b''
        if header != expected_header:
            raise RuntimeError(f"Unexpected header: {header!r} != {expected_header!r}")

        column_count = len(SUBSCRIPTION_FILE_COLUMNS)
        subscriptions: Subscriptions = {}
        # line_number counts data rows from 1; the header row is not counted.
        for line_number, line in enumerate(content_lines[1:], start=1):
            values = line.split(_SEPARATOR)
            if len(values) != column_count:
                raise RuntimeError(f'Unexpected value count (columns) at line {line_number}'
                                   f', got {len(values)} but expected {column_count}')
            name = values[0].decode()
            channel_id = values[1].decode()
            subscriptions[channel_id] = SubscriptionInfo(
                name=name, channel_id=channel_id, last_update=0)
        return subscriptions

    @staticmethod
    def update_subscriptions(new: Subscriptions, previous: Subscriptions) -> None:
        """Carry cached state over from ``previous`` into ``new`` in place.

        For each channel ID present in both mappings, last_update and
        channel_info of the entry in ``new`` are overwritten with the values
        from ``previous``. video_list is not copied, and entries that exist in
        only one of the mappings are left untouched.

        Args:
            new: Subscriptions to update; its entries are mutated.
            previous: Subscriptions providing the values to carry over.
        """
        for channel_id, previous_info in previous.items():
            if channel_id not in new:
                continue
            current = new[channel_id]
            current.last_update = previous_info.last_update
            current.channel_info = previous_info.channel_info

    @staticmethod
    def generate_text(subscriptions: Subscriptions) -> bytes:
        """Serialize subscriptions into the subscription file format.

        Args:
            subscriptions: Subscriptions to write, emitted in mapping order.

        Returns:
            The header line followed by one 'name;channel_id' line per entry,
            joined with '\\n' and without a trailing newline.
        """
        content_lines: list[bytes] = [_SEPARATOR.join(SUBSCRIPTION_FILE_COLUMNS)]
        for sub_info in subscriptions.values():
            # ';' separates the fields, so it must not appear inside the name.
            safe_name = sub_info.name.replace(';', ',')
            content_lines.append(f"{safe_name};{sub_info.channel_id}".encode())
        return b'\n'.join(content_lines)