main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from datetime import UTC, datetime
4from zoneinfo import ZoneInfo
5
6from glom import glom
7from loguru import logger
8from pyrogram.client import Client
9from pyrogram.types import Message
10
11from config import NUM_YOUTUBE_SEARCH_RESULTS, PREFIX, PROXY, TOKEN, TZ
12from messages.parser import parse_msg
13from messages.sender import send2tg
14from messages.utils import equal_prefix, startswith_prefix
15from networking import hx_req
16from utils import number_to_emoji
17
18HELP = f"""🔍**搜索YouTube**
19`{PREFIX.SEARCH_YOUTUBE}` + 关键词
20"""
21
22
23async def search_youtube(client: Client, message: Message, **kwargs):
24 """Search YouTube."""
25 # send docs if message == "/ytb", without reply
26 if equal_prefix(message.text, prefix=[PREFIX.SEARCH_YOUTUBE]) and not message.reply_to_message:
27 await send2tg(client, message, texts=HELP, **kwargs)
28 return
29 info = parse_msg(message, silent=True)
30 if not startswith_prefix(info["text"], prefix=[PREFIX.SEARCH_YOUTUBE]):
31 return
32 query = info["text"].removeprefix(PREFIX.SEARCH_YOUTUBE).strip()
33 if not query:
34 return
35
36 res = await query_youtube(query)
37 if not res.get("data"):
38 await send2tg(client, message, texts="❌查询YouTube失败", **kwargs)
39 return
40 if error := res.get("error", ""):
41 await send2tg(client, message, texts=error, **kwargs)
42 return
43
44 msg = ""
45 for idx, item in enumerate(res["data"]):
46 video_url = f"https://www.youtube.com/watch?v={item['vid']}"
47 msg += f"👤[{item['author']}]({item['channel']}) 🕒{item['date']:%Y-%m-%d}\n"
48 msg += f"{number_to_emoji(idx + 1)}[{item['title']}]({video_url})\n\n"
49 await send2tg(client, message, texts=msg, **kwargs)
50
51
52async def query_youtube(query: str) -> dict:
53 results = []
54 try:
55 logger.info(f"Query YouTube info for {query=}, proxy={PROXY.GOOGLE}")
56 api = "https://www.googleapis.com/youtube/v3/search"
57 params = {
58 "key": TOKEN.YOUTUBE_API_KEY,
59 "part": "snippet",
60 "q": query,
61 "maxResults": min(NUM_YOUTUBE_SEARCH_RESULTS, 50),
62 "safeSearch": "none",
63 "type": "video",
64 }
65 resp = await hx_req(api, proxy=PROXY.GOOGLE, params=params, check_keys=["items"], max_retry=0)
66 if resp.get("hx_error"):
67 logger.warning(f"Search YouTube API failed: {resp['hx_error']}")
68 return {"error": {resp["hx_error"]}}
69
70 for x in resp["items"]:
71 if glom(x, "id.kind", default="") != "youtube#video":
72 continue
73 results.append(
74 {
75 "vid": x["id"]["videoId"],
76 "title": x["snippet"]["title"],
77 "author": x["snippet"]["channelTitle"],
78 "channel": f"https://www.youtube.com/channel/{x['snippet']['channelId']}",
79 "date": datetime.strptime(x["snippet"]["publishedAt"], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ)),
80 }
81 )
82 except Exception as e:
83 logger.error(f"Failed to get video info: {e}")
84 return {"error": str(e)}
85 return {"data": results}