main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4import json
5import re
6from datetime import datetime
7from urllib.parse import quote_plus
8from zoneinfo import ZoneInfo
9
10from bs4 import BeautifulSoup
11from glom import glom
12from loguru import logger
13from pyrogram.client import Client
14from pyrogram.types import Message
15
16from bridge.social import send_to_social_media_bridge
17from config import API, DB, DOWNLOAD_DIR, PROVIDER, PROXY, TELEGRAM_UA, TOKEN, TZ, cache
18from cookies import get_weibo_cookies
19from database.database import get_db
20from messages.database import copy_messages_from_db, save_messages
21from messages.progress import modify_progress
22from messages.sender import send2tg
23from messages.utils import blockquote, summay_media
24from networking import download_file, download_first_success_urls, download_media, hx_req
25from others.emoji import emojify
26from utils import rand_string, readable_count, soup_to_text, split_parts, true
27
28
async def preview_weibo(
    client: Client,
    message: Message,
    url: str,
    db_key: str = "",
    post_id: str = "",
    *,
    weibo_provider: str = PROVIDER.WEIBO,
    weibo_comments: bool = True,
    show_author: bool = True,
    show_pubdate: bool = True,
    show_ip: bool = True,
    show_device: bool = True,
    show_statistics: bool = True,
    **kwargs,
):
    """Preview a Weibo link in reply to the trigger message.

    Messages previously saved under ``db_key`` are copied when available.
    Otherwise the post (and the post it reposts, if any) is parsed, a caption
    is assembled from the enabled sections, and caption, media and comments are
    sent through ``send2tg``. The sent messages are then saved under ``db_key``.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str): Weibo link.
        db_key (str, optional): The cache key. Any occurrence of ``post_id`` in
            it is replaced by the decimal post ID before it is used.
        post_id (str, optional): Weibo post ID. IDs starting with
            ``"weibovideo"`` are first resolved via ``weibo_vid_to_postid``.
        weibo_provider (str, optional): The weibo provider. When it contains
            ``"bridge"``, a parse failure falls back to
            ``send_to_social_media_bridge`` unless the failure is marked
            ``force_stop``.
        weibo_comments (bool, optional): Fetch weibo comments. Defaults to True.
        show_author (bool, optional): Include the author line(s) in the caption.
        show_pubdate (bool, optional): Include the publish time line(s).
        show_ip (bool, optional): Include the region line(s).
        show_device (bool, optional): Include the posting-device text.
        show_statistics (bool, optional): Include the like/comment/repost line(s).
        **kwargs: Forwarded to the helper calls. ``show_progress`` / ``progress``
            are read here: if ``show_progress`` is truthy and ``progress`` is
            absent, a status message is sent and stored in ``kwargs["progress"]``.

    Returns:
        None.
    """
    if post_id.startswith("weibovideo"):
        post_id = await weibo_vid_to_postid(post_id)

    # Normalise the cache key so base62 and decimal links share one entry.
    real_post_id = real_weibo_post_id(post_id)
    db_key = db_key.replace(post_id, real_post_id)
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析微博链接\n{url}", **kwargs)
        kwargs["progress"] = res[0]
    if kv := await get_db(db_key):
        logger.debug(f"Weibo preview {DB.ENGINE} cache hit for key={url}")
        if await copy_messages_from_db(client, message, key=url, kv=kv, **kwargs):
            return
        # Copying the saved messages failed: fall through and parse again.
        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
    this_info = await parse_weibo_info(post_id, **kwargs)
    if error_msg := this_info.get("error_msg"):
        if "bridge" in weibo_provider and not this_info.get("force_stop"):
            await modify_progress(text=f"❌微博解析失败: {error_msg}\n尝试第三方Bot...", **kwargs)
            kwargs |= {"target_mid": message.id}
            await send_to_social_media_bridge(client, message, url, **kwargs)
        else:
            await modify_progress(text=f"❌微博解析失败: {error_msg}", force_update=True, **kwargs)
        return
    # A repost carries the original post's payload in ``reply_data``; parse it too.
    quote_info = await parse_weibo_info(post_id, this_info["reply_data"], **kwargs) if this_info.get("reply_data") else {}

    # Build the labels describing how many media items belong to each post.
    num_this = len(this_info["media"])
    num_quote = len(quote_info.get("media", []))
    part_strs = split_parts(num_this, last=num_quote)

    msg = ""
    if true(show_author) and this_info.get("author"):
        msg += f"\n🧣**[{this_info['author']}]({this_info.get('author_url', 'weibo.com')})**"

    if true(show_pubdate) and this_info["dt"]:
        msg += f"\n🕒{this_info['dt']}"
        if part_strs["first"] and quote_info:  # only label the media split when there is a quoted post
            msg += f" {part_strs['first']}"

    if true(show_ip) and this_info.get("region"):
        msg += f"\n📍{this_info['region']}"

    if true(show_device) and this_info.get("device"):
        msg += f" 📱{this_info['device']}"

    if true(show_statistics) and this_info.get("statistics"):
        msg += f"\n{this_info['statistics']}"

    if texts := this_info.get("texts"):
        msg += f"\n{texts}"

    # Copy the list before extending it: ``this_info`` is the return value of the
    # memoized ``parse_weibo_info`` and may be the very object kept in the cache,
    # so extending it in place would leak the quoted media into later calls.
    media = list(this_info.get("media", []))
    if quote_info:
        msg += "\n🔁"
        if true(show_author) and quote_info.get("author"):
            msg += f"\n**[{quote_info['author']}](https://m.weibo.cn/detail/{quote_info['post_id']})**"
            # Keep the repost marker on the same line as the quoted author.
            msg = msg.replace("\n🔁\n", "\n🔁")

        if true(show_pubdate) and quote_info.get("dt"):
            msg += f"\n🕒{quote_info['dt']}"

        if part_strs["last"]:
            msg += f" {part_strs['last']}"

        if true(show_ip) and quote_info.get("region"):
            msg += f"\n📍{quote_info['region']}"

        if true(show_device) and quote_info.get("device"):
            msg += f" 📱{quote_info['device']}"

        if true(show_statistics) and quote_info.get("statistics"):
            msg += f"\n{quote_info['statistics']}"

        if texts := quote_info.get("texts"):
            msg += f"\n{texts}"

        media.extend(quote_info.get("media", []))

    comments = []
    if true(weibo_comments):
        comments = await parse_weibo_comments(post_id)
    sent_messages = await send2tg(client, message, texts=emojify(msg.strip()), media=media, comments=comments, **kwargs)
    await modify_progress(del_status=True, **kwargs)
    await save_messages(messages=sent_messages, key=db_key)
