Commit fbb1e2d

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-05-24 10:01:02
feat(podcast): support summary podcast feeds
1 parent 5c00853
src/llm/utils.py
@@ -5,6 +5,7 @@ import re
 import tempfile
 from pathlib import Path
 
+import markdown
 import tiktoken
 from loguru import logger
 from markitdown import MarkItDown
@@ -228,6 +229,17 @@ def convert_md(path: str | Path | None = None, html: str | None = None) -> str:
     return ""
 
 
+def convert_html(texts: str = "", path: str | Path | None = None) -> str:
+    """Render markdown text (or a markdown file) into an HTML string.
+
+    Despite the name suggesting "convert *to* html", the input is markdown
+    and the output is HTML suitable for Telegraph/RSS descriptions.
+
+    Args:
+        texts: Markdown source; ignored when ``path`` points to a real file.
+        path: Optional markdown file; expanded/resolved before reading.
+
+    Returns:
+        The rendered HTML with every newline replaced by ``<br>``; an empty
+        string when ``path`` is given but is not a regular file.
+    """
+    if path is not None:
+        path = Path(path).expanduser().resolve()
+        if not path.is_file():
+            return ""
+        texts = path.read_text()
+    texts = markdown.markdown(texts)
+    # Downstream HTML consumers need explicit <br> breaks, not raw newlines.
+    return texts.replace("\n", "<br>")
+
+
 def split_reasoning(text: str) -> tuple[str, str]:
     """Split reasoning from text.
 
src/others/podcast.py
@@ -0,0 +1,268 @@
+#!/venv/bin/python
+# -*- coding: utf-8 -*-
+import contextlib
+import io
+import re
+import shutil
+from datetime import UTC, datetime
+from pathlib import Path
+from urllib.parse import quote_plus, unquote_plus, urlparse
+from zoneinfo import ZoneInfo
+
+import anyio
+import feedparser
+import xmltodict
+from glom import Coalesce, glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Chat, Message
+from pyrogram.types.messages_and_media.message import Str
+
+from asr.voice_recognition import asr_file
+from config import DB, DOWNLOAD_DIR, PODCAST, PREFIX, READING_SPEED, TZ, cache
+from database import get_cf_r2, set_cf_r2, upload_alist
+from llm.gpt import gpt_response
+from llm.utils import convert_html, convert_md, remove_consecutive_newlines
+from messages.sender import send2tg
+from networking import download_file, hx_req
+from publish import publish_telegraph
+from utils import bare_url, count_subtitles, https_url, nowdt, rand_number, rand_string
+
+HEADERS = {
+    "User-Agent": "feedparser/6.0.11 +https://github.com/kurtmckee/feedparser/",
+    "Accept": "application/atom+xml,application/rdf+xml,application/rss+xml,application/x-netcdf,application/xml;q=0.9,text/xml;q=0.2,*/*;q=0.1",
+}
+AUDIO_EXT = [".3gp", ".aac", ".amr", ".avi", ".flac", ".flv", ".m4a", ".mkv", ".mov", ".mp3", ".mp4", ".mpeg", ".oga", ".ogg", ".opus", ".wav", ".wma", ".wmv"]
+
+
+@cache.memoize(ttl=PODCAST.UPDATE_INTERVAL)
+async def summary_pods(client: Client):
+    """Sweep all subscribed podcast feeds; transcribe and summarize new episodes.
+
+    For each new entry: download the audio, run ASR, post the audio plus the
+    transcript to the configured Telegram chat, ask GPT for a content summary,
+    then prepend that summary to the entry description in a re-hosted copy of
+    the feed. Finally an OPML index of all re-hosted feeds is saved.
+    Memoized so the whole sweep runs at most once per PODCAST.UPDATE_INTERVAL.
+
+    Args:
+        client: Connected pyrogram client used for all Telegram sends.
+    """
+    pods = await get_all_pods()
+    if not pods:
+        return
+    for feed_url, feed_title in pods.items():
+        data = await hx_req(feed_url, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PODCAST.PROXY)
+        if not data.get("text"):
+            continue
+        feed = feedparser.parse(data["text"])  # do not parse feed url, because it doesn't support timeout.
+        feed_xml = load_xml(data["text"])
+        # Prefer the re-hosted copy of the feed as the base for edits so
+        # summaries written on previous sweeps are not lost.
+        save_feed_url = align_opml_url(feed_url)
+        data = await hx_req(save_feed_url, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PODCAST.PROXY)
+        save_feed_xml = load_xml(data["text"]) if data.get("text") else feed_xml
+        has_update = False
+        pod_url = clean_pod_url(feed.feed.link)  # type: ignore
+        for entry in await get_new_entries(feed_title, feed):
+            # Synthetic message addressed to the podcast chat; send2tg needs one.
+            message = Message(id=rand_number(), chat=Chat(id=PODCAST.TID))
+            # The enclosure <link rel="enclosure"> carries the episode media URL.
+            enclosure = next((x["href"] for x in entry["links"] if x.get("rel", "") == "enclosure"), "")
+            if not enclosure:
+                continue
+            logger.info(f"Updating podcast {feed_title}: {entry['title']}")
+            try:
+                retry = 0
+                path = await download_file(enclosure)
+                # Retry the download up to 4 extra times before giving up.
+                # NOTE(review): the `return` below aborts the ENTIRE sweep
+                # (all remaining entries and feeds), not just this entry —
+                # confirm that is intended rather than `continue`/`break`.
+                while not path:
+                    retry += 1
+                    path = await download_file(enclosure)
+                    if retry > 3:
+                        logger.error(f"Failed download podcast {feed_title} -- {entry['title']}")
+                        await send2tg(client, message, texts=f"Failed download podcast {feed_title} -- {entry['title']}", reply_msg_id=-1)
+                        return
+                # Episode artwork falls back to the feed-level image.
+                thumb_url = glom(entry, "image.href", default="") or glom(feed, "feed.image.href", default="")
+                thumb = await download_file(thumb_url)
+                thumb = thumb if Path(thumb).is_file() else None
+                asr_path = Path(path).with_stem(rand_string())
+                shutil.copy(path, asr_path)  # make a backup for sending audio to TG
+                raw_desc = glom(entry, Coalesce("content.0.value", "summary"), default="")
+                desc = convert_md(html=raw_desc)
+                desc = remove_consecutive_newlines(desc, newline_level=2)
+                # published_parsed is a UTC struct_time; render in the local TZ.
+                struct_time = entry["published_parsed"]
+                dt = datetime(*struct_time[:6], tzinfo=UTC).astimezone(ZoneInfo(TZ))
+                pubdate = f"{dt:%Y-%m-%d %H:%M:%S}"
+                audio_caption = f"🎧播客: [{feed_title}]({pod_url})\n📝标题: [{entry['title']}]({entry['link']})\n🕒日期: {pubdate}\n⏳时长: {entry['itunes_duration']}\n📖简介: {desc}"
+                # NOTE(review): suffix IN AUDIO_EXT routes to "video" and
+                # everything else to "audio", yet AUDIO_EXT mixes audio and
+                # video extensions (.mp3 alongside .mkv) — confirm the
+                # condition/list pairing is not inverted.
+                media = [{"video": path, "thumb": thumb}] if Path(path).suffix in AUDIO_EXT else [{"audio": path, "title": entry["title"], "performer": feed_title, "thumb": thumb}]
+                prompt = f"请转录播客栏目《{feed_title}》的一期节目的音频。\n该期节目标题: {entry['title']}\n节目时长: {entry['itunes_duration']}\n节目简介: {desc}"
+                engine = get_pod_asr_engine(feed_title, feed_url)
+                asr_res = await asr_file(asr_path, prompt=prompt, engine=engine, client=client, message=message, silent=True)
+                # NOTE(review): an ASR failure also aborts the whole sweep.
+                if asr_res.get("error") or len(asr_res.get("texts", "")) == 0:
+                    return
+                subtitles = asr_res.get("texts", "")
+                subtitle_caption = f"🎧播客名称: [{feed_title}]({pod_url})\n📝节目标题: [{entry['title']}]({entry['link']})\n🕒发布日期: {pubdate}\n"
+                subtitle_caption += f"⏳节目时长: {entry['itunes_duration']}\n#️⃣文本字数: {count_subtitles(subtitles)}\n⏳阅读时长: {count_subtitles(subtitles) / READING_SPEED:.1f}分钟"
+                # Publish the full transcript to Telegraph for instant preview.
+                if telegraph_url := await publish_telegraph(title=entry["title"], html=convert_html(audio_caption + subtitles), author=feed_title, url=entry["link"]):
+                    subtitle_caption += f"\n⚡️[即时预览]({telegraph_url})"
+                await send2tg(client, message, texts=remove_img(audio_caption), media=media, reply_msg_id=-1)  # Telegram DO NOT allow img tag in messages
+                # Ship the raw transcript as a .txt document attachment.
+                with io.BytesIO(subtitles.encode("utf-8")) as f:
+                    await client.send_document(message.chat.id, f, file_name=f"{entry['title']}.txt", caption=subtitle_caption)
+
+                prompt = f"这是播客栏目《{feed_title}》的一期节目详情:\n节目标题: {entry['title']}\n节目播出日期: {pubdate}"
+                prompt += f"\n节目时长: {entry['itunes_duration']}\n节目简介: {desc}"
+                prompt += "\n请解读该播客内容, 只需关注内容本身, 不用概述播客的基本信息, 例如播客的标题, 日期, 时长等"
+                # Construct a message to call GPT; the transcript rides along
+                # as the reply-to message so GPT has the full episode text.
+                ai_msg = Message(
+                    id=rand_number(),
+                    chat=message.chat,
+                    text=Str(f"{PREFIX.GPT} {remove_img(prompt)}"),
+                    reply_to_message=Message(id=rand_number(), chat=message.chat, text=Str(subtitles)),
+                )
+                gpt_res = await gpt_response(client, ai_msg, include_thoughts=False, append_grounding=False)
+                # Splice the GPT summary into the re-hosted feed's matching item.
+                feed_item = match_item(feed_xml, entry)
+                update_item(save_feed_xml, feed_item, prefix_desc=gpt_res["texts"])
+                # Mark the entry processed so future sweeps skip it.
+                await set_cf_r2(entry["db_key"], data={"title": entry["title"], "url": entry["link"], "file": enclosure})
+                has_update = True
+            except Exception as e:
+                # NOTE(review): broad catch + `return` stops all further feeds
+                # on the first failed entry — confirm this is intentional.
+                logger.error(f"Failed podcast {feed_title} -- {entry['title']}: {e}")
+                await send2tg(client, message, texts=f"Failed podcast {feed_title} -- {entry['title']}: {e}", reply_msg_id=-1)
+                return
+        if has_update:
+            await save_xml(save_feed_xml, feed_url)
+
+    # save opml
+    opml = load_xml("", template="opml")
+    opml["opml"]["body"]["outline"] = [
+        {
+            "@text": feed_title,
+            "@type": "rss",
+            "@xmlUrl": align_opml_url(feed_url),
+            "@title": feed_title,
+        }
+        for feed_url, feed_title in pods.items()
+    ]
+    await save_xml(opml, "opml.xml")
+    logger.success("Podcast has been updated.")
+
+
+async def get_new_entries(feed_title: str, remote: dict) -> list[dict]:
+    """Get new entries from feed.
+
+    An entry is "new" when it is younger than PODCAST.IGNORE_OLD_THAN_SECONDS
+    and has no record in CF-R2 under "Podcast/<feed_title>/<bare link>".
+    Each returned entry is mutated in place to carry a normalized "link",
+    a "db_key", and a guaranteed "title".
+
+    Args:
+        feed_title: Human-readable feed title, used in the DB key and logs.
+        remote: Parsed feedparser result (dict-like with an "entries" list).
+
+    Returns:
+        New entries ordered oldest-first; [] on any error during processing.
+    """
+    try:
+        now = nowdt()
+        new_entries = []
+        # NOTE(review): the sort key mixes struct_time tuples with the raw
+        # "updated" string / `now` datetime fallbacks; entries missing
+        # published_parsed would make the comparison raise, landing in the
+        # except branch and returning []. Confirm all subscribed feeds
+        # always provide published_parsed.
+        sorted_entries = sorted(remote["entries"], key=lambda x: x.get("published_parsed", x.get("updated", now)), reverse=True)  # new to old
+        for entry in sorted_entries:
+            entry["link"] = https_url(clean_pod_url(entry.get("link", "")))
+            guid = bare_url(unquote_plus(entry["link"]))
+            entry["db_key"] = f"Podcast/{feed_title}/{guid}"
+            entry["title"] = entry.get("title", "")
+            # Feed timestamps are UTC struct_times; compare in the local zone.
+            struct_time = entry["published_parsed"]
+            dt = datetime(*struct_time[:6], tzinfo=UTC).astimezone(ZoneInfo(TZ))
+            delta = now - dt
+            if delta.total_seconds() > PODCAST.IGNORE_OLD_THAN_SECONDS:
+                continue
+            # No CF-R2 record means the entry has not been processed yet.
+            if not await get_cf_r2(entry["db_key"]):
+                new_entries.append(entry)
+        if new_entries:
+            logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
+    except Exception as e:
+        logger.error(f"Failed to get new entries: {e}")
+        new_entries = []
+    return new_entries[::-1]  # old to new
+
+
+async def get_all_pods() -> dict[str, str]:
+    """Get all podcast feed urls and titles.
+
+    Merges the plain feed URLs from PODCAST.FEED_URLS (title defaults to the
+    URL's hostname) with feeds discovered in each OPML file listed in
+    PODCAST.OPML_URLS; OPML entries overwrite duplicates and use @title.
+
+    Returns:
+        dict: {feed_url: title}
+    """
+    pods = {x.strip(): urlparse(x.strip()).netloc for x in PODCAST.FEED_URLS.split(",") if x.strip()}
+    for opml in [x.strip() for x in PODCAST.OPML_URLS.split(",") if x.strip()]:
+        opml_data = await hx_req(opml, rformat="text", headers=HEADERS, timeout=10, silent=True, proxy=PODCAST.PROXY)
+        data = xmltodict.parse(opml_data["text"])
+        # NOTE(review): xmltodict yields a single dict (not a list) when the
+        # OPML body has exactly one <outline>; iterating that dict would walk
+        # its string keys and crash on .get — confirm OPML files always
+        # contain multiple outlines, or guard with glom's list coercion.
+        for feed in glom(data, "opml.body.outline", default=[]):
+            if feed.get("@xmlUrl"):
+                pods[feed["@xmlUrl"]] = feed.get("@title", "")
+    return pods
+
+
+def get_pod_asr_engine(feed_title: str, feed_url: str) -> str:
+    """Pick the ASR engine for a feed.
+
+    Gemini is forced when the feed title or the feed URL's hostname appears
+    in the PODCAST force-lists (used to bypass censorship per the config
+    comments); otherwise the configured default PODCAST.ASR_ENGINE is used.
+    """
+    if feed_title in [x.strip() for x in PODCAST.ASR_FORCE_GEMINI_TITLES.split(",") if x.strip()]:
+        return "gemini"
+    if urlparse(feed_url.strip()).netloc in [x.strip() for x in PODCAST.ASR_FORCE_GEMINI_DOMAINS.split(",") if x.strip()]:
+        return "gemini"
+    return PODCAST.ASR_ENGINE
+
+
+def remove_img(markdown: str):
+    """Remove all markdown image tags (``![alt](url)``) from a string.
+
+    Needed because Telegram rejects img markup inside messages (see caller).
+    """
+    image_pattern = r"!\[.*?\]\((.*?)\)"  # Matches both with and without alt text
+    return re.sub(image_pattern, "", markdown)
+
+
+def clean_pod_url(url: str) -> str:
+    """Strip the RSS tracking query from a podcast URL.
+
+    Only removes the exact trailing "?utm_source=rss" — a URL with extra
+    query parameters after it is left untouched. Returns "" for empty input.
+    """
+    if not url:
+        return ""
+    return url.removesuffix("?utm_source=rss")
+
+
+def load_xml(data: str, template: str = "rss") -> dict:
+    """Parse XML text into a dict, falling back to an empty template.
+
+    Args:
+        data: Raw XML string; any parse failure is silently suppressed.
+        template: Which skeleton to return on failure — "rss" (default) or
+            anything else for the OPML skeleton.
+
+    Returns:
+        The xmltodict parse result, or an empty RSS/OPML document skeleton.
+    """
+    with contextlib.suppress(Exception):
+        return xmltodict.parse(data)
+
+    # Parsing failed (e.g. empty string or malformed XML) — build a skeleton.
+    if template == "rss":
+        logger.trace("use default rss template")
+        return {
+            "rss": {
+                "@version": "2.0",
+                "@xmlns:itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
+                "@xmlns:atom": "http://www.w3.org/2005/Atom",
+                "@xmlns:rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
+                "@xmlns:podcast": "https://podcastindex.org/namespace/1.0",
+                "@xmlns:content": "http://purl.org/rss/1.0/modules/content/",
+                "channel": {},
+            }
+        }
+
+    logger.trace("use default opml template")
+    return {"opml": {"@version": "1.0", "head": {"title": "Podcast"}, "body": {"outline": []}}}
+
+
+async def save_xml(feed_xml: dict, feed_url: str):
+    """Serialize a feed dict and persist it to the configured storage engine.
+
+    Trims the channel to the newest PODCAST.KEEP_LATEST_ENTRIES items, then
+    uploads to CF-R2 or (via a local temp file) to alist. Any other
+    FS_ENGINE value makes this a silent no-op.
+
+    Args:
+        feed_xml: xmltodict-shaped document (rss.channel.item may be absent).
+        feed_url: Source URL; its bare form becomes the stored object name.
+    """
+    if glom(feed_xml, "rss.channel.item", default=[]):
+        items = glom(feed_xml, "rss.channel.item", default=[])
+        feed_xml["rss"]["channel"]["item"] = items[: PODCAST.KEEP_LATEST_ENTRIES]
+    # full_document=False omits the <?xml ...?> declaration.
+    xml_str = xmltodict.unparse(feed_xml, pretty=True, full_document=False)
+    if PODCAST.FS_ENGINE == "CF-R2":
+        await set_cf_r2(f"Podcast/feeds/{bare_url(feed_url)}", data=xml_str, mime_type="application/xml")
+        return
+    if PODCAST.FS_ENGINE == "alist":
+        # quote_plus makes the URL safe to use as a flat file name.
+        save_path = Path(DOWNLOAD_DIR) / quote_plus(bare_url(feed_url))
+        save_path = Path(save_path)
+        save_path.parent.mkdir(parents=True, exist_ok=True)
+
+        async with await anyio.open_file(save_path, "w") as f:
+            await f.write(xml_str)
+        await upload_alist(save_path)
+        # Temp copy is no longer needed once uploaded.
+        save_path.unlink(missing_ok=True)
+
+
+def align_opml_url(url: str) -> str:
+    """Map a feed URL to the public URL of its re-hosted copy.
+
+    Returns the CF-R2 public object URL or the alist direct-download URL
+    depending on PODCAST.FS_ENGINE; the original URL is returned unchanged
+    for any other engine value.
+    """
+    if PODCAST.FS_ENGINE == "CF-R2":
+        return DB.CF_R2_PUBLIC_URL.rstrip("/") + f"/Podcast/feeds/{bare_url(url)}"
+    if PODCAST.FS_ENGINE == "alist":
+        # NOTE(review): "ALIST_BASR_PATH" looks like a typo for BASE_PATH —
+        # it must match the attribute actually declared on config.DB; verify.
+        return DB.ALIST_SERVER.removesuffix("/") + "/d/" + DB.ALIST_BASR_PATH.strip("/") + "/" + bare_url(url)
+    return url
+
+
+def match_item(feed_xml: dict, entry: dict) -> dict:
+    """Match feed item according to entry.
+
+    Compares each item's normalized link (https + tracker suffix stripped,
+    the same normalization get_new_entries applied to entry["link"]) against
+    the entry. Returns the first matching item dict, or {} when none match.
+    """
+    for item in glom(feed_xml, "rss.channel.item", default=[]):
+        item_link = https_url(clean_pod_url(item.get("link", "")))
+        if item_link == entry["link"]:
+            return item
+    return {}
+
+
+def update_item(feed_xml: dict, item: dict, prefix_desc: str):
+    """Prepend a summary to an item's description inside the re-hosted feed.
+
+    Builds the new description as HTML(prefix_desc) + separator + the item's
+    existing description (or content:encoded), then writes it onto every
+    feed item with the same link, dropping content:encoded so the rewritten
+    description is authoritative. If no item matches, `item` itself is
+    updated and inserted at the head of the channel.
+
+    Args:
+        feed_xml: The feed being rewritten (mutated in place).
+        item: The matched source item from the original feed (may be {}).
+        prefix_desc: Markdown summary text to prepend.
+    """
+    hit = False
+    description = glom(item, Coalesce("description", "content:encoded"), default="") or ""
+    description = convert_html(prefix_desc) + "<p>----------------------------------</p>" + description
+    # NOTE(review): x["link"] raises KeyError for link-less items, and an
+    # empty `item` ({}) inserted below has no link/title — confirm feeds
+    # always carry <link> on every item.
+    for x in glom(feed_xml, "rss.channel.item", default=[]):
+        x.pop("content:encoded", None)
+        if x["link"] == item["link"]:
+            x["description"] = description
+            hit = True
+    if not hit:
+        item["description"] = description
+        item.pop("content:encoded", None)
+        feed_xml["rss"]["channel"]["item"].insert(0, item)
src/config.py
@@ -200,6 +200,20 @@ class ASR:
     DEEPGRAM_API = os.getenv("ASR_DEEPGRAM_API", "")  # comma separated keys for load balance. e.g. "key1,key2,key3"
 
 
+class PODCAST:
+    """Podcast summarizer settings, all read from environment variables."""
+
+    PROXY = os.getenv("PODCAST_PROXY", None)  # optional proxy for feed/audio requests
+    FEED_URLS = os.getenv("PODCAST_FEED_URLS", "")  # comma separated feed urls
+    OPML_URLS = os.getenv("PODCAST_OPML_URLS", "")  # comma separated opml urls
+    TID = int(os.getenv("PODCAST_TID", "0"))  # send to this chat id
+    FS_ENGINE = os.getenv("PODCAST_FS_ENGINE", "CF-R2")  # file storage engine for hosting podcast feeds: "CF-R2" or "alist"
+    ASR_ENGINE = os.getenv("PODCAST_ASR_ENGINE", "gemini")  # default ASR engine
+    ASR_FORCE_GEMINI_TITLES = os.getenv("PODCAST_ASR_FORCE_GEMINI_TITLES", "")  # comma separated titles for force Gemini ASR. (Bypass censorship)
+    ASR_FORCE_GEMINI_DOMAINS = os.getenv("PODCAST_ASR_FORCE_GEMINI_DOMAINS", "anchor.fm,feeds.acast.com")  # comma separated domains for force Gemini ASR. (Bypass censorship)
+    UPDATE_INTERVAL = int(os.getenv("PODCAST_UPDATE_INTERVAL", "3600"))  # memoize TTL for the feed sweep, in seconds
+    IGNORE_OLD_THAN_SECONDS = int(os.getenv("PODCAST_IGNORE_OLD_THAN_SECONDS", "14400"))  # skip entries older than this, in seconds
+    KEEP_LATEST_ENTRIES = int(os.getenv("PODCAST_KEEP_LATEST_ENTRIES", "99999999"))  # keep latest entries in the re-hosted feed
+
+
 class GPT:
     """This is for OpenAI compatible API.
 
