main
  1#!/venv/bin/python
  2# -*- coding: utf-8 -*-
  3"""This module is used to download podcasts and use AI to summarize them.
  4
  5Supported podcasts input format:
  61. Podcast feed urls (PODCAST_FEED_URLS)
  72. OPML file urls (PODCAST_OPML_URLS)
  83. YouTube channel IDs (PODCAST_YOUTUBE_CHANNEL_IDS)
  9
 10For each feed, it will try to:
 111. download the enclosure file
 122. use ASR to transcribe it.
 133. use GPT to summarize it.
 144. upload the enclosure file with subtitles & summaries to Telegram
 155. add summaries to the feed items
 166. upload the feed to CF-R2 or Alist (configured by PODCAST_FS_ENGINE)
 17
 18If the enclosure file is not available (like youtube videos), it will try to use yt-dlp to download it.
 19And do the same thing for the downloaded file.
 20Besides, it will also upload the enclosure file to GitHub Releases
 21"""
 22
 23import contextlib
 24from pathlib import Path
 25from urllib.parse import unquote_plus
 26
 27import xmltodict
 28from glom import Coalesce, glom
 29from loguru import logger
 30from pyrogram.client import Client
 31from pyrogram.types import Chat, Message
 32
 33from config import AI, PODCAST, PROXY
 34from database.github import gh_clean_assets
 35from database.r2 import get_cf_r2, set_cf_r2
 36from messages.sender import send2tg
 37from networking import download_file, hx_req
 38from podcast.asr import get_duration, get_transcripts
 39from podcast.utils import HEADERS, clean_feed_url, feed_saved_target, get_pubdate
 40from podcast.xml import get_feed_title, parse_feed, save_xml, update_xml_desc
 41from preview.bilibili import get_bilibili_vinfo
 42from preview.youtube import get_youtube_vinfo
 43from summarize.summarize import summarize
 44from utils import bare_url, convert2html, count_subtitles, https_url, nowdt, rand_number, seconds_to_hms, strings_list
 45from ytdlp.download import ytdlp_download
 46
 47
 48async def summary_pods(client: Client):
 49    """Summary podcast RSS feeds."""
 50    pods = await get_feed_url_with_title()  # {feed_url: title}
 51    if not pods:
 52        return
 53    # pods = {"https://feed.xyzfm.space/q88qwmydeuw8": "声动早咖啡"}  # for debug
 54    for feed_url, feed_title in pods.items():
 55        feed = await parse_feed(feed_url)
 56        if not feed:
 57            continue
 58        processed_xml = await parse_feed(feed_saved_target(feed_url), raw_xml=True)
 59        has_update = False
 60        homepage = clean_feed_url(glom(feed, "feed.link", default=feed_url))
 61        for entry in await get_new_entries(feed_title, feed, processed_xml):
 62            logger.info(f"Updating podcast {feed_title}: {entry['title']}")
 63            message = Message(id=rand_number(), chat=Chat(id=PODCAST.TID))
 64            info = await download_enclosure(entry, cover_url=glom(feed, "feed.image.href", default=""))
 65            if not Path(info["path"]).is_file():
 66                logger.error(f"Failed download podcast {feed_title} -- {entry['title']}")
 67                await send2tg(client, message, texts=f"Failed download podcast {feed_title} -- {entry['title']}", reply_msg_id=-1)
 68                continue
 69            try:
 70                transcripts = await get_transcripts(info["asr_path"], feed_title, feed_url, entry)
 71                if not transcripts:
 72                    continue
 73                duration = await get_duration(info["asr_path"], entry)
 74                duration = seconds_to_hms(duration)
 75                dt = get_pubdate(entry)
 76                pubdate = f"{dt:%Y-%m-%d %H:%M:%S}"
 77                caption = f"🎧[{feed_title}]({homepage})\n📝[{entry['title']}]({entry['link']})\n🕒{pubdate}\n{duration} #️⃣字数: {count_subtitles(transcripts)}"
 78                desc = glom(entry, Coalesce("content.0.value", "summary"), default="")
 79                desc_html = desc if desc.startswith("<") else convert2html(desc)
 80                enclosure_url = next((x["href"] for x in entry["links"] if x.get("rel", "") == "enclosure"), "")
 81                enclosure_mime = next((x["type"] for x in entry["links"] if x.get("rel", "") == "enclosure"), "audio/mp4")
 82                desc_html = f'<audio controls preload="metadata" style="width:100%;"><source src="{enclosure_url}" type="{enclosure_mime}">该浏览器不支持音频播放</audio>\n' + desc_html
 83                prompt = f"该转录稿对应于播客栏目《{feed_title}》的一期节目,节目详情:\n标题: {entry['title']}\n日期: {pubdate}\n时长: {duration}\n节目简介: {desc}"
 84                summary = await summarize(
 85                    sources=[{"type": "system_prompt", "text": prompt}, {"type": "transcripts", "text": transcripts}],
 86                    model=AI.PODCAST_SUMMARY_MODEL_ALIAS,
 87                    title=entry["title"],
 88                    author=feed_title,
 89                    url=entry["link"],
 90                    date=dt,
 91                    description={"emoji": "🎧", "name": "播客详情", "html": desc_html},
 92                    ttl="forever",
 93                )
 94                if telegraph_url := summary.get("telegraph_url"):
 95                    caption += f"\n[🤖AI导读]({telegraph_url})"
 96                media = (
 97                    [{"audio": info["asr_path"], "title": entry["title"], "performer": feed_title, "thumb": info["thumb"]}]
 98                    if Path(info["path"]).suffix in [".aac", ".amr", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".wma"]
 99                    else [{"video": info["path"], "thumb": info["thumb"]}]
100                )
101
102                messages = await send2tg(client, message, texts=caption, media=media, reply_msg_id=-1)
103                processed_xml = await update_xml_desc(feed_url, processed_xml, entry, summary=summary.get("texts", ""), audio_path=info["asr_path"])
104                if isinstance(messages[0], Message):
105                    await set_cf_r2(entry["db_key"], data={"title": entry["title"], "url": entry["link"]})
106                has_update = True
107                Path(info["path"]).unlink(missing_ok=True)
108                Path(info["asr_path"]).unlink(missing_ok=True)
109                Path(info["thumb"]).unlink(missing_ok=True) if info["thumb"] else None
110            except Exception as e:
111                logger.error(f"Failed podcast {feed_title} -- {entry['title']}: {e}")
112                await send2tg(client, message, texts=f"Failed podcast {feed_title} -- {entry['title']}: {e}", reply_msg_id=-1)
113                continue
114        if has_update:
115            await save_xml(processed_xml, feed_saved_target(feed_url))
116            await gh_clean_assets(release_name=feed_title, gh_repo=PODCAST.GH_REPO, gh_token=PODCAST.GH_TOKEN, keep_latest=50)
117
118    # save opml
119    opml = {"opml": {"@version": "1.0", "head": {"title": "Podcast"}, "body": {"outline": []}}}
120    opml["opml"]["body"]["outline"] = [
121        {
122            "@text": feed_title,
123            "@type": "rss",
124            "@xmlUrl": feed_saved_target(feed_url),
125            "@title": feed_title,
126        }
127        for feed_url, feed_title in pods.items()
128    ]
129    await save_xml(opml, feed_saved_target("opml.xml"))
130    logger.success("Podcast has been updated.")
131
132
async def get_feed_url_with_title() -> dict[str, str]:
    """Collect all configured podcast feed urls together with their titles.

    Sources, in order (later sources overwrite earlier ones for the same url):
    1. ``PODCAST.FEED_URLS`` - title fetched with ``get_feed_title``
    2. ``PODCAST.OPML_URLS`` - every outline that has an ``@xmlUrl``; the
       outline's ``@title`` is used when present and non-empty, otherwise the
       title is fetched with ``get_feed_title``
    3. ``PODCAST.YOUTUBE_CHANNEL_IDS`` - converted to the channel's
       ``feeds/videos.xml`` url, title fetched with ``get_feed_title``

    An OPML document that cannot be downloaded or parsed contributes nothing.

    Returns:
        dict: {feed_url: title}
    """
    # get from Feed Urls
    pods = {feed_url: await get_feed_title(feed_url) for feed_url in strings_list(PODCAST.FEED_URLS)}
    # get from OPML
    for opml in strings_list(PODCAST.OPML_URLS):
        opml_data = await hx_req(opml, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PROXY.PODCAST)
        data = {}
        with contextlib.suppress(Exception):
            data = xmltodict.parse(opml_data["text"])
        outlines = glom(data, "opml.body.outline", default=[])
        # xmltodict yields a single dict (not a list) when there is only one
        # <outline> element; normalise so the loop below always sees a list.
        if not isinstance(outlines, list):
            outlines = [outlines]
        for feed in outlines:
            if not isinstance(feed, dict):
                continue
            feed_url = feed.get("@xmlUrl")
            if not feed_url:
                continue
            # Only hit the network for the title when the OPML does not provide one.
            pods[feed_url] = feed.get("@title") or await get_feed_title(feed_url)
    # get from YouTube Channel
    for yt_cid in strings_list(PODCAST.YOUTUBE_CHANNEL_IDS):
        feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={yt_cid}"
        pods[feed_url] = await get_feed_title(feed_url)
    return pods
