main
1#!/venv/bin/python
2# -*- coding: utf-8 -*-
"""This module is used to download podcasts and use AI to summarize them.

Supported podcast input formats:
1. Podcast feed urls (PODCAST_FEED_URLS)
2. OPML file urls (PODCAST_OPML_URLS)
3. YouTube channel IDs (PODCAST_YOUTUBE_CHANNEL_IDS)

For each feed, it will try to:
1. download the enclosure file
2. use ASR to transcribe it
3. use GPT to summarize it
4. upload the enclosure file with subtitles & summaries to Telegram
5. add summaries to the feed items
6. upload the feed to CF-R2 or Alist (configured by PODCAST_FS_ENGINE)

If the enclosure file is not available (e.g. YouTube videos), it will try to download it with yt-dlp
and then process the downloaded file in the same way.
In addition, it will also upload the enclosure file to GitHub Releases.
"""
22
23import contextlib
24from pathlib import Path
25from urllib.parse import unquote_plus
26
27import xmltodict
28from glom import Coalesce, glom
29from loguru import logger
30from pyrogram.client import Client
31from pyrogram.types import Chat, Message
32
33from config import AI, PODCAST, PROXY
34from database.github import gh_clean_assets
35from database.r2 import get_cf_r2, set_cf_r2
36from messages.sender import send2tg
37from networking import download_file, hx_req
38from podcast.asr import get_duration, get_transcripts
39from podcast.utils import HEADERS, clean_feed_url, feed_saved_target, get_pubdate
40from podcast.xml import get_feed_title, parse_feed, save_xml, update_xml_desc
41from preview.bilibili import get_bilibili_vinfo
42from preview.youtube import get_youtube_vinfo
43from summarize.summarize import summarize
44from utils import bare_url, convert2html, count_subtitles, https_url, nowdt, rand_number, seconds_to_hms, strings_list
45from ytdlp.download import ytdlp_download
46
47
async def summary_pods(client: Client):
    """Summarize the new episodes of every configured podcast feed.

    For each feed returned by ``get_feed_url_with_title`` the new entries
    (see ``get_new_entries``) are handled one by one:

    1. download the media via ``download_enclosure``
    2. transcribe it and compute its duration
    3. summarize the transcript
    4. send the media plus caption to the Telegram chat ``PODCAST.TID``
    5. merge the summary into the saved feed XML
    6. record the entry on CF-R2 once Telegram returned a ``Message``

    A failure of a single entry is logged, reported to Telegram and does not
    stop the remaining entries. Downloaded files are always removed afterwards,
    whether the entry succeeded, failed or produced no transcript.

    Args:
        client (Client): pyrogram client used to send the messages.
    """
    pods = await get_feed_url_with_title()  # {feed_url: title}
    if not pods:
        return
    # Files with one of these suffixes are sent as audio, everything else as video.
    audio_suffixes = {".aac", ".amr", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".wma"}
    for feed_url, feed_title in pods.items():
        feed = await parse_feed(feed_url)
        if not feed:
            continue
        processed_xml = await parse_feed(feed_saved_target(feed_url), raw_xml=True)
        has_update = False
        homepage = clean_feed_url(glom(feed, "feed.link", default=feed_url))
        cover_url = glom(feed, "feed.image.href", default="")  # same for every entry of this feed
        for entry in await get_new_entries(feed_title, feed, processed_xml):
            logger.info(f"Updating podcast {feed_title}: {entry['title']}")
            message = Message(id=rand_number(), chat=Chat(id=PODCAST.TID))
            info = await download_enclosure(entry, cover_url=cover_url)
            if not Path(info["path"]).is_file():
                logger.error(f"Failed download podcast {feed_title} -- {entry['title']}")
                await send2tg(client, message, texts=f"Failed download podcast {feed_title} -- {entry['title']}", reply_msg_id=-1)
                continue
            try:
                transcripts = await get_transcripts(info["asr_path"], feed_title, feed_url, entry)
                if not transcripts:
                    continue  # the ``finally`` clause below still removes the downloaded files
                duration = seconds_to_hms(await get_duration(info["asr_path"], entry))
                dt = get_pubdate(entry)
                pubdate = f"{dt:%Y-%m-%d %H:%M:%S}"
                caption = f"🎧[{feed_title}]({homepage})\n📝[{entry['title']}]({entry['link']})\n🕒{pubdate}\n⏳{duration} #️⃣字数: {count_subtitles(transcripts)}"
                desc = glom(entry, Coalesce("content.0.value", "summary"), default="")
                desc_html = desc if desc.startswith("<") else convert2html(desc)
                # Look the enclosure link up once; entries without one get an empty url and the default mime.
                enclosure = next((x for x in entry.get("links", []) if x.get("rel", "") == "enclosure"), {})
                enclosure_url = enclosure.get("href", "")
                enclosure_mime = enclosure.get("type", "audio/mp4")
                desc_html = f'<audio controls preload="metadata" style="width:100%;"><source src="{enclosure_url}" type="{enclosure_mime}">该浏览器不支持音频播放</audio>\n' + desc_html
                prompt = f"该转录稿对应于播客栏目《{feed_title}》的一期节目,节目详情:\n标题: {entry['title']}\n日期: {pubdate}\n时长: {duration}\n节目简介: {desc}"
                summary = await summarize(
                    sources=[{"type": "system_prompt", "text": prompt}, {"type": "transcripts", "text": transcripts}],
                    model=AI.PODCAST_SUMMARY_MODEL_ALIAS,
                    title=entry["title"],
                    author=feed_title,
                    url=entry["link"],
                    date=dt,
                    description={"emoji": "🎧", "name": "播客详情", "html": desc_html},
                    ttl="forever",
                )
                if telegraph_url := summary.get("telegraph_url"):
                    caption += f"\n[🤖AI导读]({telegraph_url})"
                if Path(info["path"]).suffix.lower() in audio_suffixes:
                    media = [{"audio": info["asr_path"], "title": entry["title"], "performer": feed_title, "thumb": info["thumb"]}]
                else:
                    media = [{"video": info["path"], "thumb": info["thumb"]}]

                messages = await send2tg(client, message, texts=caption, media=media, reply_msg_id=-1)
                processed_xml = await update_xml_desc(feed_url, processed_xml, entry, summary=summary.get("texts", ""), audio_path=info["asr_path"])
                # Only mark the entry as done when Telegram actually returned a message.
                if messages and isinstance(messages[0], Message):
                    await set_cf_r2(entry["db_key"], data={"title": entry["title"], "url": entry["link"]})
                    has_update = True
            except Exception as e:
                logger.error(f"Failed podcast {feed_title} -- {entry['title']}: {e}")
                await send2tg(client, message, texts=f"Failed podcast {feed_title} -- {entry['title']}: {e}", reply_msg_id=-1)
            finally:
                # Best-effort cleanup so failed or skipped entries do not leave media files behind.
                for leftover in (info["path"], info["asr_path"], info["thumb"]):
                    if not leftover:
                        continue
                    try:
                        Path(leftover).unlink(missing_ok=True)
                    except OSError as e:
                        logger.warning(f"Failed to remove {leftover}: {e}")
        if has_update:
            await save_xml(processed_xml, feed_saved_target(feed_url))
            await gh_clean_assets(release_name=feed_title, gh_repo=PODCAST.GH_REPO, gh_token=PODCAST.GH_TOKEN, keep_latest=50)

    # save opml: one outline per feed, pointing at the saved copy of the feed
    opml = {"opml": {"@version": "1.0", "head": {"title": "Podcast"}, "body": {"outline": []}}}
    opml["opml"]["body"]["outline"] = [
        {
            "@text": feed_title,
            "@type": "rss",
            "@xmlUrl": feed_saved_target(feed_url),
            "@title": feed_title,
        }
        for feed_url, feed_title in pods.items()
    ]
    await save_xml(opml, feed_saved_target("opml.xml"))
    logger.success("Podcast has been updated.")
