Commit c447868
src/others/podcast.py
@@ -1,297 +0,0 @@
-#!/venv/bin/python
-# -*- coding: utf-8 -*-
-import contextlib
-import io
-import re
-import shutil
-from datetime import UTC, datetime
-from pathlib import Path
-from urllib.parse import quote_plus, unquote_plus, urlparse
-from zoneinfo import ZoneInfo
-
-import anyio
-import feedparser
-import xmltodict
-from glom import Coalesce, glom
-from loguru import logger
-from pyrogram.client import Client
-from pyrogram.types import Chat, Message
-from pyrogram.types.messages_and_media.message import Str
-
-from asr.voice_recognition import asr_file
-from config import DB, DOWNLOAD_DIR, GPT, PODCAST, READING_SPEED, TZ, cache
-from database.alist import upload_alist
-from database.r2 import get_cf_r2, set_cf_r2
-from llm.gpt import gpt_response
-from llm.utils import convert_html, convert_md, remove_consecutive_newlines
-from messages.sender import send2tg
-from networking import download_file, hx_req
-from publish import publish_telegraph
-from utils import bare_url, count_subtitles, https_url, nowdt, rand_number, rand_string, readable_time, strings_list
-
-HEADERS = {
- "User-Agent": "feedparser/6.0.11 +https://github.com/kurtmckee/feedparser/",
- "Accept": "application/atom+xml,application/rdf+xml,application/rss+xml,application/x-netcdf,application/xml;q=0.9,text/xml;q=0.2,*/*;q=0.1",
-}
-AUDIO_EXT = [".aac", ".amr", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".wma"]
-
-
-async def summary_pods(client: Client):
- """Summary podcast RSS feeds."""
- pods = await get_all_pods()
- if not pods:
- return
- for feed_url, feed_title in pods.items():
- data = await hx_req(feed_url, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PODCAST.PROXY)
- if not data.get("text"):
- continue
- feed = feedparser.parse(data["text"]) # do not parse feed url, because it doesn't support timeout.
- feed_xml = load_xml(data["text"])
- save_feed_url = align_opml_url(feed_url)
- data = await hx_req(save_feed_url, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PODCAST.PROXY)
- saved_xml = load_xml(data["text"]) if data.get("text") else feed_xml
- has_update = False
- pod_url = clean_pod_url(feed.feed.link) # type: ignore
- for entry in await get_new_entries(feed_title, feed, saved_xml):
- message = Message(id=rand_number(), chat=Chat(id=PODCAST.TID))
- enclosure = next((x["href"] for x in entry["links"] if x.get("rel", "") == "enclosure"), "")
- if not enclosure:
- continue
- logger.info(f"Updating podcast {feed_title}: {entry['title']}")
- try:
- retry = 0
- path = await download_file(enclosure, stream=True)
- while not path:
- retry += 1
- path = await download_file(enclosure, stream=True)
- if retry > 3:
- logger.error(f"Failed download podcast {feed_title} -- {entry['title']}")
- await send2tg(client, message, texts=f"Failed download podcast {feed_title} -- {entry['title']}", reply_msg_id=-1)
- break
- if not Path(path).is_file():
- continue
- thumb_url = glom(entry, "image.href", default="") or glom(feed, "feed.image.href", default="")
- thumb = await download_file(thumb_url)
- thumb = thumb if Path(thumb).is_file() else None
- asr_path = Path(path).with_stem(rand_string())
- shutil.copy(path, asr_path) # make a backup for sending audio to TG
- raw_desc = glom(entry, Coalesce("content.0.value", "summary"), default="")
- desc = convert_md(html=raw_desc)
- desc = remove_consecutive_newlines(desc, newline_level=2)
- struct_time = entry["published_parsed"]
- dt = datetime(*struct_time[:6], tzinfo=UTC).astimezone(ZoneInfo(TZ))
- pubdate = f"{dt:%Y-%m-%d %H:%M:%S}"
- audio_caption = f"🎧播客: [{feed_title}]({pod_url})\n📝标题: [{entry['title']}]({entry['link']})\n🕒日期: {pubdate}\n⏳时长: {readable_time(entry['itunes_duration'])}\n📖简介: {desc}"
- media = [{"audio": path, "title": entry["title"], "performer": feed_title, "thumb": thumb}] if Path(path).suffix in AUDIO_EXT else [{"video": path, "thumb": thumb}]
- prompt = f"请转录播客栏目《{feed_title}》的一期节目的音频。\n该期节目标题: {entry['title']}\n节目时长: {readable_time(entry['itunes_duration'])}\n节目简介: {desc}"
- engine = get_pod_asr_engine(feed_title, feed_url)
- asr_res = await asr_file(asr_path, prompt=prompt, engine=engine, client=client, message=message, silent=True)
- if asr_res.get("error") or len(asr_res.get("texts", "")) == 0:
- continue
- subtitles = asr_res.get("texts", "")
- subtitle_caption = f"🎧播客: [{feed_title}]({pod_url})\n📝标题: [{entry['title']}]({entry['link']})\n🕒日期: {pubdate}\n⏳时长: {readable_time(entry['itunes_duration'])}"
- subtitle_caption += f"\n#️⃣字数: {count_subtitles(subtitles)}\n⏳阅读: {readable_time(60 * count_subtitles(subtitles) / READING_SPEED)}"
- if telegraph_url := await publish_telegraph(title=entry["title"], html=convert_html(f"{audio_caption}\n{subtitles}"), author=feed_title, url=entry["link"]):
- subtitle_caption += f"\n⚡️[即时预览]({telegraph_url})"
- await send2tg(client, message, texts=remove_img(audio_caption), media=media, reply_msg_id=-1) # Telegram DO NOT allow img tag in messages
- with io.BytesIO(subtitles.encode("utf-8")) as f:
- txt_msg: Message = await client.send_document(message.chat.id, f, file_name=f"{entry['title']}.txt", caption=subtitle_caption) # type: ignore
-
- prompt = f"这是播客栏目《{feed_title}》的一期节目详情:\n节目标题: {entry['title']}\n节目播出日期: {pubdate}"
- prompt += f"\n节目时长: {readable_time(entry['itunes_duration'])}\n节目简介: {desc}"
- prompt += "\n请解读该播客内容, 只需关注内容本身, 不用概述播客的基本信息, 例如播客的标题, 日期, 时长等"
- # Construct a message to call GPT
- cache.delete(f"parse_msg-{txt_msg.chat.id}-{txt_msg.id}")
- ai_msg = Message(
- id=txt_msg.id,
- chat=txt_msg.chat,
- text=Str(f"/ai {remove_img(prompt)}"),
- reply_to_message=Message(id=rand_number(), chat=message.chat, text=Str(subtitles)),
- )
- gpt_res = await gpt_response(client, ai_msg, custom_model_id=GPT.PODCAST_SUMMARY_MODEL_ID, include_thoughts=False, append_grounding=False, show_progress=True)
- cache.delete(f"parse_msg-{txt_msg.chat.id}-{txt_msg.id}")
- feed_item = match_item(feed_xml, entry)
- update_item(saved_xml, feed_item, prefix_desc=gpt_res.get("texts", ""))
- await set_cf_r2(entry["db_key"], data={"title": entry["title"], "url": entry["link"], "file": enclosure})
- has_update = True
- except Exception as e:
- logger.error(f"Failed podcast {feed_title} -- {entry['title']}: {e}")
- await send2tg(client, message, texts=f"Failed podcast {feed_title} -- {entry['title']}: {e}", reply_msg_id=-1)
- continue
- if has_update:
- await save_xml(saved_xml, feed_url)
-
- # save opml
- opml = load_xml("", template="opml")
- opml["opml"]["body"]["outline"] = [
- {
- "@text": feed_title,
- "@type": "rss",
- "@xmlUrl": align_opml_url(feed_url),
- "@title": feed_title,
- }
- for feed_url, feed_title in pods.items()
- ]
- await save_xml(opml, "opml.xml")
- logger.success("Podcast has been updated.")
-
-
-async def get_new_entries(feed_title: str, remote: dict, saved: dict) -> list[dict]:
- """Get new entries from feed."""
- try:
- now = nowdt()
- new_entries = []
- saved_enclosure_urls = glom(saved, "rss.channel.item.*.enclosure.@url", default=[])
- sorted_entries = sorted(remote["entries"], key=lambda x: x.get("published_parsed", x.get("updated", now)), reverse=True) # new to old
- for entry in sorted_entries:
- enclosure = next((x["href"] for x in entry["links"] if x.get("rel", "") == "enclosure"), "")
- if enclosure in saved_enclosure_urls:
- continue
- entry["link"] = https_url(clean_pod_url(entry.get("link", "")))
- guid = bare_url(unquote_plus(entry["link"]))
- entry["db_key"] = f"Podcast/{feed_title}/{guid}"
- entry["title"] = entry.get("title", "")
- entry["itunes_duration"] = glom(entry, Coalesce("itunes_duration", "duration"), default="0")
- struct_time = entry["published_parsed"]
- dt = datetime(*struct_time[:6], tzinfo=UTC).astimezone(ZoneInfo(TZ))
- delta = now - dt
- if delta.total_seconds() > PODCAST.IGNORE_OLD_THAN_SECONDS:
- continue
- if not await get_cf_r2(entry["db_key"]):
- new_entries.append(entry)
- if new_entries:
- logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
- except Exception as e:
- logger.error(f"Failed to get new entries: {e}")
- new_entries = []
- return new_entries[::-1] # old to new
-
-
-async def get_all_pods() -> dict[str, str]:
- """Get all podcast feed urls and titles.
-
- Returns:
- dict: {feed_url: title}
- """
- pods = {x.strip(): urlparse(x.strip()).netloc for x in PODCAST.FEED_URLS.split(",") if x.strip()}
- for opml in [x.strip() for x in PODCAST.OPML_URLS.split(",") if x.strip()]:
- opml_data = await hx_req(opml, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PODCAST.PROXY)
- data = xmltodict.parse(opml_data["text"])
- for feed in glom(data, "opml.body.outline", default=[]):
- if feed.get("@xmlUrl"):
- pods[feed["@xmlUrl"]] = feed.get("@title", "")
- return pods
-
-
-def get_pod_asr_engine(feed_title: str, feed_url: str) -> str:
- if feed_title in strings_list(PODCAST.ASR_FORCE_GEMINI_TITLES):
- return "gemini"
- if feed_title in strings_list(PODCAST.ASR_FORCE_GROQ_TITLES):
- return "groq"
- if feed_title in strings_list(PODCAST.ASR_FORCE_CLOUDFLARE_TITLES):
- return "cloudflare"
- if feed_title in strings_list(PODCAST.ASR_FORCE_WHISPER_TITLES):
- return "whisper"
- if feed_title in strings_list(PODCAST.ASR_FORCE_UNCENSORED_TITLES):
- return "uncensored"
-
- if urlparse(feed_url.strip()).netloc in strings_list(PODCAST.ASR_FORCE_GEMINI_DOMAINS):
- return "gemini"
- if urlparse(feed_url.strip()).netloc in strings_list(PODCAST.ASR_FORCE_GROQ_DOMAINS):
- return "groq"
- if urlparse(feed_url.strip()).netloc in strings_list(PODCAST.ASR_FORCE_CLOUDFLARE_DOMAINS):
- return "cloudflare"
- if urlparse(feed_url.strip()).netloc in strings_list(PODCAST.ASR_FORCE_WHISPER_DOMAINS):
- return "whisper"
- if urlparse(feed_url.strip()).netloc in strings_list(PODCAST.ASR_FORCE_UNCENSORED_DOMAINS):
- return "uncensored"
- return PODCAST.ASR_ENGINE
-
-
-def remove_img(markdown: str):
- """Removes all image tags from a markdown string."""
- image_pattern = r"!\[.*?\]\((.*?)\)" # Matches both with and without alt text
- return re.sub(image_pattern, "", markdown)
-
-
-def clean_pod_url(url: str) -> str:
- if not url:
- return ""
- return url.removesuffix("?utm_source=rss")
-
-
-def load_xml(data: str, template: str = "rss") -> dict:
- with contextlib.suppress(Exception):
- return xmltodict.parse(data)
-
- if template == "rss":
- logger.trace("use default rss template")
- return {
- "rss": {
- "@version": "2.0",
- "@xmlns:itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
- "@xmlns:atom": "http://www.w3.org/2005/Atom",
- "@xmlns:rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
- "@xmlns:podcast": "https://podcastindex.org/namespace/1.0",
- "@xmlns:content": "http://purl.org/rss/1.0/modules/content/",
- "channel": {"item": []},
- }
- }
-
- logger.trace("use default opml template")
- return {"opml": {"@version": "1.0", "head": {"title": "Podcast"}, "body": {"outline": []}}}
-
-
-async def save_xml(feed_xml: dict, feed_url: str):
- if glom(feed_xml, "rss.channel.item", default=[]):
- items = glom(feed_xml, "rss.channel.item", default=[])
- feed_xml["rss"]["channel"]["item"] = items[: PODCAST.KEEP_LATEST_ENTRIES]
- xml_str = xmltodict.unparse(feed_xml, pretty=True, full_document=False)
- if PODCAST.FS_ENGINE == "CF-R2":
- await set_cf_r2(f"Podcast/feeds/{bare_url(feed_url)}", data=xml_str, mime_type="application/xml")
- return
- if PODCAST.FS_ENGINE == "alist":
- save_path = Path(DOWNLOAD_DIR) / quote_plus(bare_url(feed_url))
- save_path = Path(save_path)
- save_path.parent.mkdir(parents=True, exist_ok=True)
-
- async with await anyio.open_file(save_path, "w") as f:
- await f.write(xml_str)
- await upload_alist(save_path)
- save_path.unlink(missing_ok=True)
-
-
-def align_opml_url(url: str) -> str:
- if PODCAST.FS_ENGINE == "CF-R2":
- return DB.CF_R2_PUBLIC_URL.rstrip("/") + f"/Podcast/feeds/{bare_url(url)}"
- if PODCAST.FS_ENGINE == "alist":
- return DB.ALIST_SERVER.removesuffix("/") + "/d/" + DB.ALIST_BASR_PATH.strip("/") + "/" + bare_url(url)
- return url
-
-
-def match_item(feed_xml: dict, entry: dict) -> dict:
- """Match feed item according to entry."""
- for item in glom(feed_xml, "rss.channel.item", default=[]):
- item_link = https_url(clean_pod_url(item.get("link", "")))
- if item_link == entry["link"]:
- return item
- return {}
-
-
-def update_item(feed_xml: dict, item: dict, prefix_desc: str):
- """Update description."""
- hit = False
- description = glom(item, Coalesce("description", "content:encoded"), default="") or ""
- description = convert_html(prefix_desc) + "<p>----------------------------------</p>" + description
- items = glom(feed_xml, "rss.channel.item", default=[])
- if not isinstance(items, list):
- feed_xml["rss"]["channel"]["item"] = [items]
- for x in feed_xml["rss"]["channel"]["item"]:
- x.pop("content:encoded", None)
- if x["link"] == item["link"]:
- x["description"] = description
- hit = True
- if not hit:
- item["description"] = description
- item.pop("content:encoded", None)
- feed_xml["rss"]["channel"]["item"].insert(0, item)
src/podcast/asr.py
@@ -0,0 +1,92 @@
+#!/venv/bin/python
+# -*- coding: utf-8 -*-
+import shutil
+from pathlib import Path
+from urllib.parse import urlparse
+
+from glom import Coalesce, glom
+
+from asr.utils import audio_duration
+from asr.voice_recognition import asr_file
+from config import PODCAST
+from networking import match_social_media_link
+from preview.bilibili import get_bilibili_vinfo
+from preview.youtube import get_youtube_vinfo
+from subtitles.base import fetch_subtitle
+from utils import rand_string, readable_time, strings_list
+
+
+async def get_transcripts(
+ audio_path: str | Path,
+ feed_title: str,
+ feed_url: str,
+ entry: dict,
+) -> str:
+ """Get podcast transcripts.
+
+ If the link of this entry has embedded subtitles (YouTube, Bilibili links), use it directly.
+ Otherwise, generate the transcript via ASR.
+ """
+ if urlparse(entry["link"]).netloc in ["www.youtube.com", "www.bilibili.com"]: # get subtitle from API first
+ res = await fetch_subtitle(entry["link"])
+ if res.get("subtitles"):
+ return res["subtitles"]
+
+    # Generate the transcript via ASR. asr_file deletes the file it is given,
+    # so run ASR on a copy and keep the original for the Telegram upload.
+ duration = await get_duration(audio_path, entry)
+ tmp_path = backup_audio(audio_path)
+ desc = glom(entry, Coalesce("content.0.value", "summary"), default="")
+ prompt = f"请转录播客栏目《{feed_title}》的一期节目的音频。\n该期节目标题: {entry['title']}\n节目时长: {readable_time(duration)}\n节目简介: {desc}"
+ engine = get_asr_engine(feed_title, feed_url)
+ asr_res = await asr_file(tmp_path, prompt=prompt, engine=engine, silent=True)
+ Path(tmp_path).unlink(missing_ok=True)
+ return asr_res.get("texts", "")
+
+
+def get_asr_engine(feed_title: str, feed_url: str) -> str:
+    """Pick the ASR engine: per-title overrides win, then per-domain overrides, then the default."""
+    title_overrides = [
+        ("gemini", PODCAST.ASR_FORCE_GEMINI_TITLES),
+        ("groq", PODCAST.ASR_FORCE_GROQ_TITLES),
+        ("cloudflare", PODCAST.ASR_FORCE_CLOUDFLARE_TITLES),
+        ("whisper", PODCAST.ASR_FORCE_WHISPER_TITLES),
+        ("uncensored", PODCAST.ASR_FORCE_UNCENSORED_TITLES),
+    ]
+    for engine, titles in title_overrides:
+        if feed_title in strings_list(titles):
+            return engine
+    domain_overrides = [
+        ("gemini", PODCAST.ASR_FORCE_GEMINI_DOMAINS),
+        ("groq", PODCAST.ASR_FORCE_GROQ_DOMAINS),
+        ("cloudflare", PODCAST.ASR_FORCE_CLOUDFLARE_DOMAINS),
+        ("whisper", PODCAST.ASR_FORCE_WHISPER_DOMAINS),
+        ("uncensored", PODCAST.ASR_FORCE_UNCENSORED_DOMAINS),
+    ]
+    netloc = urlparse(feed_url.strip()).netloc
+    for engine, domains in domain_overrides:
+        if netloc in strings_list(domains):
+            return engine
+    return PODCAST.ASR_ENGINE
+
+
+async def get_duration(path: str | Path, entry: dict) -> float:
+ """Get duration of audio file."""
+ # get duration from video info
+ vinfo = {}
+ matched = await match_social_media_link(entry["link"])
+ if matched["platform"] == "youtube":
+ vinfo = await get_youtube_vinfo(matched["vid"])
+ elif matched["platform"] == "bilibili":
+ vinfo = await get_bilibili_vinfo(matched["bvid"])
+ if vinfo.get("duration"):
+ return vinfo["duration"]
+
+ # get duration from audio file
+ return audio_duration(path)
+
+
+def backup_audio(path: str | Path) -> str:
+    """Copy the audio to a new path so callers that delete their input leave the original intact."""
+    tmp_path = Path(path).with_stem(rand_string(12))
+ shutil.copy(path, tmp_path)
+ return tmp_path.as_posix()
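
A quick illustration of how the override settings resolve; the show titles, domains, and env values below are hypothetical, not taken from this repo:

    # Suppose the environment contains:
    #   PODCAST_ASR_FORCE_GEMINI_TITLES="Hardcore History"
    #   PODCAST_ASR_FORCE_GROQ_DOMAINS="feeds.simplecast.com"
    from podcast.asr import get_asr_engine

    get_asr_engine("Hardcore History", "https://feeds.simplecast.com/abc")  # "gemini": title overrides win over domains
    get_asr_engine("Some Show", "https://feeds.simplecast.com/abc")         # "groq": domain override
    get_asr_engine("Other Show", "https://example.com/rss")                 # falls back to PODCAST.ASR_ENGINE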
src/podcast/main.py
@@ -0,0 +1,253 @@
+#!/venv/bin/python
+# -*- coding: utf-8 -*-
+"""This module is used to download podcasts and use AI to summarize them.
+
+Supported podcasts input format:
+1. Podcast feed urls (PODCAST_FEED_URLS)
+2. OPML file urls (PODCAST_OPML_URLS)
+3. YouTube channel IDs (PODCAST_YOUTUBE_CHANNEL_IDS)
+
+For each feed, it will try to:
+1. download the enclosure file
+2. use ASR to transcribe it.
+3. use GPT to summarize it.
+4. upload the enclosure file with subtitles & summaries to Telegram
+5. add summaries to the feed items
+6. upload the feed to CF-R2 or Alist (configured by PODCAST_FS_ENGINE)
+
+If the enclosure file is not available (like youtube videos), it will try to use yt-dlp to download it.
+And do the same thing for the downloaded file.
+Besides, it will also upload the enclosure file to GitHub Releases
+"""
+
+import contextlib
+import io
+from pathlib import Path
+from urllib.parse import unquote_plus
+
+import xmltodict
+from glom import Coalesce, glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Chat, Message
+from pyrogram.types.messages_and_media.message import Str
+
+from config import GPT, PODCAST, READING_SPEED, cache
+from database.r2 import get_cf_r2, set_cf_r2
+from llm.gpt import gpt_response
+from llm.utils import convert_html, convert_md, remove_consecutive_newlines
+from messages.sender import send2tg
+from networking import download_file, hx_req
+from podcast.asr import backup_audio, get_duration, get_transcripts
+from podcast.utils import HEADERS, clean_feed_url, feed_saved_target, get_pubdate, remove_img_tag
+from podcast.xml import gen_opml_header, get_feed_title, parse_feed, save_xml, update_xml_desc
+from publish import publish_telegraph
+from utils import bare_url, count_subtitles, https_url, nowdt, rand_number, seconds_to_hms, strings_list
+from ytdlp.download import ytdlp_download
+
+
+async def summary_pods(client: Client):
+ """Summary podcast RSS feeds."""
+ pods = await get_feed_url_with_title()
+ if not pods:
+ return
+ for feed_url, feed_title in pods.items():
+ feed = await parse_feed(feed_url)
+ if not feed:
+ continue
+ processed_xml = await parse_feed(feed_saved_target(feed_url), raw_xml=True)
+ has_update = False
+ homepage = clean_feed_url(glom(feed, "feed.link", default=feed_url))
+ for entry in await get_new_entries(feed_title, feed, processed_xml):
+ logger.info(f"Updating podcast {feed_title}: {entry['title']}")
+ message = Message(id=rand_number(), chat=Chat(id=PODCAST.TID))
+ info = await download_enclosure(entry, cover_url=glom(feed, "feed.image.href", default=""))
+ if not info["path"]:
+ logger.error(f"Failed download podcast {feed_title} -- {entry['title']}")
+ await send2tg(client, message, texts=f"Failed download podcast {feed_title} -- {entry['title']}", reply_msg_id=-1)
+ continue
+ try:
+ transcripts = await get_transcripts(info["asr_path"], feed_title, feed_url, entry)
+ if not transcripts:
+ continue
+ duration = await get_duration(info["asr_path"], entry)
+ dt = get_pubdate(entry)
+ pubdate = f"{dt:%Y-%m-%d %H:%M:%S}"
+ base_caption = f"🎧播客: [{feed_title}]({homepage})\n📝标题: [{entry['title']}]({entry['link']})\n🕒日期: {pubdate}\n⏳时长: {seconds_to_hms(duration)}"
+ desc = convert_md(html=glom(entry, Coalesce("content.0.value", "summary"), default=""))
+ desc = remove_consecutive_newlines(desc, newline_level=2)
+                audio_caption = base_caption + (f"\n📖简介: {desc}" if desc else "")
+ transcript_caption = base_caption + f"\n#️⃣字数: {count_subtitles(transcripts)}\n⏳阅读: {seconds_to_hms(60 * count_subtitles(transcripts) / READING_SPEED)}"
+ if telegraph_url := await publish_telegraph(title=entry["title"], html=convert_html(f"{audio_caption}\n{transcripts}"), author=feed_title, url=entry["link"]):
+ transcript_caption += f"\n⚡️[即时预览]({telegraph_url})"
+                is_audio = Path(info["path"]).suffix in {".aac", ".amr", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".wma"}
+                if is_audio:
+                    media = [{"audio": backup_audio(info["asr_path"]), "title": entry["title"], "performer": feed_title, "thumb": info["thumb"]}]
+                else:
+                    media = [{"video": info["path"], "thumb": info["thumb"]}]
+                await send2tg(client, message, texts=remove_img_tag(audio_caption), media=media, reply_msg_id=-1)  # Telegram does not allow image tags in messages
+ with io.BytesIO(transcripts.encode("utf-8")) as f:
+ txt_msg: Message = await client.send_document(message.chat.id, f, file_name=f"{entry['title']}.txt", caption=transcript_caption) # type: ignore
+
+ prompt = f"这是播客栏目《{feed_title}》的一期节目详情:\n节目标题: {entry['title']}\n节目播出日期: {pubdate}"
+ prompt += f"\n节目时长: {seconds_to_hms(duration)}\n节目简介: {desc}"
+ prompt += "\n请解读该播客内容, 只需关注内容本身, 不用概述播客的基本信息, 例如播客的标题, 日期, 时长等"
+ # Construct a message to call GPT
+ cache.delete(f"parse_msg-{txt_msg.chat.id}-{txt_msg.id}")
+ ai_msg = Message(
+ id=message.id,
+ chat=message.chat,
+ text=Str(f"/ai {prompt}"),
+ reply_to_message=Message(id=rand_number(), chat=message.chat, text=Str(transcripts)),
+ )
+ gpt_res = await gpt_response(
+ client,
+ ai_msg,
+ custom_model_id=GPT.PODCAST_SUMMARY_MODEL_ID,
+ custom_model_name=GPT.PODCAST_SUMMARY_MODEL_NAME,
+ include_thoughts=False,
+ append_grounding=False,
+ silent=True,
+ )
+ if gpt_res.get("texts"):
+ await send2tg(client, txt_msg, texts=gpt_res["prefix"] + gpt_res["texts"])
+ processed_xml = await update_xml_desc(feed_url, processed_xml, entry, summary=gpt_res.get("texts", ""), audio_path=info["asr_path"])
+ await set_cf_r2(entry["db_key"], data={"title": entry["title"], "url": entry["link"]})
+ has_update = True
+ except Exception as e:
+ logger.error(f"Failed podcast {feed_title} -- {entry['title']}: {e}")
+ await send2tg(client, message, texts=f"Failed podcast {feed_title} -- {entry['title']}: {e}", reply_msg_id=-1)
+ continue
+ if has_update:
+ await save_xml(processed_xml, feed_saved_target(feed_url))
+
+ # save opml
+ opml = {"opml": {"@version": "1.0", "head": {"title": "Podcast"}, "body": {"outline": []}}}
+ opml["opml"]["body"]["outline"] = [
+ {
+ "@text": feed_title,
+ "@type": "rss",
+ "@xmlUrl": feed_saved_target(feed_url),
+ "@title": feed_title,
+ }
+ for feed_url, feed_title in pods.items()
+ ]
+ await save_xml(opml, feed_saved_target("opml.xml"))
+ logger.success("Podcast has been updated.")
+
+
+async def get_feed_url_with_title() -> dict[str, str]:
+ """Get all podcast feed urls and titles.
+
+ Returns:
+ dict: {feed_url: title}
+ """
+ # get from Feed Urls
+ pods = {feed_url: await get_feed_title(feed_url) for feed_url in strings_list(PODCAST.FEED_URLS)}
+ # get from OPML
+ for opml in strings_list(PODCAST.OPML_URLS):
+ opml_data = await hx_req(opml, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PODCAST.PROXY)
+ data = {}
+ with contextlib.suppress(Exception):
+ data = xmltodict.parse(opml_data["text"])
+        outlines = glom(data, "opml.body.outline", default=[])
+        for feed in (outlines if isinstance(outlines, list) else [outlines]):  # xmltodict yields a dict for a single outline
+            if feed_url := feed.get("@xmlUrl"):
+                pods[feed_url] = feed.get("@title") or await get_feed_title(feed_url)  # `or` skips the eager default await
+ # get from YouTube Channel
+ for yt_cid in strings_list(PODCAST.YOUTUBE_CHANNEL_IDS):
+ feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={yt_cid}"
+ pods[feed_url] = await get_feed_title(feed_url)
+ return pods
+
+
+async def get_new_entries(feed_title: str, feed: dict, processed: dict) -> list[dict]:
+ """Get new entries from feed.
+
+    Will skip an entry when any of the following conditions is met:
+ 1. the entry link is already processed
+ 2. the entry is older than PODCAST.IGNORE_OLD_THAN_SECONDS
+ 3. the guid is found on CF-R2
+
+ Args:
+ feed_title (str): feed title
+ feed (dict): feed parsed by feedparser
+ processed (dict): processed feed in raw xml format
+ """
+ try:
+ now = nowdt()
+ new_entries = []
+ processed_links = glom(processed, "rss.channel.item.*.link", default=[])
+ sorted_entries = sorted(feed["entries"], key=lambda x: glom(x, Coalesce("published_parsed", "updated"), default=now), reverse=True) # new to old
+ for entry in sorted_entries:
+ link = https_url(clean_feed_url(entry.get("link", "")))
+ if link in processed_links:
+ continue
+ entry["link"] = link
+ guid = bare_url(unquote_plus(link))
+ entry["db_key"] = f"Podcast/{feed_title}/{guid}"
+ entry["title"] = entry.get("title", "")
+ dt = get_pubdate(entry)
+ delta = now - dt
+ if delta.total_seconds() > PODCAST.IGNORE_OLD_THAN_SECONDS:
+ continue
+ if not await get_cf_r2(entry["db_key"]):
+ new_entries.append(entry)
+ if new_entries:
+ logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
+ except Exception as e:
+ logger.error(f"Failed to get new entries: {e}")
+ new_entries = []
+ return new_entries[::-1] # old to new
+
+
+async def download_enclosure(entry: dict, cover_url: str = "") -> dict:
+    """Download the enclosure of a single podcast entry.
+
+    If the entry has no enclosure link, fall back to downloading the entry link via yt-dlp.
+
+    Returns:
+        dict: {
+            "path": Path, (this path will be sent to Telegram)
+            "asr_path": Path, (audio path preferred for ASR)
+            "thumb": str,
+            "enclosure": str
+        }
+    """
+ enclosure_url = next((x["href"] for x in entry["links"] if x.get("rel", "") == "enclosure"), "")
+ placeholder = {"path": "", "asr_path": "", "thumb": None, "enclosure": enclosure_url}
+ if enclosure_url:
+        try:
+            path = ""
+            for _ in range(4):  # up to 4 download attempts
+                path = await download_file(enclosure_url, stream=True)
+                if path and Path(path).is_file():
+                    break
+            else:
+                return placeholder  # every attempt failed
+            thumb_url = glom(entry, "image.href", default="") or cover_url
+            thumb = await download_file(thumb_url) if thumb_url else None
+            thumb = thumb if thumb and Path(thumb).is_file() else None
+ except Exception as e:
+ logger.error(f"Failed to download podcast enclosure: {enclosure_url}\n{e}")
+ return placeholder
+ return {"path": path, "asr_path": path, "thumb": thumb, "enclosure": enclosure_url}
+
+ # download via yt-dlp
+ info = await ytdlp_download(entry["link"], ytdlp_download_video=True)
+ if info["video_path"].is_file() and info["audio_path"].is_file():
+ return {"path": info["video_path"], "asr_path": info["audio_path"], "thumb": info["thumb"], "enclosure": ""}
+
+ if info["video_path"].is_file():
+ return {"path": info["video_path"], "asr_path": info["video_path"], "thumb": info["thumb"], "enclosure": ""}
+
+ if info["audio_path"].is_file():
+ return {"path": info["audio_path"], "asr_path": info["audio_path"], "thumb": info["thumb"], "enclosure": ""}
+ logger.error(f"Failed download podcast via ytdlp: {entry['link']}")
+ return placeholder
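
A minimal sketch of how summary_pods might be driven; the Pyrogram client wiring and the hourly interval are assumptions for illustration, not part of this commit:

    import asyncio

    from pyrogram.client import Client

    from podcast.main import summary_pods


    async def podcast_loop(client: Client):
        """Hypothetical driver: poll all configured feeds once per hour."""
        while True:
            await summary_pods(client)
            await asyncio.sleep(3600)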
src/podcast/utils.py
@@ -0,0 +1,48 @@
+#!/venv/bin/python
+# -*- coding: utf-8 -*-
+import base64
+import re
+import string
+from datetime import UTC, datetime
+from zoneinfo import ZoneInfo
+
+from glom import Coalesce, glom
+from loguru import logger
+
+from config import DB, PODCAST, TZ
+from utils import bare_url, nowdt
+
+HEADERS = {
+ "User-Agent": "feedparser/6.0.11 +https://github.com/kurtmckee/feedparser/",
+ "Accept": "application/atom+xml,application/rdf+xml,application/rss+xml,application/x-netcdf,application/xml;q=0.9,text/xml;q=0.2,*/*;q=0.1",
+}
+
+
+def remove_img_tag(markdown: str):
+ """Removes all image tags from a markdown string."""
+ image_pattern = r"!\[.*?\]\((.*?)\)" # Matches both with and without alt text
+ return re.sub(image_pattern, "", markdown)
+
+
+def clean_feed_url(url: str) -> str:
+ if not url:
+ return ""
+ return url.removesuffix("?utm_source=rss")
+
+
+def feed_saved_target(feed_url: str) -> str:
+    """Return the public URL where the rewritten feed for `feed_url` is stored."""
+    key = bare_url(feed_url)
+    # base64url-encode the key when it contains characters unsafe for a storage key
+    if any(c not in string.ascii_letters + string.digits + "-._=" for c in key):
+ key = base64.urlsafe_b64encode(feed_url.encode()).decode().rstrip("=")[-60:] + ".xml"
+ if PODCAST.FS_ENGINE == "CF-R2":
+ return DB.CF_R2_PUBLIC_URL.rstrip("/") + f"/Podcast/feeds/{key}"
+ if PODCAST.FS_ENGINE == "alist":
+ return DB.ALIST_SERVER.rstrip("/") + "/d/" + DB.ALIST_BASR_PATH.strip("/") + "/" + key
+ logger.error("Unknown PODCAST_FS_ENGINE: " + PODCAST.FS_ENGINE)
+ return feed_url
+
+
+def get_pubdate(entry: dict) -> datetime:
+ struct_time = glom(entry, Coalesce("published_parsed", "updated_parsed", "feed.published_parsed", "feed.updated_parsed"), default=nowdt().timetuple())
+ return datetime(*struct_time[:6], tzinfo=UTC).astimezone(ZoneInfo(TZ))
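
To make the storage-key scheme concrete, the encoding step of feed_saved_target can be reproduced with the stdlib alone; the URL and config values below are placeholders:

    import base64

    feed_url = "https://feeds.example.com/show/rss"  # hypothetical feed
    # Mirrors feed_saved_target: base64url-encode, drop padding, keep the last 60 chars
    key = base64.urlsafe_b64encode(feed_url.encode()).decode().rstrip("=")[-60:] + ".xml"
    # With PODCAST_FS_ENGINE="CF-R2" and DB.CF_R2_PUBLIC_URL="https://r2.example.com",
    # feed_saved_target(feed_url) would return:
    #   f"https://r2.example.com/Podcast/feeds/{key}"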
src/podcast/xml.py
@@ -0,0 +1,198 @@
+#!/venv/bin/python
+# -*- coding: utf-8 -*-
+import base64
+import contextlib
+import uuid
+from pathlib import Path
+from urllib.parse import urlparse
+
+import anyio
+import feedparser
+import xmltodict
+from glom import Coalesce, glom
+
+from asr.utils import audio_duration
+from config import DB, DOWNLOAD_DIR, PODCAST, cache
+from database.alist import upload_alist
+from database.github import gh_upload_asset
+from database.r2 import set_cf_r2
+from llm.utils import convert_html
+from networking import hx_req
+from podcast.utils import HEADERS, clean_feed_url, get_pubdate
+from preview.youtube import get_youtube_channel_thumb
+from utils import bare_url, https_url, nowdt
+
+
+@cache.memoize(ttl=600)
+async def parse_feed(feed_url: str, *, raw_xml: bool = False) -> dict:
+ """Get feed content by url.
+
+ DO NOT use feedparser.parse(feed_url) because it doesn't support timeout.
+ """
+ data = await hx_req(feed_url, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PODCAST.PROXY)
+ with contextlib.suppress(Exception):
+ if raw_xml:
+ return xmltodict.parse(data["text"])
+ feed = feedparser.parse(data["text"])
+ return feed if isinstance(feed, dict) else {}
+ return {}
+
+
+async def get_feed_title(feed_url: str) -> str:
+ """Get feed title by url."""
+ feed = await parse_feed(feed_url)
+ if title := glom(feed, Coalesce("feed.title", "feed.title_detail.value", "feed.itunes_title"), default=""):
+ return title
+ return urlparse(feed_url).netloc
+
+
+async def gen_pod_header(feed_url: str) -> dict:
+ """Generate podcast header for RSS feed."""
+ now = nowdt()
+    feed = await parse_feed(feed_url)
+    cover = glom(feed, "feed.image.href", default="") or await get_cover(feed_url)  # avoid eagerly awaiting the glom default
+    pub_date = get_pubdate(feed)
+ return {
+ "rss": {
+ "@version": "2.0",
+ "@xmlns:itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
+ "@xmlns:atom": "http://www.w3.org/2005/Atom",
+ "@xmlns:rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
+ "@xmlns:podcast": "https://podcastindex.org/namespace/1.0",
+ "@xmlns:content": "http://purl.org/rss/1.0/modules/content/",
+ "channel": {
+ # Required tags
+ "atom:link": {
+ "@href": feed_url,
+ "@rel": "self",
+ "@type": "application/rss+xml",
+ },
+ "title": glom(feed, "feed.title", default=""),
+ "description": glom(feed, Coalesce("feed.summary", "feed.subtitle"), default=""),
+ "itunes:image": {"@href": glom(feed, "feed.image.href", default=await get_cover(feed_url))},
+ "language": "en-us",
+ "itunes:category": {"@text": "TV & Film"},
+ "itunes:explicit": "no",
+ # Recommended tags
+ "podcast:locked": "yes",
+ "podcast:guid": gen_uuid(feed_url),
+ "itunes:author": glom(feed, Coalesce("feed.author", "feed.title"), default=""),
+ "link": feed_url,
+ # Situational tags
+ "itunes:title": glom(feed, "feed.title", default=""),
+ "itunes:type": "Episodic",
+ "itunes:block": "yes",
+ # Common tags for rss
+ "category": "TV & Film",
+ "generator": "BennyBot",
+ "lastBuildDate": f"{now:%a, %d %b %Y %H:%M:%S %z}",
+ "pubDate": f"{pub_date:%a, %d %b %Y %H:%M:%S %z}",
+ "image": {
+ "url": glom(feed, "feed.image.href", default=await get_cover(feed_url)),
+ "title": glom(feed, "feed.title", default=""),
+ "link": feed_url,
+ },
+ "item": [],
+ },
+ }
+ }
+
+
+def gen_opml_header():
+ """Generate opml header for OPML feed."""
+ return {"opml": {"@version": "1.0", "head": {"title": "Podcast"}, "body": {"outline": []}}}
+
+
+async def update_xml_desc(feed_url: str, processed_xml: dict, entry: dict, summary: str, audio_path: str | Path) -> dict:
+ """Add AI summary to item description.
+
+ Args:
+ feed_url (str): original feed url
+ processed_xml (dict): processed feed xml
+ entry (dict): feed entry parsed by feedparser
+ summary (str): AI summary
+ """
+ original_desc = glom(entry, Coalesce("content.0.value", "summary"), default="")
+ description = convert_html(summary) + "<p>----------------------------------</p>" + original_desc
+ # try to find the item in feed_xml
+ feed_xml = await parse_feed(feed_url, raw_xml=True)
+ new_item = entry
+ for item in glom(feed_xml, "rss.channel.item", default=[]):
+ item_link = https_url(clean_feed_url(item.get("link", "")))
+ if item_link == entry["link"]:
+ new_item = item # Found!
+ break
+ new_item.pop("content:encoded", None) # redundant
+ new_item["description"] = description
+ if not glom(new_item, "enclosure.@url", default=""): # This is a video rss feed. upload the audio to github
+ tag_name = base64.urlsafe_b64encode(feed_url.encode()).decode().rstrip("=")[-64:]
+ feed_title = await get_feed_title(feed_url)
+ enclosure_url = await gh_upload_asset(audio_path, tag_name=tag_name, release_name=feed_title, gh_repo=PODCAST.GH_REPO, gh_token=PODCAST.GH_TOKEN)
+ mime_type = {".mp3": "audio/mpeg", ".m4a": "audio/x-m4a", ".flac": "audio/flac"}.get(Path(audio_path).suffix, "audio/mpeg")
+ new_item = {
+ # Required tags
+ "title": entry["title"],
+ "enclosure": {
+ "@url": enclosure_url,
+ "@length": Path(audio_path).stat().st_size,
+ "@type": mime_type,
+ },
+ "guid": bare_url(entry["link"]),
+ # Recommended tags
+ "pubDate": get_pubdate(entry).strftime("%a, %d %b %Y %H:%M:%S %z"),
+ "description": description,
+ "itunes:duration": int(audio_duration(audio_path)),
+ "link": entry["link"],
+ "itunes:explicit": "false",
+ }
+ if not processed_xml:
+ processed_xml = await gen_pod_header(feed_url)
+ items = processed_xml["rss"]["channel"]["item"]
+ if not isinstance(items, list): # only one item, will be converted to list
+ items = [items]
+ items.insert(0, new_item)
+ processed_xml["rss"]["channel"]["item"] = items
+ return processed_xml
+
+
+def gen_uuid(url: str):
+ """Generate podcast UUID from URL.
+
+ Docs: https://github.com/Podcastindex-org/podcast-namespace/blob/main/docs/1.0.md#guid
+ The value is a UUIDv5, and is generated from the RSS feed url,
+ with the protocol scheme and trailing slashes stripped off,
+ combined with a unique "podcast" namespace which has a UUID of ead4c236-bf58-58c6-a2c6-a6b28d128cb6
+
+ Args:
+ url (str): feed url
+ """
+ url = url.strip().strip("/").removeprefix("http://").removeprefix("https://")
+ pod_uuid = uuid.uuid5(uuid.UUID("ead4c236-bf58-58c6-a2c6-a6b28d128cb6"), url)
+ return str(pod_uuid)
+
+
+async def get_cover(feed_url: str) -> str:
+ """Get podcast cover from feed url."""
+ if feed_url.startswith("https://www.youtube.com/feeds/videos.xml?channel_id="):
+ channel_id = feed_url.removeprefix("https://www.youtube.com/feeds/videos.xml?channel_id=")
+ return await get_youtube_channel_thumb(channel_id)
+ feed = await parse_feed(feed_url)
+ return glom(feed, "feed.image.href", default="https://upload.wikimedia.org/wikipedia/commons/c/c8/Podcast_iOS.png")
+
+
+async def save_xml(feed_xml: dict, save_url: str):
+ if glom(feed_xml, "rss.channel.item", default=[]):
+ items = glom(feed_xml, "rss.channel.item", default=[])
+ feed_xml["rss"]["channel"]["item"] = items[: PODCAST.KEEP_LATEST_ENTRIES]
+ xml_str = xmltodict.unparse(feed_xml, pretty=True, full_document=False)
+ if PODCAST.FS_ENGINE == "CF-R2":
+ r2_key = save_url.removeprefix(DB.CF_R2_PUBLIC_URL).lstrip("/")
+ await set_cf_r2(r2_key, data=xml_str, mime_type="application/xml")
+ return
+ if PODCAST.FS_ENGINE == "alist":
+        save_path = Path(DOWNLOAD_DIR) / Path(save_url).name
+ save_path.parent.mkdir(parents=True, exist_ok=True)
+ async with await anyio.open_file(save_path, "w") as f:
+ await f.write(xml_str)
+ await upload_alist(save_path)
+ save_path.unlink(missing_ok=True)
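
The podcast:guid derivation in gen_uuid can be sanity-checked against the worked example in the podcastindex namespace docs; the expected value below is quoted from that spec, so verify it there before relying on it:

    import uuid

    PODCAST_NS = uuid.UUID("ead4c236-bf58-58c6-a2c6-a6b28d128cb6")

    # Feed url normalized as gen_uuid does: scheme and trailing slashes stripped
    guid = uuid.uuid5(PODCAST_NS, "mp3s.nashownotes.com/pc20rss.xml")
    print(guid)  # the spec lists 917393e3-1b1e-5cef-ace4-edaa54e1f810 for this feed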
src/config.py
@@ -235,7 +235,7 @@ class DB:
TURSO_API_TOKEN = os.getenv("TURSO_API_TOKEN", "")
TURSO_GROUP_TOKEN = os.getenv("TURSO_GROUP_TOKEN", "")
GH_USER = os.getenv("DB_GH_USER", "")
- GH_REPO = os.getenv("DB_GH_REPO", "bennybot")
+ GH_REPO = os.getenv("DB_GH_REPO", "bennybot") # just repo name, not `owner/repo`
GH_TOKEN = os.getenv("DB_GH_TOKEN", "")
@@ -303,6 +303,7 @@ class PODCAST:
PROXY = os.getenv("PODCAST_PROXY", None)
FEED_URLS = os.getenv("PODCAST_FEED_URLS", "") # comma separated feed urls
OPML_URLS = os.getenv("PODCAST_OPML_URLS", "") # comma separated opml urls
+ YOUTUBE_CHANNEL_IDS = os.getenv("PODCAST_YOUTUBE_CHANNEL_IDS", "") # comma separated youtube channel ids
TID = int(os.getenv("PODCAST_TID", "0")) # send to this chat id
FS_ENGINE = os.getenv("PODCAST_FS_ENGINE", "CF-R2") # file storage engine for hosting podcast feeds
ASR_ENGINE = os.getenv("PODCAST_ASR_ENGINE", "auto") # default ASR engine
@@ -319,6 +320,8 @@ class PODCAST:
ASR_FORCE_WHISPER_DOMAINS = os.getenv("PODCAST_ASR_FORCE_WHISPER_DOMAINS", "")
ASR_FORCE_UNCENSORED_TITLES = os.getenv("PODCAST_ASR_FORCE_UNCENSORED_TITLES", "")
ASR_FORCE_UNCENSORED_DOMAINS = os.getenv("PODCAST_ASR_FORCE_UNCENSORED_DOMAINS", "anchor.fm,feeds.acast.com")
+ GH_REPO = os.getenv("PODCAST_GH_REPO", "podcast")
+ GH_TOKEN = os.getenv("PODCAST_GH_TOKEN", "")
class FAVORITE:
@@ -417,6 +420,7 @@ class GPT:
SUMMARY_WHITELIST_CUSTOM_CHATS = os.getenv("GPT_SUMMARY_WHITELIST_CUSTOM_CHATS", "")
CHAT_SUMMARY_MODEL_ID = os.getenv("CHAT_SUMMARY_MODEL_ID", "") # Specify the model id for `/summary` command (If not set, use the default model)
PODCAST_SUMMARY_MODEL_ID = os.getenv("PODCAST_SUMMARY_MODEL_ID", "") # for generating podcast summary (If not set, use the default AI model)
+    PODCAST_SUMMARY_MODEL_NAME = os.getenv("PODCAST_SUMMARY_MODEL_NAME", "")  # passed to gpt_response as custom_model_name for podcast summaries
SUBTITLE_SUMMARY_MODEL_ID = os.getenv("SUBTITLE_SUMMARY_MODEL_ID", "") # for generating podcast summary (If not set, use the default AI model)
# For tool_call. Some models doesn't support tool call, so we use this model to do the tool_call first.
# Then construct the new questions for the original model.
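
For completeness, a sketch of the environment variables this commit adds; every value below is a placeholder:

    # .env (hypothetical values)
    PODCAST_YOUTUBE_CHANNEL_IDS=UCxxxxxxxxxxxxxxxxxxxxxx   # comma separated channel ids
    PODCAST_GH_REPO=podcast                                # repo hosting extracted audio as release assets
    PODCAST_GH_TOKEN=ghp_xxxxxxxx                          # token allowed to create releases
    PODCAST_SUMMARY_MODEL_NAME=gpt-4o                      # display name passed as custom_model_name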
src/main.py
@@ -27,8 +27,8 @@ from history.sync import backup_chat_history, sync_chat_history
from llm.summary import daily_summary
from llm.utils import clean_gemini_files
from messages.parser import parse_msg
-from others.podcast import summary_pods
from permission import check_permission
+from podcast.main import summary_pods
from price.entrypoint import match_symbol_category
from utils import cleanup_old_files, to_int