main
  1#!/venv/bin/python
  2# -*- coding: utf-8 -*-
  3"""This module is used to download podcasts and use AI to summarize them.
  4
  5Supported podcasts input format:
  61. Podcast feed urls (PODCAST_FEED_URLS)
  72. OPML file urls (PODCAST_OPML_URLS)
  83. YouTube channel IDs (PODCAST_YOUTUBE_CHANNEL_IDS)
  9
 10For each feed, it will try to:
 111. download the enclosure file
 122. use ASR to transcribe it.
 133. use GPT to summarize it.
 144. upload the enclosure file with subtitles & summaries to Telegram
 155. add summaries to the feed items
 166. upload the feed to CF-R2 or Alist (configured by PODCAST_FS_ENGINE)
 17
 18If the enclosure file is not available (like youtube videos), it will try to use yt-dlp to download it.
 19And do the same thing for the downloaded file.
 20Besides, it will also upload the enclosure file to GitHub Releases
 21"""
 22
 23import contextlib
 24from pathlib import Path
 25from urllib.parse import unquote_plus
 26
 27import xmltodict
 28from glom import Coalesce, glom
 29from loguru import logger
 30from pyrogram.client import Client
 31from pyrogram.types import Chat, Message
 32from pyrogram.types.messages_and_media.message import Str
 33
 34from ai.main import ai_text_generation
 35from config import AI, PODCAST, PREFIX, PROXY
 36from database.github import gh_clean_assets
 37from database.r2 import get_cf_r2, set_cf_r2
 38from messages.sender import send2tg
 39from networking import download_file, hx_req
 40from podcast.asr import get_duration, get_transcripts
 41from podcast.utils import HEADERS, clean_feed_url, feed_saved_target, get_pubdate
 42from podcast.xml import get_feed_title, parse_feed, save_xml, update_xml_desc
 43from preview.bilibili import get_bilibili_vinfo
 44from preview.youtube import get_youtube_vinfo
 45from publish import publish_telegraph
 46from utils import bare_url, convert_html, convert_md, count_subtitles, https_url, nowdt, rand_number, remove_consecutive_newlines, seconds_to_hms, strings_list
 47from ytdlp.download import ytdlp_download
 48
 49
 50async def summary_pods(client: Client):
 51    """Summary podcast RSS feeds."""
 52    pods = await get_feed_url_with_title()
 53    if not pods:
 54        return
 55    for feed_url, feed_title in pods.items():
 56        feed = await parse_feed(feed_url)
 57        if not feed:
 58            continue
 59        processed_xml = await parse_feed(feed_saved_target(feed_url), raw_xml=True)
 60        has_update = False
 61        homepage = clean_feed_url(glom(feed, "feed.link", default=feed_url))
 62        for entry in await get_new_entries(feed_title, feed, processed_xml):
 63            logger.info(f"Updating podcast {feed_title}: {entry['title']}")
 64            message = Message(id=rand_number(), chat=Chat(id=PODCAST.TID))
 65            info = await download_enclosure(entry, cover_url=glom(feed, "feed.image.href", default=""))
 66            if not Path(info["path"]).is_file():
 67                logger.error(f"Failed download podcast {feed_title} -- {entry['title']}")
 68                await send2tg(client, message, texts=f"Failed download podcast {feed_title} -- {entry['title']}", reply_msg_id=-1)
 69                continue
 70            try:
 71                transcripts = await get_transcripts(info["asr_path"], feed_title, feed_url, entry)
 72                if not transcripts:
 73                    continue
 74                duration = await get_duration(info["asr_path"], entry)
 75                duration = seconds_to_hms(duration)
 76                dt = get_pubdate(entry)
 77                pubdate = f"{dt:%Y-%m-%d %H:%M:%S}"
 78                caption = f"🎧[{feed_title}]({homepage})\n📝[{entry['title']}]({entry['link']})\n🕒{pubdate}\n{duration} #️⃣字数: {count_subtitles(transcripts)}"
 79                markdown_desc = convert_md(html=glom(entry, Coalesce("content.0.value", "summary"), default=""))
 80                markdown_desc = remove_consecutive_newlines(markdown_desc, newline_level=2)
 81                prompt = f"这是播客栏目《{feed_title}》的一期节目详情:\n节目标题: {entry['title']}\n节目播出日期: {pubdate}"
 82                prompt += f"\n节目时长: {duration}\n节目简介: {markdown_desc}"
 83                prompt += "\n请解读本期节目内容。要求: 直接输出节目内容解读, 以“该节目讲述了”开头"
 84                ai_msg = Message(  # Construct a message for AI
 85                    id=rand_number(),
 86                    chat=message.chat,
 87                    text=Str(f"{PREFIX.AI_TEXT_GENERATION} @{AI.PODCAST_SUMMARY_MODEL_ALIAS} {prompt}"),
 88                    reply_to_message=Message(id=rand_number(), chat=message.chat, text=Str(transcripts)),
 89                )
 90                ai_res = await ai_text_generation(client, ai_msg, silent=True)
 91                telegraph_content = ""
 92                if ai_res.get("texts"):
 93                    telegraph_content += f"\n🤖**{ai_res['model_name']}总结**:\n{ai_res['texts']}"
 94                telegraph_content += f"\n📖**节目简介**:\n {markdown_desc}" if markdown_desc else ""
 95                telegraph_content += f"\n🔤**转录字幕**:\n{transcripts}"
 96
 97                if telegraph_url := await publish_telegraph(title=entry["title"], html=convert_html(telegraph_content), author=feed_title, url=entry["link"]):
 98                    caption += f"\n[🤖总结 & 🔤字幕]({telegraph_url})"
 99