139
140
@cache.memoize(ttl=30)
async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) -> dict:
    """Collect author, text, statistics and media for one Weibo post.

    Args:
        post_id (str): Weibo post ID, used to build the ``m.weibo.cn`` detail URL.
        data (dict | None, optional): Already-fetched status payload (e.g. the
            ``retweeted_status`` of another post). When falsy, the detail page is
            requested and the status is read from its ``$render_data`` JSON.
        **kwargs: Forwarded to ``modify_progress``, ``download_file``,
            ``download_first_success_urls`` and ``download_media``.

    Returns:
        dict: On failure, a dict with ``error_msg`` (and ``force_stop`` when the
        page says the post is missing or not viewable). On success, a dict with
        ``post_id``, ``author``, ``author_url``, ``region``, ``dt``, ``device``,
        ``texts``, ``reply_data``, ``statistics`` and ``media``.
    """
    info = {}
    if data:
        await modify_progress(text="✅正在解析转发微博...", **kwargs)
    else:
        weibo_url = f"https://m.weibo.cn/detail/{post_id}"
        logger.info(f"Weibo link preview for {weibo_url}")
        headers = {"referer": "https://m.weibo.cn", "cookie": await get_weibo_cookies()}
        try:
            resp = await hx_req(weibo_url, headers=headers, proxy=PROXY.WEIBO, mobile=True, rformat="text")
            if not resp.get("text"):
                info["error_msg"] = f"Weibo webpage not found: {weibo_url}"
                return info
            page = str(resp["text"])
            matched = re.search(r"var \$render_data = (\[.*?\])\[0\]", page, re.DOTALL)
            if matched is None:
                if "微博不存在" in page or "暂无查看权限" in page:
                    # The page itself reports a missing/forbidden post.
                    info["error_msg"] = "微博不存在或暂无查看权限!"
                    info["force_stop"] = True
                else:
                    info["error_msg"] = "Weibo API empty response"
                logger.error(info["error_msg"])
                return info
            raw_json = matched.group(1)
            json_data: dict = json.loads(raw_json)
            if not json_data:
                logger.error(f"Weibo API response cannot be parsed: {raw_json}")
                info["error_msg"] = "Weibo API response cannot be parsed"
                return info
            data = glom(json_data, "0.status", default={}) or {}
            data["text"] = soup_to_text(soup=BeautifulSoup(data.get("text", ""), "html.parser"))
            await modify_progress(text="✅解析成功, 正在处理...", **kwargs)
        except Exception as e:
            info["error_msg"] = f"Weibo API failed: {e}"
            logger.error(info["error_msg"])
            return info

    logger.trace(data)

    def stream_opts() -> dict:
        # Fresh option dict per download call; same keywords for every media type.
        return {"headers": {"user-agent": TELEGRAM_UA}, "proxy": PROXY.WEIBO, "stream": True}

    media = []
    for pic in data.get("pics", []):
        pid = glom(pic, "pid", default=rand_string())
        kind = glom(pic, "type", default="photo")
        photo_url = glom(pic, "large.url", default=pic.get("url"))
        video_url = pic.get("videoSrc")
        if kind == "livephoto":
            # Only the video part of a live photo is fetched.
            entry = {"video": download_file(video_url, path=f"{DOWNLOAD_DIR}/{pid}.mov", **stream_opts(), **kwargs)}
        elif kind in ("video", "gifvideos"):
            entry = {"video": download_file(video_url, suffix=".mp4", **stream_opts(), **kwargs)}
        else:
            entry = {"photo": download_file(photo_url, **stream_opts(), **kwargs)}
        media.append(entry)

    page_info = data.get("page_info", {})
    if page_info:
        by_quality = page_info.get("urls", {})
        # Candidate URLs in order of preference: 720p, HD, LD.
        video_urls = [link for quality in ("mp4_720p_mp4", "mp4_hd_mp4", "mp4_ld_mp4") if (link := by_quality.get(quality))]
        if video_urls:
            # This may already have been queued by the ``pics`` loop above.
            media.append({"video": download_first_success_urls(video_urls, skip_exist=True, suffix=".mp4", **stream_opts(), **kwargs)})

    counters = (("👍", "attitudes_count"), ("💬", "comments_count"), ("↗️", "reposts_count"))
    statistics = "".join(f"{icon}{readable_count(count)}" for icon, key in counters if (count := glom(data, key, default=0)))

    info.update(
        post_id=glom(data, "id", default=post_id),
        author=glom(data, "user.screen_name", default=""),
        author_url=f"https://m.weibo.cn/detail/{post_id}",  # for weibo post, use post url as author url
        region=data.get("region_name", "").removeprefix("发布于").strip(),
        dt="",
    )
    # Best effort: ``dt`` stays empty when ``created_at`` is missing or malformed.
    with contextlib.suppress(Exception):
        created = datetime.strptime(data["created_at"], "%a %b %d %H:%M:%S %z %Y").astimezone(ZoneInfo(TZ))
        info["dt"] = f"{created:%Y-%m-%d %H:%M:%S}"
    info.update(
        device=data.get("source", ""),
        texts=soup_to_text(BeautifulSoup(data.get("text", ""), "html.parser")),
        reply_data=data.get("retweeted_status", {}),
        statistics=statistics,
    )

    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
    media = await download_media(media, **kwargs)

    # De-duplicate media: keep the first entry seen for each photo/video value.
    seen = set()
    unique_media = []
    for entry in media:
        for kind in ("photo", "video"):
            target = entry.get(kind)
            if target and target not in seen:
                unique_media.append(entry)
                seen.add(target)
                break
    info["media"] = unique_media
    return info
