main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import os
  4from datetime import datetime
  5from zoneinfo import ZoneInfo
  6
  7from glom import glom
  8from loguru import logger
  9from pyrogram.client import Client
 10from pyrogram.types import Chat, Message
 11from pyrogram.types.messages_and_media.message import Str
 12
 13from config import TZ, cache
 14from custom.config import CHANNEL_YOUTUBE, SYNC_YOUTUBE_FREQUENCY_SECONDS, SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS
 15from database.d1 import create_d1_table, insert_d1, query_d1
 16from messages.main import preview_social_media
 17from podcast.utils import get_pubdate
 18from podcast.xml import parse_feed
 19from preview.youtube import get_youtube_vinfo
 20from utils import nowdt, rand_number
 21
# YouTube channels whose feeds are polled by sync_youtube().
# Each entry has:
#   "title":      human-readable channel name (used in log messages)
#   "channel_id": YouTube channel id, interpolated into the feed URL and the D1 query
#   "params":     optional per-channel overrides merged over the default
#                 download/preview options built in sync_one_channel()
CHANNELS = [
    {
        "title": "陈一发儿",
        "channel_id": "UC7QVieoTCNwwW84G0bddXpA",
        "params": {
            "ytdlp_send_audio": True,
            "ytdlp_send_subtitle": False,
            "summary_ytdlp": False,
            "show_author": False,
            "ytdlp_video_target": -1001738950624,
            "ytdlp_audio_target": -1001738950624,
        },
    },
    {"title": "阿发bot", "channel_id": "UCvAD1cWOL6Cp0pKWTSYDZuQ"},
    {"title": "小德MOMO", "channel_id": "UCp6azASL-FiSDSP4lZiJpQg"},
    {"title": "我发超可爱呢", "channel_id": "UCyM6vUOsD7C23VUvXXzebRg"},
    {"title": "尼古拉斯狗蛋儿", "channel_id": "UCT4JqjPvX3kE_sBxVkGWMBQ"},
    {"title": "小米轰炸姬", "channel_id": "UCIlHqEYm2JIN80KCT7dxp8g"},
    {"title": "中国数字时代", "channel_id": "UCwXewCWwaK1-yec8niJLrqg", "params": {"summary_ytdlp_model": "gemini-free"}},
    {"title": "柴静 Chai Jing", "channel_id": "UCjuNibFJ21MiSNpu8LZyV4w", "params": {"summary_ytdlp_model": "gemini-free"}},
    {"title": "大脸撑在小胸上", "channel_id": "UCv8djBlOdCZWZ-7Nal-3pJQ"},
    {"title": "二爷故事", "channel_id": "UCapPWG_ppEuYvelriReTcOA", "params": {"summary_ytdlp_model": "gemini-free"}},
    {"title": "LIFEANO CLUB", "channel_id": "UC5xunxPS6oZ1zzKufgREFuA"},
    {"title": "蘭森環遊世界", "channel_id": "UCuPEBfHpW4hILoGuKfYKTnw"},
    {"title": "蘭森世界觀", "channel_id": "UCZ9TdlZSewgz3VLpDcAzDaQ"},
    {"title": "老梁", "channel_id": "UCr_F4Y9iboUKlg_ZPm4jkVQ"},
    {"title": "老周横眉", "channel_id": "UCFDMMIHbtRdrVhHlGXVzApA", "params": {"summary_ytdlp_model": "gemini-free"}},
    {"title": "李老师不是你老师", "channel_id": "UCrMjr7dY8syS_m9TdqM-g_Q", "params": {"summary_ytdlp_model": "gemini-free"}},
    {"title": "多伦多方脸", "channel_id": "UCzYYzigb1vXR0GQXXBja2kg", "params": {"summary_ytdlp_model": "gemini-free"}},
    {"title": "小敌台", "channel_id": "UCMlSfmeXjgTt-NDSmkrpy0A"},
    {"title": "JJ的显微万花筒", "channel_id": "UCTzS8sDi26INKMHa9GiMr4g"},
]
 55
 56
 57async def sync_youtube(client: Client):
 58    if os.getenv("SYNC_YOUTUBE_DISABLED", "0") == "1":
 59        return
 60    if cache.get("sync_youtube"):
 61        return
 62    cache.set("sync_youtube", 1, ttl=SYNC_YOUTUBE_FREQUENCY_SECONDS)
 63    await create_d1_table(
 64        table_name="youtube",
 65        columns="vid TEXT PRIMARY KEY, timestamp INTEGER, channel TEXT, title TEXT, url TEXT, status TEXT",
 66        idx_cols=["vid", "timestamp", "channel", "status"],
 67        silent=True,
 68    )
 69    for channel in CHANNELS:
 70        await sync_one_channel(client, channel)
 71
 72
 73async def sync_one_channel(client: Client, channel_conf: dict):
 74    channel_id = channel_conf["channel_id"]
 75    feed = await parse_feed(f"https://cors.zydou.me/https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}")
 76    entries = feed.get("entries", [])
 77    d1 = await query_d1(sql=f"SELECT * FROM youtube WHERE channel = '{channel_id}' ORDER BY timestamp DESC LIMIT 20", silent=True)
 78    if "hx_error" in d1:
 79        logger.error(f"【D1】查询YouTube视频失败: {channel_conf['title']} ({d1['hx_error']})")
 80        return
 81    records = glom(d1, "result.0.results", default=[])
 82    await save_entry_to_d1(entries, records)
 83    for entry in sorted(records, key=lambda x: x["timestamp"]):  # old to new
 84        vid = entry["vid"]
 85        if entry["status"] == "done":
 86            continue
 87        # check again
 88        d1 = await query_d1(sql=f"SELECT timestamp FROM youtube WHERE vid = '{vid}'", silent=True)
 89        if glom(d1, "result.0.results.0.status", default="") == "done":
 90            continue
 91        dt = datetime.fromtimestamp(entry["timestamp"], tz=ZoneInfo(TZ))
 92        delta = nowdt(TZ) - dt
 93        if delta.total_seconds() > SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS * 3600:
 94            continue
 95        info = await get_youtube_vinfo(vid)
 96        if error := info.get("error_msg"):
 97            if error in ["❌无法获取此视频信息", "❌私享视频不可下载"]:
 98                entry["status"] = "done"
 99                await query_d1(**insert_d1("youtube", entry, update_on_conflict="vid"), silent=True)