100                media = (
101                    [
102                        {
103                            "audio": info["asr_path"],
104                            "title": entry["title"],
105                            "performer": feed_title,
106                            "thumb": info["thumb"],
107                        }
108                    ]
109                    if Path(info["path"]).suffix in [".aac", ".amr", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".wma"]
110                    else [{"video": info["path"], "thumb": info["thumb"]}]
111                )
112
113                messages = await send2tg(client, message, texts=caption, media=media, reply_msg_id=-1)
114                processed_xml = await update_xml_desc(feed_url, processed_xml, entry, summary=ai_res.get("texts", ""), audio_path=info["asr_path"])
115                if isinstance(messages[0], Message):
116                    await set_cf_r2(entry["db_key"], data={"title": entry["title"], "url": entry["link"]})
117                has_update = True
118                Path(info["path"]).unlink(missing_ok=True)
119                Path(info["asr_path"]).unlink(missing_ok=True)
120                Path(info["thumb"]).unlink(missing_ok=True) if info["thumb"] else None
121            except Exception as e:
122                logger.error(f"Failed podcast {feed_title} -- {entry['title']}: {e}")
123                await send2tg(client, message, texts=f"Failed podcast {feed_title} -- {entry['title']}: {e}", reply_msg_id=-1)
124                continue
125        if has_update:
126            await save_xml(processed_xml, feed_saved_target(feed_url))
127            await gh_clean_assets(release_name=feed_title, gh_repo=PODCAST.GH_REPO, gh_token=PODCAST.GH_TOKEN, keep_latest=50)
128
129    # save opml
130    opml = {"opml": {"@version": "1.0", "head": {"title": "Podcast"}, "body": {"outline": []}}}
131    opml["opml"]["body"]["outline"] = [
132        {
133            "@text": feed_title,
134            "@type": "rss",
135            "@xmlUrl": feed_saved_target(feed_url),
136            "@title": feed_title,
137        }
138        for feed_url, feed_title in pods.items()
139    ]
140    await save_xml(opml, feed_saved_target("opml.xml"))
141    logger.success("Podcast has been updated.")
142
143
def _flatten_opml_outlines(outline) -> list[dict]:
    """Return every ``<outline>`` dict of a parsed OPML body, at any nesting depth."""
    # xmltodict yields a dict for a single child element and a list for several.
    if isinstance(outline, dict):
        outline = [outline]
    if not isinstance(outline, list):
        return []
    flat: list[dict] = []
    for node in outline:
        if isinstance(node, dict):
            flat.append(node)
            # Category outlines carry their feeds as nested <outline> children.
            flat.extend(_flatten_opml_outlines(node.get("outline")))
    return flat