src/main.py
@@ -25,6 +25,7 @@ from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TOKEN, TZ, cache
 from handler import handle_social_media, handle_utilities
 from llm.summary import daily_summary
 from messages.parser import parse_msg
+from others.podcast import summary_pods
 from permission import check_permission
 from price.entrypoint import match_symbol_category
 from utils import cleanup_old_files, nowdt, to_int
@@ -124,6 +125,7 @@ async def scheduling(client: Client):
             for chat_id, msg in daliy.items():
                 logger.info(f"Sending daily message to {chat_id}: {msg}")
                 await client.send_message(to_int(chat_id), msg)
+    await summary_pods(client)
 
 
 if __name__ == "__main__":
pyproject.toml
@@ -28,6 +28,7 @@ dependencies = [
   "telegraph[aio]>=2.2.0",
   "tiktoken>=0.8.0",
   "uvloop>=0.21.0",
+  "xmltodict>=0.14.2",
   "youtube-transcript-api>=0.6.3",
   "yt-dlp>=2025.1.12rc",
   "zhconv>=1.4.3",
uv.lock
@@ -274,6 +274,7 @@ dependencies = [
     { name = "telegraph", extra = ["aio"] },
     { name = "tiktoken" },
     { name = "uvloop" },
+    { name = "xmltodict" },
     { name = "youtube-transcript-api" },
     { name = "yt-dlp" },
     { name = "zhconv" },
@@ -314,6 +315,7 @@ requires-dist = [
     { name = "telegraph", extras = ["aio"], specifier = ">=2.2.0" },
     { name = "tiktoken", specifier = ">=0.8.0" },
     { name = "uvloop", specifier = ">=0.21.0" },
+    { name = "xmltodict", specifier = ">=0.14.2" },
     { name = "youtube-transcript-api", specifier = ">=0.6.3" },
     { name = "yt-dlp", specifier = ">=2025.1.12rc0" },
     { name = "zhconv", specifier = ">=1.4.3" },
@@ -2998,6 +3000,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/37/b1/a252d499f2760b314fcf264d2b36fcc4343a1ecdb25492b210cb0db70a68/XlsxWriter-3.2.3-py3-none-any.whl", hash = "sha256:593f8296e8a91790c6d0378ab08b064f34a642b3feb787cf6738236bd0a4860d", size = 169433 },
 ]
 
+[[package]]
+name = "xmltodict"
+version = "0.14.2"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/50/05/51dcca9a9bf5e1bce52582683ce50980bcadbc4fa5143b9f2b19ab99958f/xmltodict-0.14.2.tar.gz", hash = "sha256:201e7c28bb210e374999d1dde6382923ab0ed1a8a5faeece48ab525b7810a553", size = 51942 }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/d6/45/fc303eb433e8a2a271739c98e953728422fa61a3c1f36077a49e395c972e/xmltodict-0.14.2-py2.py3-none-any.whl", hash = "sha256:20cc7d723ed729276e808f26fb6b3599f786cbc37e06c65e192ba77c40f20aac", size = 9981 },
+]
+
 [[package]]
 name = "yarl"
 version = "1.18.3"