229
230
@cache.memoize(ttl=120)
async def weibo_vid_to_postid(post_id: str) -> str:
    """Resolve a ``weibovideo``-prefixed ID to a post ID via the TikHub API.

    Args:
        post_id (str): ID of the form ``"weibovideo<fid>"``.

    Returns:
        str: The ``data.data.Component_Play_Playinfo.mid`` value of the API
        response as a string; ``""`` when that field is absent or when
        ``post_id`` does not start with ``"weibovideo"``.
    """
    prefix = "weibovideo"
    if not post_id.startswith(prefix):
        return ""

    fid = post_id.removeprefix(prefix)
    video_url = f"https://video.weibo.com/show?fid={fid}"
    # The video page URL is passed to the API URL-encoded.
    api_url = f"{API.TIKHUB_WEIBO_VIDEO}{quote_plus(video_url)}"
    headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
    mid_path = "data.data.Component_Play_Playinfo.mid"
    resp = await hx_req(api_url, headers=headers, proxy=PROXY.WEIBO, check_kv={"data.msg": "succ"}, check_keys=[mid_path])
    return str(glom(resp, mid_path, default=""))
240
241
@cache.memoize(ttl=30)
async def parse_weibo_comments(post_id: str) -> str:
    """Fetch comments of a Weibo post and render them as blockquotes.

    Calls ``https://weibo.com/ajax/statuses/buildComments`` with ``count=10``
    and formats every entry that has a ``text`` field.

    Args:
        post_id (str): Weibo post ID, sent as the ``id`` query parameter.

    Returns:
        str: A header blockquote followed by one blockquote per comment, or
        ``""`` when ``post_id`` is empty, the request reports ``hx_error``, or
        no comment has text.
    """
    if not post_id:
        return ""

    headers = {
        "cookie": await get_weibo_cookies(),
        "accept": "application/json, text/plain, */*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
        "cache-control": "no-cache",
        "client-version": "v2.47.17",
        "dnt": "1",
        "pragma": "no-cache",
        "priority": "u=1, i",
        "referer": "https://weibo.com",
        "sec-ch-ua": '"Not;A=Brand";v="24", "Chromium";v="128"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Linux"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "server-version": "v2024.12.30.2",
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
        "x-requested-with": "XMLHttpRequest",
    }
    params = {
        "id": post_id,
        "is_show_bulletin": 2,
        "is_mix": 0,
        "count": 10,
        "fetch_level": 0,
        "locale": "zh-CN",
        "max_id": 0,
    }
    api = "https://weibo.com/ajax/statuses/buildComments"
    resp = await hx_req(api, headers=headers, params=params, proxy=PROXY.WEIBO, check_kv={"ok": 1}, max_retry=1)
    if resp.get("hx_error"):
        logger.error(f"Weibo Comments API failed: {resp}")
        return ""

    rendered = []
    for item in resp.get("data", []):
        body = item.get("text")
        if not body:
            continue  # skip entries without text

        uid = glom(item, "user.id", default="")
        author = glom(item, "user.screen_name", default="")
        if author and uid:
            line = f"💬**[{author}](https://weibo.com/u/{uid})**"
        elif author:
            line = f"💬**{author}**"
        else:
            line = ""

        # ``source`` is shown in parentheses with its "来自" prefix removed.
        region = item.get("source", "").removeprefix("来自")
        if region:
            line += f"({region})"

        line += f": {soup_to_text(BeautifulSoup(body, 'html.parser'))}"
        rendered.append(f"\n{blockquote(emojify(line))}")

    if not rendered:
        return ""
    return f"\n{blockquote('💬**点此展开评论区**:')}{''.join(rendered)}"