async def get_feed_url_with_title() -> dict[str, str]:
    """Get all podcast feed urls and titles.

    Feeds are collected from three sources, in this order (a later source
    overrides the title of a feed url that was already seen):
    1. ``PODCAST.FEED_URLS``
    2. every outline with an ``xmlUrl`` in the OPML files of ``PODCAST.OPML_URLS``
       (nested outline groups are searched as well)
    3. the YouTube channel feeds of ``PODCAST.YOUTUBE_CHANNEL_IDS``

    An OPML file that cannot be fetched or parsed is logged and skipped.

    Returns:
        dict: {feed_url: title}
    """
    # get from Feed Urls
    pods = {feed_url: await get_feed_title(feed_url) for feed_url in strings_list(PODCAST.FEED_URLS)}
    # get from OPML
    for opml in strings_list(PODCAST.OPML_URLS):
        opml_data = await hx_req(opml, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PROXY.PODCAST)
        data = {}
        try:
            data = xmltodict.parse(opml_data["text"])
        except Exception as e:
            logger.warning(f"Failed to parse OPML {opml}: {e}")
        for feed in _flatten_opml_outlines(glom(data, "opml.body.outline", default=None)):
            if feed_url := feed.get("@xmlUrl"):
                # Only hit the network for the title when the OPML does not provide one.
                pods[feed_url] = feed["@title"] if "@title" in feed else await get_feed_title(feed_url)
    # get from YouTube Channel
    for yt_cid in strings_list(PODCAST.YOUTUBE_CHANNEL_IDS):
        feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={yt_cid}"
        pods[feed_url] = await get_feed_title(feed_url)
    return pods
166
167
async def get_new_entries(feed_title: str, feed: dict, processed: dict) -> list[dict]:
    """Get new entries from feed.

    Will skip entries when the following conditions are met:
    1. the entry link is already processed
    2. the entry is older than PODCAST.IGNORE_OLD_THAN_SECONDS
    3. the video info of a YouTube / bilibili entry reports an error
    4. the guid is found on CF-R2

    Every returned entry is updated in place with a normalized ``link``, a
    ``title`` (empty string if missing) and the CF-R2 ``db_key``.

    Args:
        feed_title (str): feed title
        feed (dict): feed parsed by feedparser
        processed (dict): processed feed in raw xml format

    Returns:
        list[dict]: new entries ordered from old to new. An empty list is
        returned when anything goes wrong while collecting them.
    """
    new_entries: list[dict] = []
    try:
        now = nowdt()
        # Sort on one comparable type only. The original key mixed struct_time
        # (published_parsed), raw strings (updated) and a datetime default, which
        # makes sorted() raise TypeError for feeds whose entries are heterogeneous.
        # assumes nowdt() returns a datetime (it is subtracted from get_pubdate() below)
        undated = now.timetuple()
        processed_links = glom(processed, "rss.channel.item.*.link", default=[])
        sorted_entries = sorted(
            feed["entries"],
            key=lambda x: glom(x, Coalesce("published_parsed", "updated_parsed"), default=undated),
            reverse=True,
        )  # new to old
        for entry in sorted_entries:
            link = https_url(clean_feed_url(entry.get("link", "")))
            if link in processed_links:
                continue
            entry["link"] = link
            guid = bare_url(unquote_plus(link))
            entry["db_key"] = f"Podcast/{feed_title}/{guid}"
            entry["title"] = entry.get("title", "")
            dt = get_pubdate(entry)
            delta = now - dt
            if delta.total_seconds() > PODCAST.IGNORE_OLD_THAN_SECONDS:
                continue
            # Video platforms are probed first so that unavailable videos are skipped.
            if "youtube.com" in link:
                vinfo = await get_youtube_vinfo(entry["yt_videoid"])
            elif "bilibili.com" in link:
                vinfo = await get_bilibili_vinfo(link)
            else:
                vinfo = {}
            if vinfo.get("error_msg"):
                logger.warning(f"Failed to get video info: {vinfo['error_msg']}")
                continue
            if not await get_cf_r2(entry["db_key"]):
                new_entries.append(entry)
        if new_entries:
            logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
    except Exception as e:
        logger.error(f"Failed to get new entries: {e}")
        new_entries = []
    return new_entries[::-1]  # old to new
