main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3"""This file contains the code for extracting information from YouTube videos.
  4
  5But not for downloading YouTube videos.
  6For downloading YouTube videos, please see `src/preview/ytdlp.py`.
  7"""
  8
  9import re
 10from datetime import UTC, datetime, timedelta
 11from zoneinfo import ZoneInfo
 12
 13from glom import Coalesce, glom
 14from loguru import logger
 15
 16from config import PROXY, TOKEN, TZ, cache
 17from messages.utils import blockquote
 18from networking import hx_req
 19from utils import nowstr, readable_count, true
 20
 21
 22@cache.memoize(ttl=60)
 23async def get_youtube_comments(vid: str | None) -> list[str]:
 24    if not vid:
 25        return []
 26    api = "https://www.googleapis.com/youtube/v3/commentThreads"
 27    params = {"key": TOKEN.YOUTUBE_API_KEY, "maxResults": 100, "textFormat": "plainText", "part": "snippet", "videoId": vid}
 28    comments = []
 29    try:
 30        resp = await hx_req(api, proxy=PROXY.GOOGLE, params=params, check_keys=["items"])
 31        if resp.get("hx_error"):
 32            logger.warning(f"YouTube Comments API failed: {resp['hx_error']}")
 33            return []
 34        data = resp["items"]
 35        for idx, x in enumerate(data):
 36            name = glom(x, "snippet.topLevelComment.snippet.authorDisplayName", default="匿名")
 37            name = name.removeprefix("@")
 38            if author_url := glom(x, "snippet.topLevelComment.snippet.authorChannelUrl", default=""):
 39                name = f"[{name}]({author_url})"
 40            if cmt := glom(x, "snippet.topLevelComment.snippet.textDisplay", default=""):
 41                if idx == 0:
 42                    comments.append(f"\n{blockquote('💬**点此展开评论区**:')}")
 43                cmt = f"💬**{name}**: {cmt}"
 44                comments.append(f"\n{blockquote(cmt)}")
 45    except Exception as e:
 46        logger.error(f"Failed to get YouTube comments: {e}")
 47        return []
 48    return comments
 49
 50
 51@cache.memoize(ttl=120)
 52async def get_youtube_vinfo(video_id: str) -> dict:
 53    """Fetch YouTube video info.
 54
 55    Returns:
 56        {
 57            "downloadable": (bool),
 58            "error_msg": (str),
 59            "title": (str),
 60            "description": (str),
 61            "author": (str),
 62            "channel": (str) channel url,
 63            "pubdate": (str)
 64            "duration": (int) in seconds,
 65            "has_subtitle": (bool),
 66            "is_live": (bool),
 67            "live_start": (datetime),
 68            "live_end": (datetime),
 69            "scheduled_start": (datetime),
 70            "view_count": (int),
 71            "like_count": (int),
 72            "favorite_count": (int),
 73            "comment_count": (int),
 74            "statistics": (str) "👁100K 👍100K ⭐️100K 💬100K",
 75            "emoji": (str) "🔴"
 76        }
 77    """
 78    if not video_id:
 79        return {"downloadable": False, "error_msg": "❌未提供VideoID"}
 80    info: dict = {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
 81    try:
 82        logger.info(f"Fetch YouTube video info for {video_id=}, proxy={PROXY.GOOGLE}")
 83        api = "https://www.googleapis.com/youtube/v3/videos"
 84        params = {"key": TOKEN.YOUTUBE_API_KEY, "part": "snippet,status,contentDetails,liveStreamingDetails,statistics", "id": video_id, "hl": "zh-CN"}
 85        resp = await hx_req(api, proxy=PROXY.GOOGLE, params=params, check_keys=["items.0.snippet"], max_retry=3)
 86        if resp.get("hx_error"):
 87            logger.warning(f"YouTube Videos API failed: {resp['hx_error']}")
 88            return {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
 89        if not glom(resp, "items.0.snippet", default={}):
 90            logger.warning("YouTube Videos API failed: Video not found")
 91            return {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
 92
 93        # basci info
 94        info["title"] = glom(resp, "items.0.snippet.title", default="Title")
 95        info["description"] = glom(resp, "items.0.snippet.description", default="")
 96        info["author"] = glom(resp, "items.0.snippet.channelTitle", default="YouTuber")
 97        channel = glom(resp, "items.0.snippet.channelId", default="")
 98        info["channel"] = f"https://www.youtube.com/channel/{channel}"
 99        if pubdate := glom(resp, "items.0.snippet.publishedAt", default=""):
100            dt = datetime.strptime(pubdate, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ))
101            info["pubdate"] = f"{dt:%Y-%m-%d %H:%M:%S}"
102        else:
103            info["pubdate"] = nowstr(TZ)
104        info["has_subtitle"] = true(glom(resp, "items.0.contentDetails.caption", default=False))
105
106        # livestreaming
107        info |= {"is_live": False, "live_start": "", "live_end": "", "scheduled_start": ""}
108        if live_details := glom(resp, "items.0.liveStreamingDetails", default={}):
109            info["is_live"] = True
110            if live_start := live_details.get("actualStartTime"):
111                info["live_start"] = datetime.strptime(live_start, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ))
112            if live_end := live_details.get("actualEndTime"):
113                info["live_end"] = datetime.strptime(live_end, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ))
114            if scheduled_start := live_details.get("scheduledStartTime"):
115                info["scheduled_start"] = datetime.strptime(scheduled_start, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ))
116
117        # statistics
118        info |= {
119            "view_count": int(glom(resp, "items.0.statistics.viewCount", default=0)),
120            "like_count": int(glom(resp, "items.0.statistics.likeCount", default=0)),
121            "favorite_count": int(glom(resp, "items.0.statistics.favoriteCount", default=0)),
122            "comment_count": int(glom(resp, "items.0.statistics.commentCount", default=0)),
123        }
124        statistics = ""
125        if view := info.get("view_count"):
126            statistics += f"👁{readable_count(view)}"
127        if like := info.get("like_count"):
128            statistics += f"👍{readable_count(like)}"
129        if favorite := info.get("favorite_count"):
130            statistics += f"⭐️{readable_count(favorite)}"
131        if comment := info.get("comment_count"):
132            statistics += f"💬{readable_count(comment)}"
133        info["statistics"] = statistics
134
135        # downloadable
136        info |= {"downloadable": True, "error_msg": ""}
137        privacy = glom(resp, "items.0.status.privacyStatus", default="private")  # public, private, unlisted
138        status = glom(resp, "items.0.status.uploadStatus", default="failed")  # deleted, failed, processed, uploaded, rejected
139        if privacy not in ["public", "unlisted"]:
140            info |= {"downloadable": False, "error_msg": "❌私享视频不可下载"}
141        if status != "processed":
142            info |= {"downloadable": False, "error_msg": f"❌转码视频未完成, 当前状态: {status}"}
143        if glom(resp, "items.0.snippet.liveBroadcastContent", default="") in ["live", "upcoming"]:
144            info |= {"downloadable": False, "error_msg": f"❌直播还未完成, 当前状态: {glom(resp, 'items.0.snippet.liveBroadcastContent')}"}
145        if info["is_live"] and not info["live_end"]:
146            info |= {"downloadable": False, "error_msg": f"❌直播还未完成, 当前状态: {glom(resp, 'items.0.snippet.liveBroadcastContent')}"}
147        blocked_regions = glom(resp, "items.0.contentDetails.regionRestriction.blocked", default=[]) or []
148        if "US" in blocked_regions:
149            info |= {"downloadable": False, "error_msg": f"❌视频在以下国家/地区被屏蔽: {', '.join(blocked_regions)}"}
150
151        # parse duration
152        """For a video that is at least one minute long and less than one hour long, the duration is in the format PT#M#S,
153        in which the letters PT indicate that the value specifies a period of time, and the letters M and S refer to length in minutes and seconds, respectively.
154        The # characters preceding the M and S letters are both integers that specify the number of minutes (or seconds) of the video.
155        For example, a value of PT15M33S indicates that the video is 15 minutes and 33 seconds long.
156
157        If the video is at least one hour long, the duration is in the format PT#H#M#S,
158        in which the # preceding the letter H specifies the length of the video in hours and all of the other details are the same as described above.
159        If the video is at least one day long, the letters P and T are separated, and the value's format is P#DT#H#M#S.
160
161        Please refer to the ISO 8601 specification for complete details. (https://en.wikipedia.org/wiki/ISO_8601#Durations)
162        """
163        duration = glom(resp, "items.0.contentDetails.duration", default="PT0M0S")
164        pattern = r"^P(?:(?P<days>\d+\.\d+|\d*?)D)?T?(?:(?P<hours>\d+\.\d+|\d*?)H)?(?:(?P<minutes>\d+\.\d+|\d*?)M)?(?:(?P<seconds>\d+\.\d+|\d*?)S)?$"
165        if matched := re.match(pattern, duration):
166            parts = {k: float(v) for k, v in matched.groupdict("0").items()}
167            info["duration"] = int(timedelta(**parts).total_seconds())
168        else:
169            info["duration"] = 0
170    except Exception as e:
171        logger.error(f"Failed to get video info: {e}")
172        return {"downloadable": False, "error_msg": "❌无法获取此视频信息"}
173    return info | {"emoji": "🔴"}
174
175
176async def get_youtube_channel_thumb(channel_id: str) -> str:
177    """Get YouTube channel thumbnail url."""
178    if not channel_id:
179        return ""
180    api = "https://www.googleapis.com/youtube/v3/channels"
181    params = {"key": TOKEN.YOUTUBE_API_KEY, "part": "snippet", "id": channel_id, "hl": "zh-CN"}
182    resp = await hx_req(api, proxy=PROXY.GOOGLE, params=params, check_keys=["items.0.snippet"], max_retry=3)
183    if resp.get("hx_error"):
184        logger.warning(f"YouTube Channels API failed: {resp['hx_error']}")
185        return ""
186    thumbnails = glom(resp, "items.0.snippet.thumbnails", default={})
187    return glom(thumbnails, Coalesce("high.url", "medium.url", "default.url"), default="")
188
189
190@cache.memoize(ttl=120)
191async def get_youtube_channel_name_by_handle(handle: str) -> str:
192    """Get YouTube channel by handle."""
193    if not handle:
194        return ""
195    api = "https://www.googleapis.com/youtube/v3/channels"
196    params = {"key": TOKEN.YOUTUBE_API_KEY, "part": "snippet", "forHandle": handle}
197    resp = await hx_req(api, proxy=PROXY.GOOGLE, params=params, check_keys=["items.0.snippet"], max_retry=3)
198    if resp.get("hx_error"):
199        logger.warning(f"YouTube Channels API failed: {resp['hx_error']}")
200        return ""
201    return glom(resp, "items.0.snippet.title", default="")