# main
1#!/venv/bin/python
2# -*- coding: utf-8 -*-
3import os
4from urllib.parse import unquote_plus
5
6import feedparser
7from glom import glom
8from loguru import logger
9from pyrogram.client import Client
10from pyrogram.types import Chat, Message
11
12from config import cache
13from database.d1 import create_d1_table, insert_d1, query_d1
14from messages.main import process_message
15from messages.sender import send2tg
16from networking import hx_req
17from podcast.utils import get_pubdate
18from utils import bare_url, https_url, nowdt, rand_number, true
19
# Request headers used when fetching feeds: a feedparser-style User-Agent and
# an Accept list that prefers feed XML media types over generic content.
HEADERS = {
    "User-Agent": "feedparser/6.0.11 +https://github.com/kurtmckee/feedparser/",
    "Accept": "application/atom+xml,application/rdf+xml,application/rss+xml,application/x-netcdf,application/xml;q=0.9,text/xml;q=0.2,*/*;q=0.1",
}
# Name of the D1 table that records already-delivered entries (dedup store).
TABLE = "rss"
25
# Feed configuration. Keys read by update_rss / get_new_entries:
#   title          - display name, also the feed_title column in D1
#   feed_url       - URL fetched with hx_req
#   interval       - per-feed poll cooldown in seconds (default 3600)
#   target_chat    - Telegram chat id or @username to deliver to
#   prefix/suffix  - text wrapped around the entry link
#   guid_key       - entry fields concatenated into the dedup key (default ["link"])
#   is_social_link - truthy: route through process_message instead of send2tg
#   options        - extra kwargs merged into the process_message options
#   limit / sleep  - optional: cap on new entries / send cooldown seconds
RSS: list[dict] = [
    {
        "title": "Blibili时间线",
        "feed_url": "https://rsshub.zydou.me/bilibili/followings/video/280035751",
        "interval": 60,
        "target_chat": -1002178783945,
        "suffix": " #set_ytdlp_video_target=-1002178783945 #no_ytdlp_send_audio #with_ytdlp_send_subtitle #with_to_telegraph #with_ytdlp_send_summary #set_reply_msg_id=-1 #no_bilibili_comments #no_show_statistics #no_show_progress #set_asr_engine=tencent #set_summary_model_id=bilibili",
        "is_social_link": 1,
    },
    {
        "title": "华中师范大学",
        "feed_url": "https://rss.zydou.me/public.php?op=rss&id=30&is_cat=1&key=a8gt976079ed4023481",  # gitleaks:allow
        "guid_key": ["link", "title"],
        "target_chat": "@CCNU_EDU",
        "interval": 14400,
    },
    {
        "title": "微博时间线",
        "feed_url": "https://rss.zydou.me/public.php?op=rss&id=22&is_cat=1&key=wc73rk679236153c269",  # gitleaks:allow
        "target_chat": -1002328010080,
        "interval": 300,
        "is_social_link": 1,
    },
    {"title": "陈一发儿", "feed_url": "https://rsshub.zydou.me/weibo/user/7357828611", "target_chat": -1001433673794, "interval": 60, "is_social_link": 1},
]
51
52
async def update_rss(client: Client):
    """Poll every configured RSS feed and forward new entries to Telegram.

    Feeds still inside their per-feed cooldown (tracked in ``cache``) are
    skipped. Entries flagged ``is_social_link`` go through the full
    ``process_message`` pipeline; plain entries are sent as markdown links
    via ``send2tg``. Each delivered entry is recorded in the D1 ``rss``
    table so it is never delivered twice.

    Args:
        client: the Pyrogram client used for all Telegram operations.
    """
    # Global kill switch for the whole job.
    if os.getenv("UPDATE_RSS_DISABLED", "0") == "1":
        return
    await create_d1_table(
        table_name=TABLE,
        columns="key TEXT PRIMARY KEY, timestamp INTEGER, feed_title TEXT, title TEXT, url TEXT",
        idx_cols=["key", "timestamp", "feed_title"],
        silent=True,
    )
    for feed in RSS:
        # Per-feed rate limit: the cache key expires after `interval` seconds.
        if cache.get(f"rss-{feed['feed_url']}"):
            continue
        interval = int(feed.get("interval", 3600))
        cache.set(f"rss-{feed['feed_url']}", "1", ttl=interval)
        feed_title = feed.get("title", "")
        remote_content = await hx_req(feed["feed_url"], rformat="text", headers=HEADERS, timeout=60, max_retry=0, silent=True)
        if not remote_content.get("text"):
            continue
        parsed = feedparser.parse(remote_content["text"])  # do not parse feed url, because it doesn't support timeout.
        for entry in await get_new_entries(feed, parsed):  # old to latest
            logger.info(f"RSS【{feed_title}】: {entry['title']} {entry['link']}")
            if true(feed.get("is_social_link")):
                # Social feeds: the bare link (plus prefix/suffix directives)
                # triggers the media pipeline with comment/statistics defaults
                # that per-feed "options" may override.
                texts = feed.get("prefix", "") + entry["link"] + feed.get("suffix", "")
                message = Message(id=rand_number(), chat=Chat(id=0), text=texts)
                options = (
                    {
                        "ytdlp_send_audio": False,
                        "douyin_comments_provider": False,
                        "twitter_comments": False,
                        "bilibili_comments": False,
                        "youtube_comments": False,
                        "twitter_provider": "fxtwitter-vxtwitter",
                        "show_statistics": False,
                        "need_prefix": False,
                        "show_progress": False,
                        "reply_msg_id": -1,
                    }
                    | feed.get("options", {})
                    | {"target_chat": feed["target_chat"]}
                )
                await process_message(client, message, **options)
            else:
                # Plain feeds: send a markdown link, or the bare URL when the
                # entry has no title.
                texts = feed.get("prefix", "")
                if entry["title"]:
                    texts += f"[{entry['title']}]({entry['link']})"
                else:
                    texts += entry["link"]
                # Collapse runs of blank lines left by prefix/title text.
                while "\n\n" in texts:
                    texts = texts.replace("\n\n", "\n")
                if feed.get("suffix"):
                    texts += feed.get("suffix", "")
                await send2tg(
                    client,
                    message=Message(id=rand_number(), chat=Chat(id=0)),
                    texts=texts,
                    target_chat=feed["target_chat"],
                    reply_msg_id=-1,
                    cooldown=int(feed.get("sleep", 1)),
                )
            # Record the delivered entry (was duplicated verbatim in both
            # branches); upsert on "key" keeps reruns idempotent.
            records = {
                "timestamp": entry["timestamp"],
                "feed_title": feed_title,
                "title": entry["title"],
                "url": entry["link"],
                "key": entry["db_key"],
            }
            await query_d1(**insert_d1(TABLE, records, update_on_conflict="key"), silent=True)