215
216
# Total number of download attempts for an enclosure (first try + 4 retries).
_ENCLOSURE_MAX_ATTEMPTS = 5


async def download_enclosure(entry: dict, cover_url: str = "") -> dict:
    """Download enclosure of a single podcast entry.

    If the enclosure link is not found in the entry, it will try to download the link via yt-dlp.

    A failing thumbnail download never discards a successfully downloaded
    enclosure; the result then simply carries ``thumb=None``.

    Args:
        entry (dict): feed entry; ``links`` is searched for a ``rel="enclosure"``
            item and ``link`` is used for the yt-dlp fallback.
        cover_url (str): feed level cover, used when the entry has no image.

    Returns:
        dict: {
            "path": path of the file that will be sent to Telegram,
            "asr_path": path of the file used for ASR (prefer audio path),
            "thumb": thumbnail path or None,
            "enclosure": enclosure url ("" when downloaded via yt-dlp)
        }
        On failure ``path`` and ``asr_path`` are empty strings.
    """
    enclosure_url = next((x.get("href", "") for x in entry.get("links", []) if x.get("rel", "") == "enclosure"), "")
    placeholder = {"path": "", "asr_path": "", "thumb": None, "enclosure": enclosure_url}
    if enclosure_url:
        try:
            # Check the result of every attempt, including the last one.
            for _ in range(_ENCLOSURE_MAX_ATTEMPTS):
                path = await download_file(enclosure_url, stream=True, proxy=PROXY.PODCAST)
                if Path(path).is_file():
                    break
            else:
                logger.error(f"Failed to download podcast enclosure after {_ENCLOSURE_MAX_ATTEMPTS} attempts: {enclosure_url}")
                return placeholder
        except Exception as e:
            logger.error(f"Failed to download podcast enclosure: {enclosure_url}\n{e}")
            return placeholder

        # The thumbnail is optional: handle it separately from the enclosure.
        thumb = None
        thumb_url = glom(entry, "image.href", default="") or cover_url
        if thumb_url:
            try:
                candidate = await download_file(thumb_url, proxy=PROXY.PODCAST)
                if candidate and Path(candidate).is_file():
                    thumb = candidate
            except Exception as e:
                logger.warning(f"Failed to download podcast thumbnail: {thumb_url}\n{e}")
        return {"path": path, "asr_path": path, "thumb": thumb, "enclosure": enclosure_url}

    # download via yt-dlp
    try:
        info = await ytdlp_download(entry["link"], ytdlp_download_video=True)
        video_path, audio_path, thumb = info["video_path"], info["audio_path"], info["thumb"]
        has_video, has_audio = video_path.is_file(), audio_path.is_file()
    except Exception as e:
        # Same contract as the enclosure branch: report and return the placeholder.
        logger.error(f"Failed download podcast via ytdlp: {entry.get('link', '')}\n{e}")
        return placeholder

    if has_video or has_audio:
        return {
            # Telegram gets the video when there is one, ASR prefers the audio track.
            "path": video_path if has_video else audio_path,
            "asr_path": audio_path if has_audio else video_path,
            "thumb": thumb,
            "enclosure": "",
        }
    logger.error(f"Failed download podcast via ytdlp: {entry['link']}")
    return placeholder