155
156
async def get_new_entries(feed_title: str, feed: dict, processed: dict) -> list[dict]:
    """Get the entries of a feed that still need to be processed.

    An entry is skipped when any of the following holds:
    1. its (cleaned, https) link is already present in the processed feed
    2. it is older than PODCAST.IGNORE_OLD_THAN_SECONDS
    3. it is a YouTube/Bilibili link whose video info reports an ``error_msg``
    4. its ``db_key`` is found on CF-R2

    Every inspected entry is updated in place with a normalised ``link``, a
    ``db_key`` (``Podcast/<feed_title>/<guid>``) and a ``title`` (empty string
    when missing).

    If anything raises while inspecting the entries, the error is logged and
    an empty list is returned.

    Args:
        feed_title (str): feed title
        feed (dict): feed parsed by feedparser
        processed (dict): processed feed in raw xml format

    Returns:
        list[dict]: new entries ordered from oldest to newest
    """
    try:
        now = nowdt()
        # Sort keys must all be the same type. feedparser exposes parsed dates as
        # struct_time ("published_parsed"/"updated_parsed"), so undated entries fall
        # back to the current time as a struct_time as well.
        # NOTE(review): assumes nowdt() returns a datetime-compatible object - confirm.
        undated = now.utctimetuple()

        def published_key(item: dict):
            return item.get("published_parsed") or item.get("updated_parsed") or undated

        new_entries = []
        processed_links = glom(processed, "rss.channel.item.*.link", default=[])
        sorted_entries = sorted(feed["entries"], key=published_key, reverse=True)  # new to old
        for entry in sorted_entries:
            link = https_url(clean_feed_url(entry.get("link", "")))
            if link in processed_links:
                continue
            entry["link"] = link
            guid = bare_url(unquote_plus(link))
            entry["db_key"] = f"Podcast/{feed_title}/{guid}"
            entry["title"] = entry.get("title", "")
            dt = get_pubdate(entry)
            delta = now - dt
            if delta.total_seconds() > PODCAST.IGNORE_OLD_THAN_SECONDS:
                continue
            if "youtube.com" in link:
                vinfo = await get_youtube_vinfo(entry["yt_videoid"])
            elif "bilibili.com" in link:
                vinfo = await get_bilibili_vinfo(link)
            else:
                vinfo = {}
            if vinfo.get("error_msg"):
                logger.warning(f"Failed to get video info: {vinfo['error_msg']}")
                continue
            if not await get_cf_r2(entry["db_key"]):
                new_entries.append(entry)
        if new_entries:
            logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
    except Exception as e:
        logger.error(f"Failed to get new entries: {e}")
        new_entries = []
    return new_entries[::-1]  # old to new