131
132
async def get_feed_url_with_title() -> dict[str, str]:
    """Get all podcast feed urls and titles.

    The feeds are collected from three sources, in this order (later sources
    overwrite earlier ones when the feed url is the same):

    1. ``PODCAST.FEED_URLS``: the title is fetched with ``get_feed_title``.
    2. ``PODCAST.OPML_URLS``: every top-level outline with an ``xmlUrl``
       attribute; its ``title`` attribute is used when present and non-empty,
       otherwise the title is fetched with ``get_feed_title``.
    3. ``PODCAST.YOUTUBE_CHANNEL_IDS``: converted to the channel's video feed url.

    An OPML document that cannot be downloaded or parsed is skipped.

    Returns:
        dict: {feed_url: title}
    """
    # get from Feed Urls
    pods = {feed_url: await get_feed_title(feed_url) for feed_url in strings_list(PODCAST.FEED_URLS)}
    # get from OPML
    for opml in strings_list(PODCAST.OPML_URLS):
        opml_data = await hx_req(opml, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PROXY.PODCAST)
        data = {}
        with contextlib.suppress(Exception):
            data = xmltodict.parse(opml_data["text"])
        outlines = glom(data, "opml.body.outline", default=[]) or []
        # xmltodict yields a dict (not a list) when the body holds a single <outline>.
        if isinstance(outlines, dict):
            outlines = [outlines]
        for feed in outlines:
            if not isinstance(feed, dict):
                continue
            feed_url = feed.get("@xmlUrl")
            if not feed_url:
                continue
            # Only hit the network for the title when the OPML does not provide one.
            pods[feed_url] = feed.get("@title") or await get_feed_title(feed_url)
    # get from YouTube Channel
    for yt_cid in strings_list(PODCAST.YOUTUBE_CHANNEL_IDS):
        feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={yt_cid}"
        pods[feed_url] = await get_feed_title(feed_url)
    return pods
