main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4from datetime import datetime
5from zoneinfo import ZoneInfo
6
7from glom import glom
8from loguru import logger
9from pyrogram.client import Client
10from pyrogram.types import Chat, Message
11from pyrogram.types.messages_and_media.message import Str
12
13from config import TZ, cache
14from custom.config import CHANNEL_YOUTUBE, SYNC_YOUTUBE_FREQUENCY_SECONDS, SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS
15from database.d1 import create_d1_table, insert_d1, query_d1
16from database.kv import get_cf_kv
17from messages.main import preview_social_media
18from podcast.utils import get_pubdate
19from podcast.xml import parse_feed
20from preview.youtube import get_youtube_vinfo
21from utils import nowdt, rand_number
22
23
async def sync_youtube(client: Client):
    """Run one throttled sync pass over every YouTube channel configured in KV.

    The pass is skipped when the ``SYNC_YOUTUBE_DISABLED`` environment variable
    is ``"1"`` or when a previous pass ran less than
    ``SYNC_YOUTUBE_FREQUENCY_SECONDS`` ago (tracked through the ``sync_youtube``
    cache key). Otherwise the ``youtube`` KV entry is loaded. Its ``data`` list
    holds the per-channel configs, and each one is handed to
    ``sync_one_channel`` after the ``youtube`` D1 table is ensured to exist.

    Args:
        client: Pyrogram client used to send downloaded videos.
    """
    # `or` short-circuits, so the cache is only consulted when the env switch is off.
    disabled = os.getenv("SYNC_YOUTUBE_DISABLED", "0") == "1"
    if disabled or cache.get("sync_youtube"):
        return

    # Arm the throttle before any remote call, so a failing KV/D1 lookup
    # is not retried on every tick.
    cache.set("sync_youtube", 1, ttl=SYNC_YOUTUBE_FREQUENCY_SECONDS)

    kv = await get_cf_kv("youtube", silent=True)
    channel_confs = kv.get("data")
    if not channel_confs:
        return

    await create_d1_table(
        table_name="youtube",
        columns="vid TEXT PRIMARY KEY, timestamp INTEGER, channel TEXT, title TEXT, url TEXT, status TEXT",
        idx_cols=["vid", "timestamp", "channel", "status"],
        silent=True,
    )

    # Channels are processed sequentially, one after another.
    for channel_conf in channel_confs:
        await sync_one_channel(client, channel_conf)