204
205
async def download_enclosure(entry: dict, cover_url: str = "") -> dict:
    """Download enclosure of a single podcast entry.

    If the entry has an enclosure link, it is downloaded with ``download_file``
    (up to 5 attempts). The thumbnail (entry image, falling back to
    ``cover_url``) is downloaded on a best-effort basis: a missing or failing
    thumbnail never discards an enclosure that was downloaded successfully.

    If the enclosure link is not found in the entry, it will try to download
    the entry link via yt-dlp.

    Args:
        entry (dict): feed entry; must contain "links" and, for the yt-dlp
            fallback, "link".
        cover_url (str): feed level cover image used when the entry has no image.

    Returns:
        dict: {
            "path": file that will be sent to Telegram ("" on failure),
            "asr_path": file used for ASR, prefers the audio file ("" on failure),
            "thumb": thumbnail file or None,
            "enclosure": enclosure url ("" when downloaded via yt-dlp)
        }
    """
    enclosure_url = next((x["href"] for x in entry["links"] if x.get("rel", "") == "enclosure"), "")
    placeholder = {"path": "", "asr_path": "", "thumb": None, "enclosure": enclosure_url}
    if enclosure_url:
        try:
            # One initial attempt plus four retries. The result of every attempt
            # is checked, so a download that succeeds on the last try is kept.
            for _ in range(5):
                path = await download_file(enclosure_url, stream=True, proxy=PROXY.PODCAST)
                if Path(path).is_file():
                    break
            else:
                return placeholder
        except Exception as e:
            logger.error(f"Failed to download podcast enclosure: {enclosure_url}\n{e}")
            return placeholder

        # The thumbnail is optional: keep the enclosure even if this part fails.
        thumb = None
        thumb_url = glom(entry, "image.href", default="") or cover_url
        if thumb_url:
            try:
                candidate = await download_file(thumb_url, proxy=PROXY.PODCAST)
                if Path(candidate).is_file():
                    thumb = candidate
            except Exception as e:
                logger.warning(f"Failed to download podcast thumbnail: {thumb_url}\n{e}")
        return {"path": path, "asr_path": path, "thumb": thumb, "enclosure": enclosure_url}

    # download via yt-dlp
    info = await ytdlp_download(entry["link"], ytdlp_download_video=True)
    has_video = info["video_path"].is_file()
    has_audio = info["audio_path"].is_file()
    if not (has_video or has_audio):
        logger.error(f"Failed download podcast via ytdlp: {entry['link']}")
        return placeholder
    return {
        # Telegram gets the video when there is one, ASR prefers the audio track.
        "path": info["video_path"] if has_video else info["audio_path"],
        "asr_path": info["audio_path"] if has_audio else info["video_path"],
        "thumb": info["thumb"],
        "enclosure": "",
    }