128
129
130async def get_new_entries(feed_config: dict, parsed: dict) -> list[dict]:
131 """Get new entries from feed."""
132 feed_title = feed_config.get("title", "FeedTitle")
133 d1 = await query_d1(sql=f"SELECT timestamp,key FROM {TABLE} WHERE feed_title = '{feed_title}' ORDER BY timestamp DESC LIMIT 100", silent=True)
134 if not d1.get("success"):
135 return []
136 finished_keys = set(glom(d1, "result.**.key", default=[]))
137 if len(finished_keys) == 0:
138 return []
139 try:
140 guid_keys = feed_config.get("guid_key", ["link"])
141 now = nowdt()
142 new_entries = []
143 for entry in sorted(parsed["entries"], key=lambda x: x.get("published", x.get("updated", now)), reverse=False): # old to latest
144 key = "".join([bare_url(unquote_plus(entry.get(key, ""))) for key in guid_keys])
145 if key in finished_keys:
146 continue
147 # check again
148 d1 = await query_d1(sql=f"SELECT timestamp FROM {TABLE} WHERE key = '{key}'", silent=True)
149 if glom(d1, "result.0.results.0.timestamp", default=0):
150 continue
151 entry["title"] = entry.get("title", "")
152 entry["link"] = https_url(entry.get("link", ""))
153 entry["timestamp"] = round(get_pubdate(entry).timestamp())
154 entry["db_key"] = key
155 new_entries.append(entry)
156 if new_entries:
157 logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
158 except Exception as e:
159 logger.error(f"Failed to get new entries: {e}")
160 new_entries = []
161 limit = int(feed_config.get("limit", 0))
162 if limit > 0:
163 return new_entries[:limit]
164 return new_entries