#!/venv/bin/python
# -*- coding: utf-8 -*-
"""This module downloads podcasts and uses AI to summarize them.

Supported podcast input formats:
1. Podcast feed URLs (PODCAST_FEED_URLS)
2. OPML file URLs (PODCAST_OPML_URLS)
3. YouTube channel IDs (PODCAST_YOUTUBE_CHANNEL_IDS)

For each feed, it will try to:
1. download the enclosure file
2. use ASR to transcribe it
3. use GPT to summarize it
4. upload the enclosure file with subtitles & summaries to Telegram
5. add the summaries to the feed items
6. upload the feed to CF-R2 or Alist (configured by PODCAST_FS_ENGINE)

If the enclosure file is not available (e.g. YouTube videos), it will fall back to yt-dlp
to download it and run the same pipeline on the downloaded file.
It will also upload the enclosure file to GitHub Releases.
"""

import contextlib
from pathlib import Path
from urllib.parse import unquote_plus

import xmltodict
from glom import Coalesce, glom
from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Chat, Message
from pyrogram.types.messages_and_media.message import Str

from ai.main import ai_text_generation
from config import AI, PODCAST, PREFIX, PROXY
from database.github import gh_clean_assets
from database.r2 import get_cf_r2, set_cf_r2
from messages.sender import send2tg
from networking import download_file, hx_req
from podcast.asr import get_duration, get_transcripts
from podcast.utils import HEADERS, clean_feed_url, feed_saved_target, get_pubdate
from podcast.xml import get_feed_title, parse_feed, save_xml, update_xml_desc
from preview.bilibili import get_bilibili_vinfo
from preview.youtube import get_youtube_vinfo
from publish import publish_telegraph
from utils import bare_url, convert_html, convert_md, count_subtitles, https_url, nowdt, rand_number, remove_consecutive_newlines, seconds_to_hms, strings_list
from ytdlp.download import ytdlp_download


async def summary_pods(client: Client):
    """Summarize podcast RSS feeds."""
    pods = await get_feed_url_with_title()
    if not pods:
        return
    for feed_url, feed_title in pods.items():
        feed = await parse_feed(feed_url)
        if not feed:
            continue
        processed_xml = await parse_feed(feed_saved_target(feed_url), raw_xml=True)
        has_update = False
        homepage = clean_feed_url(glom(feed, "feed.link", default=feed_url))
        for entry in await get_new_entries(feed_title, feed, processed_xml):
            logger.info(f"Updating podcast {feed_title}: {entry['title']}")
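            # Stub message pointing at the podcast chat (PODCAST.TID); send2tg
            # uses it as the destination for everything sent about this entry.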
            message = Message(id=rand_number(), chat=Chat(id=PODCAST.TID))
            info = await download_enclosure(entry, cover_url=glom(feed, "feed.image.href", default=""))
            if not Path(info["path"]).is_file():
                logger.error(f"Failed to download podcast {feed_title} -- {entry['title']}")
                await send2tg(client, message, texts=f"Failed to download podcast {feed_title} -- {entry['title']}", reply_msg_id=-1)
                continue
            try:
                transcripts = await get_transcripts(info["asr_path"], feed_title, feed_url, entry)
                if not transcripts:
                    continue
                duration = await get_duration(info["asr_path"], entry)
                duration = seconds_to_hms(duration)
                dt = get_pubdate(entry)
                pubdate = f"{dt:%Y-%m-%d %H:%M:%S}"
                caption = f"🎧[{feed_title}]({homepage})\n📝[{entry['title']}]({entry['link']})\n🕒{pubdate}\n⏳{duration} #️⃣字数: {count_subtitles(transcripts)}"
                markdown_desc = convert_md(html=glom(entry, Coalesce("content.0.value", "summary"), default=""))
                markdown_desc = remove_consecutive_newlines(markdown_desc, newline_level=2)
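                # Build a Chinese prompt from the episode metadata; it asks the model
                # to interpret the episode, opening with "该节目讲述了" ("this episode covers").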
                prompt = f"这是播客栏目《{feed_title}》的一期节目详情:\n节目标题: {entry['title']}\n节目播出日期: {pubdate}"
                prompt += f"\n节目时长: {duration}\n节目简介: {markdown_desc}"
                prompt += "\n请解读本期节目内容。要求: 直接输出节目内容解读, 以“该节目讲述了”开头"
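                # The full transcripts ride along as the replied-to message so
                # ai_text_generation can pick them up as context for the summary.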
                ai_msg = Message(  # Construct a message for AI
                    id=rand_number(),
                    chat=message.chat,
                    text=Str(f"{PREFIX.AI_TEXT_GENERATION} @{AI.PODCAST_SUMMARY_MODEL_ALIAS} {prompt}"),
                    reply_to_message=Message(id=rand_number(), chat=message.chat, text=Str(transcripts)),
                )
                ai_res = await ai_text_generation(client, ai_msg, silent=True)
                telegraph_content = ""
                if ai_res.get("texts"):
                    telegraph_content += f"\n🤖**{ai_res['model_name']}总结**:\n{ai_res['texts']}"
                telegraph_content += f"\n📖**节目简介**:\n {markdown_desc}" if markdown_desc else ""
                telegraph_content += f"\n🔤**转录字幕**:\n{transcripts}"

                if telegraph_url := await publish_telegraph(title=entry["title"], html=convert_html(telegraph_content), author=feed_title, url=entry["link"]):
                    caption += f"\n[🤖总结 & 🔤字幕]({telegraph_url})"

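                # Send the enclosure as audio when its extension is a known audio
                # type; anything else (e.g. a yt-dlp video) is sent as a video.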
                media = (
                    [
                        {
                            "audio": info["asr_path"],
                            "title": entry["title"],
                            "performer": feed_title,
                            "thumb": info["thumb"],
                        }
                    ]
                    if Path(info["path"]).suffix in [".aac", ".amr", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".wma"]
                    else [{"video": info["path"], "thumb": info["thumb"]}]
                )

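                # Upload to Telegram, write the summary back into the stored feed
                # XML, and mark the entry as processed in CF-R2 on success.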
                messages = await send2tg(client, message, texts=caption, media=media, reply_msg_id=-1)
                processed_xml = await update_xml_desc(feed_url, processed_xml, entry, summary=ai_res.get("texts", ""), audio_path=info["asr_path"])
                if messages and isinstance(messages[0], Message):
                    await set_cf_r2(entry["db_key"], data={"title": entry["title"], "url": entry["link"]})
                    has_update = True
                Path(info["path"]).unlink(missing_ok=True)
                Path(info["asr_path"]).unlink(missing_ok=True)
                if info["thumb"]:
                    Path(info["thumb"]).unlink(missing_ok=True)
            except Exception as e:
                logger.error(f"Failed to process podcast {feed_title} -- {entry['title']}: {e}")
                await send2tg(client, message, texts=f"Failed to process podcast {feed_title} -- {entry['title']}: {e}", reply_msg_id=-1)
                continue
        if has_update:
            await save_xml(processed_xml, feed_saved_target(feed_url))
            await gh_clean_assets(release_name=feed_title, gh_repo=PODCAST.GH_REPO, gh_token=PODCAST.GH_TOKEN, keep_latest=50)

    # save opml
    opml = {"opml": {"@version": "1.0", "head": {"title": "Podcast"}, "body": {"outline": []}}}
    opml["opml"]["body"]["outline"] = [
        {
            "@text": feed_title,
            "@type": "rss",
            "@xmlUrl": feed_saved_target(feed_url),
            "@title": feed_title,
        }
        for feed_url, feed_title in pods.items()
    ]
    await save_xml(opml, feed_saved_target("opml.xml"))
    logger.success("Podcasts have been updated.")


async def get_feed_url_with_title() -> dict[str, str]:
    """Get all podcast feed URLs and titles.

    Returns:
        dict: {feed_url: title}
    """
    # get from feed URLs
    pods = {feed_url: await get_feed_title(feed_url) for feed_url in strings_list(PODCAST.FEED_URLS)}
    # get from OPML
    for opml in strings_list(PODCAST.OPML_URLS):
        opml_data = await hx_req(opml, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PROXY.PODCAST)
        data = {}
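        # Tolerate malformed OPML: a parse failure simply leaves `data` empty,
        # so the feed loop below is skipped for this source.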
        with contextlib.suppress(Exception):
            data = xmltodict.parse(opml_data["text"])
        outlines = glom(data, "opml.body.outline", default=[])
        if isinstance(outlines, dict):  # a single-outline OPML parses to a dict, not a list
            outlines = [outlines]
        for feed in outlines:
            if feed_url := feed.get("@xmlUrl"):
                # `or` avoids the eager network fetch that dict.get's default argument would trigger
                pods[feed_url] = feed.get("@title") or await get_feed_title(feed_url)
    # get from YouTube channels
    for yt_cid in strings_list(PODCAST.YOUTUBE_CHANNEL_IDS):
        feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={yt_cid}"
        pods[feed_url] = await get_feed_title(feed_url)
    return pods


async def get_new_entries(feed_title: str, feed: dict, processed: dict) -> list[dict]:
    """Get new entries from a feed.

    An entry is skipped when any of the following holds:
    1. its link has already been processed
    2. it is older than PODCAST.IGNORE_OLD_THAN_SECONDS
    3. its guid is found on CF-R2

    Args:
        feed_title (str): feed title
        feed (dict): feed parsed by feedparser
        processed (dict): processed feed in raw XML format

    Returns:
        list[dict]: new entries, ordered old to new
    """
    try:
        now = nowdt()
        new_entries = []
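        # glom's "*" star path gathers the <link> of every <item> already
        # stored in the processed feed XML.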
        processed_links = glom(processed, "rss.channel.item.*.link", default=[])
        sorted_entries = sorted(feed["entries"], key=lambda x: glom(x, Coalesce("published_parsed", "updated"), default=now), reverse=True)  # new to old
        for entry in sorted_entries:
            link = https_url(clean_feed_url(entry.get("link", "")))
            if link in processed_links:
                continue
            entry["link"] = link
            guid = bare_url(unquote_plus(link))
            entry["db_key"] = f"Podcast/{feed_title}/{guid}"
            entry["title"] = entry.get("title", "")
            dt = get_pubdate(entry)
            delta = now - dt
            if delta.total_seconds() > PODCAST.IGNORE_OLD_THAN_SECONDS:
                continue
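            # Probe the platform's video info first so unavailable videos are
            # skipped before any download is attempted.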
            if "youtube.com" in link:
                vinfo = await get_youtube_vinfo(entry["yt_videoid"])
            elif "bilibili.com" in link:
                vinfo = await get_bilibili_vinfo(link)
            else:
                vinfo = {}
            if vinfo.get("error_msg"):
                logger.warning(f"Failed to get video info: {vinfo['error_msg']}")
                continue
            if not await get_cf_r2(entry["db_key"]):
                new_entries.append(entry)
        if new_entries:
            logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
    except Exception as e:
        logger.error(f"Failed to get new entries: {e}")
        new_entries = []
    return new_entries[::-1]  # old to new


async def download_enclosure(entry: dict, cover_url: str = "") -> dict:
    """Download the enclosure of a single podcast entry.

    If no enclosure link is found in the entry, it will try to download the entry link via yt-dlp.

    Returns:
        dict: {
            "path": Path, (this path will be sent to Telegram)
            "asr_path": Path, (audio path preferred, used for ASR)
            "thumb": str,
            "enclosure": str
        }
    """
    enclosure_url = next((x["href"] for x in entry["links"] if x.get("rel", "") == "enclosure"), "")
    placeholder = {"path": "", "asr_path": "", "thumb": None, "enclosure": enclosure_url}
    if enclosure_url:
        try:
            retry = 0
            path = await download_file(enclosure_url, stream=True, proxy=PROXY.PODCAST)
            while not Path(path).is_file():
                retry += 1
                if retry > 3:  # give up after three retries; checked before redownloading so a successful last attempt is kept
                    return placeholder
                path = await download_file(enclosure_url, stream=True, proxy=PROXY.PODCAST)
            thumb_url = glom(entry, "image.href", default="") or cover_url
            thumb = await download_file(thumb_url, proxy=PROXY.PODCAST)
            thumb = thumb if Path(thumb).is_file() else None
        except Exception as e:
            logger.error(f"Failed to download podcast enclosure: {enclosure_url}\n{e}")
            return placeholder
        return {"path": path, "asr_path": path, "thumb": thumb, "enclosure": enclosure_url}

    # download via yt-dlp
    info = await ytdlp_download(entry["link"], ytdlp_download_video=True)
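    # When yt-dlp produced both tracks, send the video to Telegram but
    # transcribe from the separate audio track.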
    if info["video_path"].is_file() and info["audio_path"].is_file():
        return {"path": info["video_path"], "asr_path": info["audio_path"], "thumb": info["thumb"], "enclosure": ""}

    if info["video_path"].is_file():
        return {"path": info["video_path"], "asr_path": info["video_path"], "thumb": info["thumb"], "enclosure": ""}

    if info["audio_path"].is_file():
        return {"path": info["audio_path"], "asr_path": info["audio_path"], "thumb": info["thumb"], "enclosure": ""}
    logger.error(f"Failed to download podcast via yt-dlp: {entry['link']}")
    return placeholder