main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import os
  4from datetime import datetime
  5from zoneinfo import ZoneInfo
  6
  7from glom import glom
  8from loguru import logger
  9from pyrogram.client import Client
 10from pyrogram.types import Chat, Message
 11from pyrogram.types.messages_and_media.message import Str
 12
 13from config import TZ, cache
 14from custom.config import CHANNEL_YOUTUBE, SYNC_YOUTUBE_FREQUENCY_SECONDS, SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS
 15from database.d1 import create_d1_table, insert_d1, query_d1
 16from database.kv import get_cf_kv
 17from messages.main import preview_social_media
 18from podcast.utils import get_pubdate
 19from podcast.xml import parse_feed
 20from preview.youtube import get_youtube_vinfo
 21from utils import nowdt, rand_number
 22
 23
 24async def sync_youtube(client: Client):
 25    if os.getenv("SYNC_YOUTUBE_DISABLED", "0") == "1":
 26        return
 27    if cache.get("sync_youtube"):
 28        return
 29    cache.set("sync_youtube", 1, ttl=SYNC_YOUTUBE_FREQUENCY_SECONDS)
 30    kv = await get_cf_kv("youtube", silent=True)
 31    if not kv.get("data"):
 32        return
 33    await create_d1_table(
 34        table_name="youtube",
 35        columns="vid TEXT PRIMARY KEY, timestamp INTEGER, channel TEXT, title TEXT, url TEXT, status TEXT",
 36        idx_cols=["vid", "timestamp", "channel", "status"],
 37        silent=True,
 38    )
 39    for conf in kv["data"]:
 40        await sync_one_channel(client, conf)
 41
 42
 43async def sync_one_channel(client: Client, channel_conf: dict):
 44    channel_id = channel_conf["channel_id"]
 45    feed = await parse_feed(f"https://cors.zydou.me/https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}")
 46    entries = feed.get("entries", [])
 47    d1 = await query_d1(sql=f"SELECT * FROM youtube WHERE channel = '{channel_id}' ORDER BY timestamp DESC LIMIT 20", silent=True)
 48    if "hx_error" in d1:
 49        logger.error(f"【D1】查询YouTube视频失败: {channel_conf['title']} ({d1['hx_error']})")
 50        return
 51    records = glom(d1, "result.0.results", default=[])
 52    await save_entry_to_d1(entries, records)
 53    for entry in sorted(records, key=lambda x: x["timestamp"]):  # old to new
 54        vid = entry["vid"]
 55        if entry["status"] == "done":
 56            continue
 57        # check again
 58        d1 = await query_d1(sql=f"SELECT timestamp FROM youtube WHERE vid = '{vid}'", silent=True)
 59        if glom(d1, "result.0.results.0.status", default="") == "done":
 60            continue
 61        dt = datetime.fromtimestamp(entry["timestamp"], tz=ZoneInfo(TZ))
 62        delta = nowdt(TZ) - dt
 63        if delta.total_seconds() > SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS * 3600:
 64            continue
 65        info = await get_youtube_vinfo(vid)
 66        if error := info.get("error_msg"):
 67            if error in ["❌无法获取此视频信息", "❌私享视频不可下载"]:
 68                entry["status"] = "done"
 69                await query_d1(**insert_d1("youtube", entry, update_on_conflict="vid"), silent=True)
 70            logger.error(f"获取视频信息失败: {entry['url']} ({error})")
 71            continue
 72        logger.warning(f"开始下载【{info['author']}】新视频: {info['title']} ({entry['url']})")
 73        params: dict = {
 74            "url": entry["url"],
 75            "platform": "youtube",
 76            "vid": vid,
 77            "use_db": True,
 78            "ytdlp_download_video": True,
 79            "ytdlp_send_video": True,
 80            "ytdlp_send_audio": False,
 81            "youtube_comments": False,
 82            "reply_msg_id": -1,
 83            "ytdlp_video_target": CHANNEL_YOUTUBE,
 84            "ytdlp_audio_target": CHANNEL_YOUTUBE,
 85            "ytdlp_send_subtitle": True,
 86            "ytdlp_send_summary": True,
 87            "asr_engine": "whisper",
 88            "to_telegraph": True,
 89            "show_author": True,
 90            "show_title": True,
 91            "show_pubdate": True,
 92            "show_statistics": False,
 93            "show_description": False,
 94            "show_progress": False,
 95        }
 96        params.update(channel_conf.get("params", {}))
 97        msg = Message(id=rand_number(), chat=Chat(id=rand_number()), text=Str("/dl " + entry["url"]))
 98        sent_messages = await preview_social_media(client, msg, **params)
 99        if any(m for m in sent_messages if m.video):  # video message is sent
100            logger.success(f"发送完成: {info['title']} ({entry['url']})")
101            entry["status"] = "done"
102            await query_d1(**insert_d1("youtube", entry, update_on_conflict="vid"), silent=True)
103        else:
104            logger.warning(f"下载失败: {info['title']} ({entry['url']})")
105
106
async def save_entry_to_d1(entries: list[dict], records: list[dict]):
    """Store feed entries that are missing from ``records`` and append them to it.

    Args:
        entries: Parsed feed entries; each one is read for ``yt_videoid``,
            ``yt_channelid`` and ``title`` and passed to ``get_pubdate``.
        records: Rows already known for the channel (each with a ``vid`` key).
            Mutated in place: every newly stored row is appended.
    """
    # Snapshot of the video ids known when the call starts.
    known_vids = {row["vid"] for row in records}
    for item in entries:
        video_id = item["yt_videoid"]
        if video_id not in known_vids:
            # Key order matches the original record layout.
            row = {
                "timestamp": round(get_pubdate(item).timestamp()),
                "channel": item["yt_channelid"],
                "vid": video_id,
                "title": item["title"],
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "status": "new",
            }
            logger.info(f"【D1】保存YouTube视频: {item['title']}")
            statement = insert_d1("youtube", row, update_on_conflict="vid")
            await query_d1(**statement, silent=True)
            records.append(row)