main
1#!/venv/bin/python
2# -*- coding: utf-8 -*-
3import os
4from urllib.parse import unquote_plus
5
6import feedparser
7from glom import glom
8from loguru import logger
9from pyrogram.client import Client
10from pyrogram.types import Chat, Message
11
12from config import cache
13from database.d1 import create_d1_table, insert_d1, query_d1
14from messages.main import process_message
15from messages.sender import send2tg
16from networking import hx_req
17from podcast.utils import get_pubdate
18from utils import bare_url, https_url, nowdt, rand_number, true
19
# HTTP headers sent with every feed download (see the hx_req call in update_rss).
# The User-Agent identifies the client as feedparser 6.0.11.
HEADERS = {
    "User-Agent": "feedparser/6.0.11 +https://github.com/kurtmckee/feedparser/",
    "Accept": (
        "application/atom+xml,"
        "application/rdf+xml,"
        "application/rss+xml,"
        "application/x-netcdf,"
        "application/xml;q=0.9,"
        "text/xml;q=0.2,"
        "*/*;q=0.1"
    ),
}

# Name of the D1 table holding one row per entry that was already forwarded.
TABLE = "rss"

# Feed subscriptions. Keys read by the code in this module:
#   title          - label used in logs; also stored as ``feed_title`` in the
#                    table and used to look up already-forwarded entries, so
#                    changing a title detaches the feed from its history.
#   feed_url       - URL that is downloaded and parsed.
#   interval       - passed as the cache TTL that throttles polling
#                    (default 3600; presumably seconds - confirm against cache).
#   target_chat    - Telegram chat (numeric id or @username) receiving entries.
#   prefix/suffix  - optional text put before/after the entry link or title.
#   is_social_link - truthy: the link is handed to process_message();
#                    otherwise a "[title](link)" message is sent via send2tg().
#   guid_key       - entry fields concatenated into the de-duplication key
#                    (default ["link"]).
#   options, sleep, limit - optional overrides, see update_rss/get_new_entries.
RSS: list[dict] = [
    {
        "title": "Blibili时间线",
        "feed_url": "https://rsshub.zydou.me/bilibili/followings/video/280035751",
        "interval": 60,
        "target_chat": -1002178783945,
        "suffix": (
            " #set_ytdlp_video_target=-1002178783945"
            " #with_use_aria2"
            " #no_ytdlp_send_audio"
            " #with_ytdlp_send_subtitle"
            " #with_to_telegraph"
            " #with_summary_ytdlp"
            " #set_reply_msg_id=-1"
            " #no_bilibili_comments"
            " #no_show_statistics"
            " #no_show_progress"
            " #set_asr_engine=tencent"
            " #set_summary_ytdlp_model=general"
        ),
        "is_social_link": 1,
    },
    {
        "title": "华中师范大学",
        "feed_url": "https://rss.zydou.me/public.php?op=rss&id=30&is_cat=1&key=a8gt976079ed4023481",  # gitleaks:allow
        "guid_key": ["link", "title"],
        "target_chat": "@CCNU_EDU",
        "interval": 14400,
    },
    {
        "title": "微博时间线",
        "feed_url": "https://rss.zydou.me/public.php?op=rss&id=22&is_cat=1&key=wc73rk679236153c269",  # gitleaks:allow
        "target_chat": -1002328010080,
        "interval": 300,
        "is_social_link": 1,
    },
    {
        "title": "陈一发儿",
        "feed_url": "https://rsshub.zydou.me/weibo/user/7357828611",
        "target_chat": -1001433673794,
        "interval": 60,
        "is_social_link": 1,
    },
]
51
52
async def update_rss(client: Client):
    """Poll every feed in ``RSS`` and forward its unseen entries to Telegram.

    Does nothing when the environment variable ``UPDATE_RSS_DISABLED`` is "1".
    Each feed is throttled through a cache key whose TTL is the feed's
    ``interval`` (default 3600). The key is set *before* the download, so a
    failed fetch is not retried until the TTL expires.

    For every entry returned by ``get_new_entries`` (oldest first):
      * feeds with a truthy ``is_social_link`` hand the bare link (wrapped in
        the optional prefix/suffix) to ``process_message``;
      * other feeds send a ``[title](link)`` message through ``send2tg``.
    Afterwards the entry is recorded in the ``TABLE`` table under its
    ``db_key`` so it is not forwarded again.

    Args:
        client: Pyrogram client passed through to the sending helpers.
    """
    if os.getenv("UPDATE_RSS_DISABLED", "0") == "1":
        return

    await create_d1_table(
        table_name=TABLE,
        columns="key TEXT PRIMARY KEY, timestamp INTEGER, feed_title TEXT, title TEXT, url TEXT",
        idx_cols=["key", "timestamp", "feed_title"],
        silent=True,
    )

    for feed in RSS:
        feed_url = feed["feed_url"]
        throttle_key = f"rss-{feed_url}"
        if cache.get(throttle_key):
            # Polled within the last `interval`; skip this round.
            continue
        cache.set(throttle_key, "1", ttl=int(feed.get("interval", 3600)))

        feed_title = feed.get("title", "")
        response = await hx_req(feed_url, rformat="text", headers=HEADERS, timeout=60, max_retry=0, silent=True)
        if not response.get("text"):
            continue
        # Parse the downloaded text rather than the URL: feedparser's own
        # fetching does not support a timeout.
        parsed = feedparser.parse(response["text"])

        for entry in await get_new_entries(feed, parsed):  # oldest first
            logger.info(f"RSS【{feed_title}】: {entry['title']} {entry['link']}")

            if true(feed.get("is_social_link")):
                # Wrap the link in a synthetic message and let the regular
                # message pipeline handle it.
                texts = feed.get("prefix", "") + entry["link"] + feed.get("suffix", "")
                message = Message(id=rand_number(), chat=Chat(id=0), text=texts)
                defaults = {
                    "ytdlp_send_audio": False,
                    "douyin_comments_provider": False,
                    "twitter_comments": False,
                    "bilibili_comments": False,
                    "youtube_comments": False,
                    "show_statistics": False,
                    "enable_corrector": True,
                    "need_prefix": False,
                    "show_progress": False,
                    "reply_msg_id": -1,
                    "ttl": "360d",
                }
                # Per-feed options override the defaults; target_chat always wins.
                options = defaults | feed.get("options", {}) | {"target_chat": feed["target_chat"]}
                await process_message(client, message, **options)
            else:
                texts = feed.get("prefix", "")
                texts += f"[{entry['title']}]({entry['link']})" if entry["title"] else entry["link"]
                # Collapse any run of blank lines into a single newline.
                while "\n\n" in texts:
                    texts = texts.replace("\n\n", "\n")
                suffix = feed.get("suffix")
                if suffix:
                    texts += suffix
                await send2tg(
                    client,
                    message=Message(id=rand_number(), chat=Chat(id=0)),
                    texts=texts,
                    target_chat=feed["target_chat"],
                    reply_msg_id=-1,
                    cooldown=int(feed.get("sleep", 1)),
                )

            # Remember the entry so the next poll skips it.
            records = {
                "timestamp": entry["timestamp"],
                "feed_title": feed_title,
                "title": entry["title"],
                "url": entry["link"],
                "key": entry["db_key"],
            }
            await query_d1(**insert_d1(TABLE, records, update_on_conflict="key"), silent=True)