155
156
async def get_new_entries(feed_title: str, feed: dict, processed: dict) -> list[dict]:
    """Get new entries from feed.

    An entry is skipped when any of the following conditions is met:
    1. the entry link is already listed in the processed feed
    2. the entry is older than PODCAST.IGNORE_OLD_THAN_SECONDS
    3. the video info of a YouTube / Bilibili entry reports an ``error_msg``
    4. the entry's ``db_key`` is found on CF-R2

    Every entry that is not skipped by condition 1 is modified in place: its
    ``link`` is normalized, ``title`` is guaranteed to exist and a ``db_key``
    of the form ``Podcast/{feed_title}/{guid}`` is added.

    Args:
        feed_title (str): feed title
        feed (dict): feed parsed by feedparser
        processed (dict): processed feed in raw xml format

    Returns:
        list[dict]: the new entries ordered from old to new. An empty list is
        returned when any error occurs while collecting them.
    """
    try:
        now = nowdt()
        new_entries = []
        processed_links = glom(processed, "rss.channel.item.*.link", default=[])
        # Every sort key must have the same type: the parsed dates are time.struct_time,
        # so entries without any parsed date fall back to "now" in the same representation.
        fallback_key = now.timetuple()

        def sort_key(item: dict):
            return item.get("published_parsed") or item.get("updated_parsed") or fallback_key

        sorted_entries = sorted(feed["entries"], key=sort_key, reverse=True)  # new to old
        for entry in sorted_entries:
            link = https_url(clean_feed_url(entry.get("link", "")))
            if link in processed_links:
                continue
            entry["link"] = link
            guid = bare_url(unquote_plus(link))
            entry["db_key"] = f"Podcast/{feed_title}/{guid}"
            entry["title"] = entry.get("title", "")
            dt = get_pubdate(entry)
            delta = now - dt
            if delta.total_seconds() > PODCAST.IGNORE_OLD_THAN_SECONDS:
                continue
            if "youtube.com" in link:
                vinfo = await get_youtube_vinfo(entry["yt_videoid"])
            elif "bilibili.com" in link:
                vinfo = await get_bilibili_vinfo(link)
            else:
                vinfo = {}
            if vinfo.get("error_msg"):
                logger.warning(f"Failed to get video info: {vinfo['error_msg']}")
                continue
            if not await get_cf_r2(entry["db_key"]):
                new_entries.append(entry)
        if new_entries:
            logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
    except Exception as e:
        logger.error(f"Failed to get new entries: {e}")
        new_entries = []
    return new_entries[::-1]  # old to new
