main
 1#!/usr/bin/env python
 2# -*- coding: utf-8 -*-
 3from datetime import UTC, datetime
 4from zoneinfo import ZoneInfo
 5
 6from glom import glom
 7from loguru import logger
 8from pyrogram.client import Client
 9from pyrogram.types import Message
10
11from config import NUM_YOUTUBE_SEARCH_RESULTS, PREFIX, PROXY, TOKEN, TZ
12from messages.parser import parse_msg
13from messages.sender import send2tg
14from messages.utils import equal_prefix, startswith_prefix
15from networking import hx_req
16from utils import number_to_emoji
17
# Usage text for the YouTube search command; search_youtube sends it when the
# command arrives with nothing after it. The Chinese reads "Search YouTube" and
# "<prefix> + keyword".
HELP = (
    "🔍**搜索YouTube**\n"
    f"`{PREFIX.SEARCH_YOUTUBE}` + 关键词\n"
)
21
22
async def search_youtube(client: Client, message: Message, **kwargs) -> None:
    """Handle the YouTube search command and reply with the matching videos.

    A bare command (nothing after the prefix, and not a reply to another
    message) is answered with the usage text. Otherwise the keyword that
    follows the prefix is looked up with ``query_youtube`` and the hits are
    sent back as a single message, two lines per video.

    Args:
        client: Pyrogram client, forwarded to ``send2tg``.
        message: Incoming message that may carry the search command.
        **kwargs: Passed through unchanged to every ``send2tg`` call.

    Returns:
        None. All output goes through ``send2tg``.
    """
    # Bare command (e.g. "/ytb") that is not a reply: show the usage text.
    if equal_prefix(message.text, prefix=[PREFIX.SEARCH_YOUTUBE]) and not message.reply_to_message:
        await send2tg(client, message, texts=HELP, **kwargs)
        return

    info = parse_msg(message, silent=True)
    if not startswith_prefix(info["text"], prefix=[PREFIX.SEARCH_YOUTUBE]):
        return
    query = info["text"].removeprefix(PREFIX.SEARCH_YOUTUBE).strip()
    if not query:
        return

    res = await query_youtube(query)

    # Look at "error" before "data": a failed lookup has no "data" key at all,
    # so testing "data" first would always hide the specific error behind the
    # generic failure text and leave this branch unreachable.
    # NOTE(review): the error text originates from the HTTP layer - confirm it
    # can never embed the API key before it is shown in a chat.
    if error := res.get("error"):
        if not isinstance(error, str):
            # Every other send2tg call here passes text; flatten a collection of
            # messages (or any other payload) into a string as well.
            if isinstance(error, (set, frozenset, list, tuple)):
                error = "\n".join(map(str, error))
            else:
                error = str(error)
        await send2tg(client, message, texts=error, **kwargs)
        return

    # No error but nothing to show either (missing or empty result list).
    if not res.get("data"):
        await send2tg(client, message, texts="❌查询YouTube失败", **kwargs)
        return

    entries = []
    for idx, item in enumerate(res["data"], start=1):
        video_url = f"https://www.youtube.com/watch?v={item['vid']}"
        entries.append(
            f"👤[{item['author']}]({item['channel']}) 🕒{item['date']:%Y-%m-%d}\n"
            f"{number_to_emoji(idx)}[{item['title']}]({video_url})\n\n"
        )
    await send2tg(client, message, texts="".join(entries), **kwargs)
50
51
async def query_youtube(query: str) -> dict:
    """Search YouTube videos through the Data API v3 ``search`` endpoint.

    Args:
        query: Search keywords, sent as the ``q`` request parameter.

    Returns:
        ``{"data": [...]}`` on success, with one dict per video holding the
        keys ``vid``, ``title``, ``author``, ``channel`` (channel URL) and
        ``date`` (publish time as an aware ``datetime`` converted to the
        ``TZ`` zone). The list may be empty.
        ``{"error": "<message>"}`` when the request reports ``hx_error`` or
        when any exception is raised while querying or parsing; such
        exceptions are logged and not propagated.
    """
    results = []
    try:
        logger.info(f"Query YouTube info for {query=}, proxy={PROXY.GOOGLE}")
        api = "https://www.googleapis.com/youtube/v3/search"
        params = {
            "key": TOKEN.YOUTUBE_API_KEY,
            "part": "snippet",
            "q": query,
            "maxResults": min(NUM_YOUTUBE_SEARCH_RESULTS, 50),  # capped at 50 per request
            "safeSearch": "none",
            "type": "video",
        }
        resp = await hx_req(api, proxy=PROXY.GOOGLE, params=params, check_keys=["items"], max_retry=0)
        if resp.get("hx_error"):
            logger.warning(f"Search YouTube API failed: {resp['hx_error']}")
            # Must be a plain string, like the except branch below; writing
            # `{value}` here would build a set, not a message.
            return {"error": str(resp["hx_error"])}

        local_tz = ZoneInfo(TZ)  # same zone for every item, so build it once
        for x in resp["items"]:
            # Skip anything that is not a video entry.
            if glom(x, "id.kind", default="") != "youtube#video":
                continue
            snippet = x["snippet"]
            # fromisoformat (3.11+) accepts the trailing "Z" and optional
            # fractional seconds, so one unusual timestamp cannot abort the
            # whole search the way a fixed strptime pattern would.
            published = datetime.fromisoformat(snippet["publishedAt"])
            if published.tzinfo is None:
                # Timestamps without an offset are treated as UTC.
                published = published.replace(tzinfo=UTC)
            results.append(
                {
                    "vid": x["id"]["videoId"],
                    "title": snippet["title"],
                    "author": snippet["channelTitle"],
                    "channel": f"https://www.youtube.com/channel/{snippet['channelId']}",
                    "date": published.astimezone(local_tz),
                }
            )
    except Exception as e:
        # Boundary handler: report the failure to the caller as data instead of raising.
        logger.error(f"Failed to get video info: {e}")
        return {"error": str(e)}
    return {"data": results}
85    return {"data": results}