100            logger.error(f"获取视频信息失败: {entry['url']} ({error})")
101            continue
102        logger.warning(f"开始下载【{info['author']}】新视频: {info['title']} ({entry['url']})")
103        params: dict = {
104            "url": entry["url"],
105            "platform": "youtube",
106            "vid": vid,
107            "use_db": True,
108            "ytdlp_download_video": True,
109            "ytdlp_send_video": True,
110            "ytdlp_send_audio": False,
111            "youtube_comments": False,
112            "reply_msg_id": -1,
113            "ytdlp_video_target": CHANNEL_YOUTUBE,
114            "ytdlp_audio_target": CHANNEL_YOUTUBE,
115            "ytdlp_send_subtitle": True,
116            "summary_ytdlp": True,
117            "asr_engine": "whisper",
118            "to_telegraph": True,
119            "show_author": True,
120            "show_title": True,
121            "show_pubdate": True,
122            "show_statistics": False,
123            "show_description": False,
124            "show_progress": False,
125            "summary_ytdlp_model": "general",
126        }
127        params.update(channel_conf.get("params", {}))
128        msg = Message(id=rand_number(), chat=Chat(id=rand_number()), text=Str("/dl " + entry["url"]))
129        sent_messages = await preview_social_media(client, msg, **params)
130        if any(m for m in sent_messages if m.video):  # video message is sent
131            logger.success(f"发送完成: {info['title']} ({entry['url']})")
132            entry["status"] = "done"
133            await query_d1(**insert_d1("youtube", entry, update_on_conflict="vid"), silent=True)
134        else:
135            logger.warning(f"下载失败: {info['title']} ({entry['url']})")
136
137
async def save_entry_to_d1(entries: list[dict], records: list[dict]):
    """Store feed entries that are not yet present in ``records``.

    For every entry whose ``yt_videoid`` is not among the ``vid`` values of
    ``records`` (as they were when the call started), a row with status
    ``"new"`` is built, written through ``query_d1``/``insert_d1`` and appended
    to ``records``.

    Args:
        entries: Parsed feed entries; ``yt_videoid``, ``yt_channelid`` and
            ``title`` are read from each one.
        records: Rows already known for the channel. Mutated in place.
    """
    # Snapshot of known ids; intentionally not extended while looping.
    known_vids = {row["vid"] for row in records}
    for item in entries:
        video_id = item["yt_videoid"]
        if video_id not in known_vids:
            published_at = round(get_pubdate(item).timestamp())
            new_row = {
                "timestamp": published_at,
                "channel": item["yt_channelid"],
                "vid": video_id,
                "title": item["title"],
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "status": "new",
            }
            logger.info(f"【D1】保存YouTube视频: {item['title']}")
            statement = insert_d1("youtube", new_row, update_on_conflict="vid")
            await query_d1(**statement, silent=True)
            records.append(new_row)