302
303
# Digit value of every base62 character (0-9, a-z, A-Z), built once at import.
_WEIBO_BASE62_INDEX = {c: i for i, c in enumerate("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")}
_WEIBO_GROUP_CHARS = 4  # base62 characters per group
_WEIBO_GROUP_DIGITS = 7  # decimal digits each non-leading group expands to


def real_weibo_post_id(post_id: str) -> str:
    """Convert weibo post ID from base62 to decimal format.

    These are the same post:
    - https://m.weibo.cn/detail/Pdlnlnt0E
    - https://m.weibo.cn/status/5131804355593060

    This function converts: "Pdlnlnt0E" -> "5131804355593060"

    The ID is split from the right into groups of four base62 characters (the
    leading group may be shorter). Each group is decoded to decimal; every
    group except the leading one is left-padded with zeros to seven digits so
    that small groups keep their position, e.g.
    "z0JH2lOMb" -> "35" + "0175648" + "5200075".

    Args:
        post_id (str): The base62 weibo post ID to convert.

    Returns:
        str: The decimal weibo post ID. An all-digit input is returned
        unchanged and an empty input yields "".

    Raises:
        KeyError: If ``post_id`` contains a character outside ``[0-9a-zA-Z]``.

    Reference:
        https://blog.csdn.net/steven30832/article/details/8292230
    """
    post_id = str(post_id)
    if post_id.isdigit():
        return post_id

    def base62_to_b10(str_62: str) -> int:
        value = 0
        for s in str_62:
            value = value * 62 + _WEIBO_BASE62_INDEX[s]
        return value

    # Length of the (possibly shorter) leading group; 0 means all groups are full.
    head_len = len(post_id) % _WEIBO_GROUP_CHARS
    chunks = [post_id[:head_len]] if head_len else []
    chunks.extend(post_id[start : start + _WEIBO_GROUP_CHARS] for start in range(head_len, len(post_id), _WEIBO_GROUP_CHARS))

    parts = []
    for index, chunk in enumerate(chunks):
        digits = str(base62_to_b10(chunk))
        # Only the leading group may drop its leading zeros.
        parts.append(digits if index == 0 else digits.zfill(_WEIBO_GROUP_DIGITS))
    return "".join(parts)
344 return mid