41
42
async def sync_one_channel(client: Client, channel_conf: dict):
    """Record new uploads of one YouTube channel in D1 and forward pending ones.

    Steps:
      1. Fetch the channel's RSS feed (through a CORS proxy) and load the
         channel's 20 most recent rows from the ``youtube`` D1 table.
      2. Upsert every feed entry not already among those rows with status
         ``"new"`` (see ``save_entry_to_d1``, which appends to ``records``).
      3. For each row whose status is not ``"done"``, oldest first:
         re-read its status from D1, skip it if it is older than
         ``SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS``, then download it and send it to
         ``CHANNEL_YOUTUBE`` via ``preview_social_media``. A row is marked
         ``"done"`` once a video message was sent, or when the video info
         reports it as unavailable/private.

    Args:
        client: Pyrogram client used to send the messages.
        channel_conf: One entry of the ``youtube`` KV config. Reads
            ``channel_id``, optionally ``title`` (used in error logs) and
            optionally ``params`` (overrides for the default download options).
    """
    channel_id = channel_conf["channel_id"]
    # Double single quotes so the id cannot break out of the SQL string literal.
    safe_channel_id = str(channel_id).replace("'", "''")

    feed = await parse_feed(f"https://cors.zydou.me/https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}")
    entries = feed.get("entries", [])
    d1 = await query_d1(
        sql=f"SELECT * FROM youtube WHERE channel = '{safe_channel_id}' ORDER BY timestamp DESC LIMIT 20",
        silent=True,
    )
    if "hx_error" in d1:
        logger.error(f"【D1】查询YouTube视频失败: {channel_conf.get('title', channel_id)} ({d1['hx_error']})")
        return

    records = glom(d1, "result.0.results", default=[])
    await save_entry_to_d1(entries, records)

    tz = ZoneInfo(TZ)
    for entry in sorted(records, key=lambda x: x["timestamp"]):  # old to new
        vid = entry["vid"]
        if entry["status"] == "done":
            continue

        # Check again right before downloading: another run may have finished this video.
        # The status column must be selected here, otherwise the check can never match.
        safe_vid = str(vid).replace("'", "''")
        latest = await query_d1(sql=f"SELECT status FROM youtube WHERE vid = '{safe_vid}'", silent=True)
        if glom(latest, "result.0.results.0.status", default="") == "done":
            continue

        dt = datetime.fromtimestamp(entry["timestamp"], tz=tz)
        delta = nowdt(TZ) - dt
        if delta.total_seconds() > SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS * 3600:
            continue

        info = await get_youtube_vinfo(vid)
        if error := info.get("error_msg"):
            # These errors will not go away on retry: mark the video as handled.
            if error in ["❌无法获取此视频信息", "❌私享视频不可下载"]:
                entry["status"] = "done"
                await query_d1(**insert_d1("youtube", entry, update_on_conflict="vid"), silent=True)
            logger.error(f"获取视频信息失败: {entry['url']} ({error})")
            continue

        logger.warning(f"开始下载【{info['author']}】新视频: {info['title']} ({entry['url']})")
        params: dict = {
            "url": entry["url"],
            "platform": "youtube",
            "vid": vid,
            "use_db": True,
            "ytdlp_download_video": True,
            "ytdlp_send_video": True,
            "ytdlp_send_audio": False,
            "youtube_comments": False,
            "reply_msg_id": -1,
            "ytdlp_video_target": CHANNEL_YOUTUBE,
            "ytdlp_audio_target": CHANNEL_YOUTUBE,
            "ytdlp_send_subtitle": True,
            "ytdlp_send_summary": True,
            "asr_engine": "whisper",
            "to_telegraph": True,
            "show_author": True,
            "show_title": True,
            "show_pubdate": True,
            "show_statistics": False,
            "show_description": False,
            "show_progress": False,
        }
        # Per-channel overrides from the KV config win over the defaults above.
        params.update(channel_conf.get("params", {}))

        # Synthetic "/dl <url>" message with random ids: the preview pipeline expects a Message.
        msg = Message(id=rand_number(), chat=Chat(id=rand_number()), text=Str("/dl " + entry["url"]))
        # Defensive: treat a missing result as "nothing sent" instead of crashing the loop.
        sent_messages = await preview_social_media(client, msg, **params) or []
        if any(m.video for m in sent_messages):  # video message is sent
            logger.success(f"发送完成: {info['title']} ({entry['url']})")
            entry["status"] = "done"
            await query_d1(**insert_d1("youtube", entry, update_on_conflict="vid"), silent=True)
        else:
            logger.warning(f"下载失败: {info['title']} ({entry['url']})")
105
106
async def save_entry_to_d1(entries: list[dict], records: list[dict]):
    """Upsert feed entries that are not yet among ``records`` into D1.

    For every parsed feed entry whose ``yt_videoid`` is not the ``vid`` of an
    existing row, a row with status ``"new"`` is written to the ``youtube``
    table (upsert on ``vid``). The row is also appended to ``records`` in
    place, so the caller sees it afterwards.

    Args:
        entries: Parsed feed entries. Reads ``yt_videoid``, ``yt_channelid``
            and ``title``, plus whatever ``get_pubdate`` reads.
        records: Rows already loaded from D1, each with a ``vid`` key. This
            list is mutated.
    """
    # Snapshot of known ids, taken once before any new rows are appended.
    known = set()
    for row in records:
        known.add(row["vid"])

    fresh_entries = (item for item in entries if item["yt_videoid"] not in known)
    for item in fresh_entries:
        video_id = item["yt_videoid"]
        published = get_pubdate(item)
        new_row = {
            "timestamp": round(published.timestamp()),
            "channel": item["yt_channelid"],
            "vid": video_id,
            "title": item["title"],
            "url": f"https://www.youtube.com/watch?v={video_id}",
            "status": "new",
        }
        logger.info(f"【D1】保存YouTube视频: {item['title']}")
        await query_d1(**insert_d1("youtube", new_row, update_on_conflict="vid"), silent=True)
        records.append(new_row)