main
  1#!/venv/bin/python
  2# -*- coding: utf-8 -*-
  3import os
  4from urllib.parse import unquote_plus
  5
  6import feedparser
  7from glom import glom
  8from loguru import logger
  9from pyrogram.client import Client
 10from pyrogram.types import Chat, Message
 11
 12from config import cache
 13from database.d1 import create_d1_table, insert_d1, query_d1
 14from messages.main import process_message
 15from messages.sender import send2tg
 16from networking import hx_req
 17from podcast.utils import get_pubdate
 18from utils import bare_url, https_url, nowdt, rand_number, true
 19
# Headers sent when fetching feeds; the UA mimics feedparser's default so
# servers treat the request as a normal feed reader.
HEADERS = {
    "User-Agent": "feedparser/6.0.11 +https://github.com/kurtmckee/feedparser/",
    "Accept": "application/atom+xml,application/rdf+xml,application/rss+xml,application/x-netcdf,application/xml;q=0.9,text/xml;q=0.2,*/*;q=0.1",
}
# D1 table that records already-delivered entries (keyed by the dedup key).
TABLE = "rss"

# Feed configuration. Recognized keys per feed dict:
#   title          - human-readable name; also stored as feed_title in the DB
#   feed_url       - URL fetched and parsed with feedparser
#   interval       - minimum seconds between fetches (cache TTL); default 3600
#   target_chat    - Telegram chat id (int) or @username (str) to post into
#   prefix/suffix  - optional text wrapped around each entry's message
#   guid_key       - entry fields joined to build the dedup key; default ["link"]
#   is_social_link - truthy => route through the process_message pipeline
#                    instead of sending a plain markdown link via send2tg
#   options        - extra keyword overrides for process_message (social feeds)
#   sleep          - send2tg cooldown in seconds for plain feeds; default 1
#   limit          - max new entries handled per run (0 = unlimited)
RSS: list[dict] = [
    {
        "title": "Blibili时间线",
        "feed_url": "https://rsshub.zydou.me/bilibili/followings/video/280035751",
        "interval": 60,
        "target_chat": -1002178783945,
        "suffix": " #set_ytdlp_video_target=-1002178783945 #no_ytdlp_send_audio #with_ytdlp_send_subtitle #with_to_telegraph #with_ytdlp_send_summary #set_reply_msg_id=-1 #no_bilibili_comments #no_show_statistics #no_show_progress #set_asr_engine=tencent #set_summary_model_id=bilibili",
        "is_social_link": 1,
    },
    {
        "title": "华中师范大学",
        "feed_url": "https://rss.zydou.me/public.php?op=rss&id=30&is_cat=1&key=a8gt976079ed4023481",  # gitleaks:allow
        "guid_key": ["link", "title"],
        "target_chat": "@CCNU_EDU",
        "interval": 14400,
    },
    {
        "title": "微博时间线",
        "feed_url": "https://rss.zydou.me/public.php?op=rss&id=22&is_cat=1&key=wc73rk679236153c269",  # gitleaks:allow
        "target_chat": -1002328010080,
        "interval": 300,
        "is_social_link": 1,
    },
    {"title": "陈一发儿", "feed_url": "https://rsshub.zydou.me/weibo/user/7357828611", "target_chat": -1001433673794, "interval": 60, "is_social_link": 1},
]
 51
 52
 53async def update_rss(client: Client):
 54    if os.getenv("UPDATE_RSS_DISABLED", "0") == "1":
 55        return
 56    await create_d1_table(
 57        table_name=TABLE,
 58        columns="key TEXT PRIMARY KEY, timestamp INTEGER, feed_title TEXT, title TEXT, url TEXT",
 59        idx_cols=["key", "timestamp", "feed_title"],
 60        silent=True,
 61    )
 62    for feed in RSS:
 63        if cache.get(f"rss-{feed['feed_url']}"):
 64            continue
 65        interval = int(feed.get("interval", 3600))
 66        cache.set(f"rss-{feed['feed_url']}", "1", ttl=interval)
 67        feed_title = feed.get("title", "")
 68        remote_content = await hx_req(feed["feed_url"], rformat="text", headers=HEADERS, timeout=60, max_retry=0, silent=True)
 69        if not remote_content.get("text"):
 70            continue
 71        parsed = feedparser.parse(remote_content["text"])  # do not parse feed url, because it doesn't support timeout.
 72        for entry in await get_new_entries(feed, parsed):  # old to latest
 73            logger.info(f"RSS【{feed_title}】: {entry['title']} {entry['link']}")
 74            if true(feed.get("is_social_link")):
 75                texts = feed.get("prefix", "") + entry["link"] + feed.get("suffix", "")
 76                message = Message(id=rand_number(), chat=Chat(id=0), text=texts)
 77                options = (
 78                    {
 79                        "ytdlp_send_audio": False,
 80                        "douyin_comments_provider": False,
 81                        "twitter_comments": False,
 82                        "bilibili_comments": False,
 83                        "youtube_comments": False,
 84                        "twitter_provider": "fxtwitter-vxtwitter",
 85                        "show_statistics": False,
 86                        "need_prefix": False,
 87                        "show_progress": False,
 88                        "reply_msg_id": -1,
 89                    }
 90                    | feed.get("options", {})
 91                    | {"target_chat": feed["target_chat"]}
 92                )
 93                await process_message(client, message, **options)
 94                records = {
 95                    "timestamp": entry["timestamp"],
 96                    "feed_title": feed_title,
 97                    "title": entry["title"],
 98                    "url": entry["link"],
 99                    "key": entry["db_key"],
100                }
101                await query_d1(**insert_d1(TABLE, records, update_on_conflict="key"), silent=True)
102            else:
103                texts = feed.get("prefix", "")
104                if entry["title"]:
105                    texts += f"[{entry['title']}]({entry['link']})"
106                else:
107                    texts += entry["link"]
108                while "\n\n" in texts:
109                    texts = texts.replace("\n\n", "\n")
110                if feed.get("suffix"):
111                    texts += feed.get("suffix", "")
112                await send2tg(
113                    client,
114                    message=Message(id=rand_number(), chat=Chat(id=0)),
115                    texts=texts,
116                    target_chat=feed["target_chat"],
117                    reply_msg_id=-1,
118                    cooldown=int(feed.get("sleep", 1)),
119                )
120                records = {
121                    "timestamp": entry["timestamp"],
122                    "feed_title": feed_title,
123                    "title": entry["title"],
124                    "url": entry["link"],
125                    "key": entry["db_key"],
126                }
127                await query_d1(**insert_d1(TABLE, records, update_on_conflict="key"), silent=True)
128
129
130async def get_new_entries(feed_config: dict, parsed: dict) -> list[dict]:
131    """Get new entries from feed."""
132    feed_title = feed_config.get("title", "FeedTitle")
133    d1 = await query_d1(sql=f"SELECT timestamp,key FROM {TABLE} WHERE feed_title = '{feed_title}' ORDER BY timestamp DESC LIMIT 100", silent=True)
134    if not d1.get("success"):
135        return []
136    finished_keys = set(glom(d1, "result.**.key", default=[]))
137    if len(finished_keys) == 0:
138        return []
139    try:
140        guid_keys = feed_config.get("guid_key", ["link"])
141        now = nowdt()
142        new_entries = []
143        for entry in sorted(parsed["entries"], key=lambda x: x.get("published", x.get("updated", now)), reverse=False):  # old to latest
144            key = "".join([bare_url(unquote_plus(entry.get(key, ""))) for key in guid_keys])
145            if key in finished_keys:
146                continue
147            # check again
148            d1 = await query_d1(sql=f"SELECT timestamp FROM {TABLE} WHERE key = '{key}'", silent=True)
149            if glom(d1, "result.0.results.0.timestamp", default=0):
150                continue
151            entry["title"] = entry.get("title", "")
152            entry["link"] = https_url(entry.get("link", ""))
153            entry["timestamp"] = round(get_pubdate(entry).timestamp())
154            entry["db_key"] = key
155            new_entries.append(entry)
156        if new_entries:
157            logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
158    except Exception as e:
159        logger.error(f"Failed to get new entries: {e}")
160        new_entries = []
161    limit = int(feed_config.get("limit", 0))
162    if limit > 0:
163        return new_entries[:limit]
164    return new_entries