129
130
def _sql_quote(value) -> str:
    """Return *value* as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


async def get_new_entries(feed_config: dict, parsed: dict) -> list[dict]:
    """Return the entries of *parsed* that are not recorded in the table yet.

    Entries are visited oldest first. Each returned entry is updated in place
    with normalised ``title`` and ``link`` values plus two extra keys:
    ``timestamp`` (rounded publication time) and ``db_key`` (the
    de-duplication key built from the ``guid_key`` fields).

    Args:
        feed_config: One item of ``RSS``; ``title``, ``guid_key`` and
            ``limit`` are read here.
        parsed: Result of ``feedparser.parse``; only ``parsed["entries"]`` is
            used.

    Returns:
        New entries, oldest first, truncated to ``limit`` when it is > 0.
        An empty list is returned when the lookup query fails, when no key is
        recorded for this feed, or when any error occurs while scanning.
    """
    import time  # local import: only needed for the undated-entry fallback

    feed_title = feed_config.get("title", "FeedTitle")
    # Values are embedded as escaped literals: titles and feed-derived keys
    # may contain single quotes, which would otherwise break the statement.
    d1 = await query_d1(
        sql=f"SELECT timestamp,key FROM {TABLE} WHERE feed_title = {_sql_quote(feed_title)} ORDER BY timestamp DESC LIMIT 100",
        silent=True,
    )
    if not d1.get("success"):
        return []
    finished_keys = set(glom(d1, "result.**.key", default=[]))
    # NOTE(review): with no recorded key nothing is returned, and update_rss
    # only records entries returned from here - so a feed without any row
    # appears to never start. Confirm that rows are seeded elsewhere.
    if not finished_keys:
        return []

    new_entries: list[dict] = []
    try:
        guid_keys = feed_config.get("guid_key", ["link"])
        undated = time.gmtime()  # entries without a parsable date sort as "now"

        def published_at(item):
            # feedparser exposes parsed dates as UTC struct_time tuples, which
            # compare chronologically; the raw date strings do not.
            return item.get("published_parsed") or item.get("updated_parsed") or undated

        for entry in sorted(parsed["entries"], key=published_at):  # old to latest
            db_key = "".join(bare_url(unquote_plus(entry.get(field, ""))) for field in guid_keys)
            if db_key in finished_keys:
                continue
            # Only the latest 100 keys were loaded above, so double-check
            # older history with a direct lookup.
            d1 = await query_d1(sql=f"SELECT timestamp FROM {TABLE} WHERE key = {_sql_quote(db_key)}", silent=True)
            if glom(d1, "result.0.results.0.timestamp", default=0):
                continue
            entry["title"] = entry.get("title", "")
            entry["link"] = https_url(entry.get("link", ""))
            entry["timestamp"] = round(get_pubdate(entry).timestamp())
            entry["db_key"] = db_key
            new_entries.append(entry)
        if new_entries:
            logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
    except Exception as e:
        # Best effort: a malformed feed must not abort the whole polling
        # round, so the partial result is dropped and the error logged.
        logger.error(f"Failed to get new entries: {e}")
        new_entries = []

    limit = int(feed_config.get("limit", 0))
    if limit > 0:
        return new_entries[:limit]
    return new_entries
165 return new_entries