204
205
async def download_enclosure(entry: dict, cover_url: str = "") -> dict:
    """Download enclosure of a single podcast entry.

    If the entry has an enclosure link, the file is downloaded directly (up to
    4 attempts) together with a thumbnail taken from the entry image or, when
    the entry has none, from ``cover_url``. A thumbnail that cannot be
    downloaded does not fail the entry; ``thumb`` is simply ``None``.

    If the enclosure link is not found in the entry, it will try to download
    the entry link via yt-dlp.

    Args:
        entry (dict): feed entry; ``links`` is searched for the enclosure and
            ``link`` is used for the yt-dlp fallback.
        cover_url (str): fallback thumbnail url.

    Returns:
        dict: {
            "path": file that will be sent to Telegram,
            "asr_path": file used for transcription (the audio file is preferred),
            "thumb": thumbnail file or None,
            "enclosure": enclosure url ("" when downloaded via yt-dlp)
        }
        On failure ``path`` and ``asr_path`` are empty strings and ``thumb`` is None.
    """
    enclosure_url = next((x.get("href", "") for x in entry.get("links", []) if x.get("rel", "") == "enclosure"), "")
    placeholder = {"path": "", "asr_path": "", "thumb": None, "enclosure": enclosure_url}
    if enclosure_url:
        max_attempts = 4
        try:
            for _ in range(max_attempts):
                path = await download_file(enclosure_url, stream=True, proxy=PROXY.PODCAST)
                if path and Path(path).is_file():
                    break
            else:
                # every attempt finished without producing a file
                return placeholder
        except Exception as e:
            logger.error(f"Failed to download podcast enclosure: {enclosure_url}\n{e}")
            return placeholder

        # The thumbnail is optional: a failure here must not discard the downloaded enclosure.
        thumb = None
        thumb_url = glom(entry, "image.href", default="") or cover_url
        if thumb_url:
            try:
                candidate = await download_file(thumb_url, proxy=PROXY.PODCAST)
                if candidate and Path(candidate).is_file():
                    thumb = candidate
            except Exception as e:
                logger.warning(f"Failed to download podcast thumbnail: {thumb_url}\n{e}")
        return {"path": path, "asr_path": path, "thumb": thumb, "enclosure": enclosure_url}

    # download via yt-dlp
    try:
        info = await ytdlp_download(entry["link"], ytdlp_download_video=True)
        video_path, audio_path, thumb = info["video_path"], info["audio_path"], info["thumb"]
        has_video, has_audio = video_path.is_file(), audio_path.is_file()
    except Exception as e:
        logger.error(f"Failed download podcast via ytdlp: {entry.get('link')}\n{e}")
        return placeholder

    if not (has_video or has_audio):
        logger.error(f"Failed download podcast via ytdlp: {entry['link']}")
        return placeholder
    return {
        "path": video_path if has_video else audio_path,  # send the video when there is one
        "asr_path": audio_path if has_audio else video_path,  # transcribe the audio when there is one
        "thumb": thumb,
        "enclosure": "",
    }