main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4import json
5import re
6from datetime import datetime
7from pathlib import Path
8from urllib.parse import quote_plus
9from zoneinfo import ZoneInfo
10
11from bs4 import BeautifulSoup
12from glom import Coalesce, glom
13from loguru import logger
14from pyrogram.client import Client
15from pyrogram.types import Message
16
17from bridge.social import send_to_social_media_bridge
18from config import AI, API, DOWNLOAD_DIR, PROVIDER, PROXY, TELEGRAM_UA, TOKEN, TZ, cache
19from cookies import get_weibo_cookies
20from database.r2 import get_cf_r2
21from messages.database import copy_messages_from_db, save_messages
22from messages.progress import modify_progress
23from messages.sender import send2tg
24from messages.utils import blockquote, summay_media
25from networking import download_file, download_first_success_urls, download_media, hx_req
26from others.emoji import emojify
27from preview.utils import add_summary_url
28from summarize.summarize import summarize
29from utils import nowstr, rand_string, readable_count, soup_to_text, split_parts, true
30
31
async def preview_weibo(
    client: Client,
    message: Message,
    url: str,
    db_key: str = "",
    post_id: str = "",
    *,
    weibo_provider: str = PROVIDER.WEIBO,
    weibo_comments: bool = True,
    summary_weibo: bool = False,
    summary_weibo_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
    show_author: bool = True,
    show_pubdate: bool = True,
    show_ip: bool = True,
    show_device: bool = True,
    show_statistics: bool = True,
    **kwargs,
):
    """Preview a weibo link in the chat of the trigger message.

    The post (and the post it quotes, if any) is parsed, rendered into one
    text message with its media, sent via ``send2tg`` and stored with
    ``save_messages`` under ``db_key``. Downloaded media files are removed
    afterwards.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str): Weibo link.
        db_key (str, optional): The cache key. Any occurrence of ``post_id``
            in it is replaced by the decimal post ID.
        post_id (str, optional): Weibo post ID. IDs starting with
            ``weibovideo`` are first resolved through ``weibo_vid_to_postid``.
        weibo_provider (str, optional): The weibo provider. When it contains
            ``"bridge"``, a failed parse is handed to the social media bridge.
        weibo_comments (bool, optional): Fetch weibo comments. Defaults to True.
        summary_weibo (bool, optional): Run ``summarize_weibo`` on the first
            sent message that has a caption or text. Defaults to False.
        summary_weibo_model (str, optional): Model passed to ``summarize_weibo``.
        show_author (bool, optional): Include the author line(s).
        show_pubdate (bool, optional): Include the publish time line(s).
        show_ip (bool, optional): Include the region line(s).
        show_device (bool, optional): Include the device info.
        show_statistics (bool, optional): Include the like/comment/share line(s).
        **kwargs: Forwarded to the sending, progress and parsing helpers.
    """
    if post_id.startswith("weibovideo"):
        post_id = await weibo_vid_to_postid(post_id)

    real_post_id = real_weibo_post_id(post_id)
    db_key = db_key.replace(post_id, real_post_id)
    if kv := await get_cf_r2(db_key):
        logger.debug(f"Weibo preview cache hit for key={url}")
        if await copy_messages_from_db(client, message, key=url, kv=kv, **kwargs):
            return
        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析微博链接\n{url}", **kwargs)
        if res:  # nothing to track when no progress message was sent
            kwargs["progress"] = res[0]
    this_info = await parse_weibo_info(post_id, **kwargs)
    if error_msg := this_info.get("error_msg"):
        if "bridge" in weibo_provider and not this_info.get("force_stop"):
            await modify_progress(text=f"❌微博解析失败: {error_msg}\n尝试第三方Bot...", **kwargs)
            kwargs |= {"target_mid": message.id}
            await send_to_social_media_bridge(client, message, url, **kwargs)
        else:
            await modify_progress(text=f"❌微博解析失败: {error_msg}", force_update=True, **kwargs)
        return
    reply_data = this_info.get("reply_data")
    quote_info = await parse_weibo_info(post_id, reply_data, **kwargs) if reply_data else {}

    # Build the media-count notes for the post and for the quoted post.
    own_media = this_info.get("media", [])
    quoted_media = quote_info.get("media", [])
    part_strs = split_parts(len(own_media), last=len(quoted_media))

    msg = ""
    if true(show_author) and this_info.get("author"):
        msg += f"\n🧣**[{this_info['author']}]({this_info.get('author_url', 'weibo.com')})**"

    if true(show_pubdate) and this_info.get("dt"):
        msg += f"\n🕒{this_info['dt']}"
        # The media-count note is only appended when there is a quoted post.
        if part_strs["first"] and quote_info:
            msg += f" {part_strs['first']}"

    if true(show_ip) and this_info.get("region"):
        msg += f"\n📍{this_info['region']}"

    if true(show_device) and this_info.get("device"):
        msg += f" 📱{this_info['device']}"

    if true(show_statistics) and this_info.get("statistics"):
        msg += f"\n{this_info['statistics']}"

    if texts := this_info.get("texts"):
        msg += f"\n{texts}"

    # Work on a copy: ``this_info`` comes from a memoized call, so extending
    # its ``media`` list in place would leak the quoted media into the cached
    # value (and duplicate it on a repeated call).
    media = list(own_media)
    if quote_info:
        msg += "\n🔁"
        if true(show_author) and quote_info.get("author"):
            msg += f"\n**[{quote_info['author']}](https://m.weibo.cn/detail/{quote_info['post_id']})**"
            # Put the quoted author on the same line as the repost marker.
            msg = msg.replace("\n🔁\n", "\n🔁")

        if true(show_pubdate) and quote_info.get("dt"):
            msg += f"\n🕒{quote_info['dt']}"

        if part_strs["last"]:
            msg += f" {part_strs['last']}"

        if true(show_ip) and quote_info.get("region"):
            msg += f"\n📍{quote_info['region']}"

        if true(show_device) and quote_info.get("device"):
            msg += f" 📱{quote_info['device']}"

        if true(show_statistics) and quote_info.get("statistics"):
            msg += f"\n{quote_info['statistics']}"

        if texts := quote_info.get("texts"):
            msg += f"\n{texts}"

        media.extend(quoted_media)

    comments = ""
    if true(weibo_comments):
        # NOTE(review): the comments endpoint is queried with the decimal ID
        # (the one reported by the post itself, converted if it is still
        # base62); it appears to expect that form - confirm.
        comment_post_id = real_weibo_post_id(this_info.get("post_id") or post_id)
        comments = await parse_weibo_comments(comment_post_id)
    sent_messages = await send2tg(client, message, texts=emojify(msg.strip()) + comments, media=media, keep_file=True, **kwargs)
    await modify_progress(del_status=True, **kwargs)

    # Summarize: the summary link is attached to the first sent message that
    # carries a caption or text.
    caption_index = next(
        (idx for idx, m in enumerate(sent_messages) if isinstance(m, Message) and (m.caption or m.text)),
        None,
    )
    if true(summary_weibo) and caption_index is not None:
        sent_messages[caption_index] = await summarize_weibo(sent_messages[caption_index], this_info, quote_info, media, summary_weibo_model, url)
    await save_messages(messages=sent_messages, key=db_key)

    # Clean up the downloaded files (they were kept for the summary step).
    for item in media:
        file_path = glom(item, Coalesce("photo", "video", "audio", default=None))
        if file_path:
            Path(file_path).unlink(missing_ok=True)
158
159
@cache.memoize(ttl=30)
async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) -> dict:
    """Turn a weibo status payload into the fields needed for a preview.

    Args:
        post_id (str): Weibo post ID, used to build the
            ``https://m.weibo.cn/detail/<post_id>`` URL.
        data (dict | None, optional): An already available status payload
            (the caller passes the ``retweeted_status`` of another post).
            When empty, the status is read from the ``$render_data`` script
            variable of the mobile detail page.
        **kwargs: Forwarded to ``modify_progress`` and the download helpers.

    Returns:
        dict: On failure only ``error_msg`` (plus ``force_stop`` when the page
        says the post does not exist or is not viewable). On success the keys
        ``post_id``, ``author``, ``author_url``, ``region``, ``dt``,
        ``device``, ``texts``, ``reply_data``, ``statistics`` and ``media``
        (de-duplicated downloaded media entries).
    """
    info = {}
    if not data:
        weibo_url = f"https://m.weibo.cn/detail/{post_id}"
        logger.info(f"Weibo link preview for {weibo_url}")
        headers = {"referer": "https://m.weibo.cn", "cookie": await get_weibo_cookies()}
        try:
            resp = await hx_req(weibo_url, headers=headers, proxy=PROXY.WEIBO, mobile=True, rformat="text")
            if not resp.get("text"):
                info["error_msg"] = f"Weibo webpage not found: {weibo_url}"
                return info
            page = str(resp["text"])
            if not (matched := re.search(r"var \$render_data = (\[.*?\])\[0\]", page, re.DOTALL)):
                info["error_msg"] = "Weibo API empty response"
                if "微博不存在" in page or "暂无查看权限" in page:
                    info["error_msg"] = "微博不存在或暂无查看权限!"
                    info["force_stop"] = True
                logger.error(info["error_msg"])
                return info
            json_data: list = json.loads(matched.group(1))
            if not json_data:
                logger.error(f"Weibo API response cannot be parsed: {matched.group(1)}")
                info["error_msg"] = "Weibo API response cannot be parsed"
                return info
            data = glom(json_data, "0.status", default={}) or {}
            if not data:
                # Without a status payload there is nothing to preview; report
                # it instead of producing an empty message.
                logger.error(f"Weibo status is missing in the API response: {weibo_url}")
                info["error_msg"] = "Weibo API response cannot be parsed"
                return info
            # The HTML text is converted to plain text once, below. Converting
            # it here as well would parse already-decoded text as HTML again.
            await modify_progress(text="✅解析成功, 正在处理...", **kwargs)
        except Exception as e:
            info["error_msg"] = f"Weibo API failed: {e}"
            logger.error(info["error_msg"])
            return info
    else:
        await modify_progress(text="✅正在解析转发微博...", **kwargs)

    logger.trace(data)
    media = []
    for x in data.get("pics") or []:
        pid = glom(x, "pid", default=rand_string())
        mtype = glom(x, "type", default="photo")
        photo_url = glom(x, "large.url", default=x.get("url"))
        video_url = x.get("videoSrc")
        if mtype == "livephoto" and video_url:
            # Only the clip of a live photo is downloaded, not its still image.
            media.append({"video": download_file(video_url, path=f"{DOWNLOAD_DIR}/{pid}.mov", headers={"user-agent": TELEGRAM_UA}, proxy=PROXY.WEIBO, stream=True, **kwargs)})
        elif mtype in ("video", "gifvideos") and video_url:
            media.append({"video": download_file(video_url, suffix=".mp4", headers={"user-agent": TELEGRAM_UA}, proxy=PROXY.WEIBO, stream=True, **kwargs)})
        elif photo_url:
            # Also the fallback for video-like entries that carry no video URL.
            media.append({"photo": download_file(photo_url, headers={"user-agent": TELEGRAM_UA}, proxy=PROXY.WEIBO, stream=True, **kwargs)})
        else:
            logger.warning(f"Weibo media entry without a usable URL skipped: {x}")
    if page_info := data.get("page_info", {}):
        videos = page_info.get("urls") or {}
        # Qualities are listed from most to least preferred.
        if video_urls := [videos[quality] for quality in ("mp4_720p_mp4", "mp4_hd_mp4", "mp4_ld_mp4") if videos.get(quality)]:
            # This may already be downloaded by the loop over data["pics"] above.
            media.append({"video": download_first_success_urls(video_urls, skip_exist=True, suffix=".mp4", headers={"user-agent": TELEGRAM_UA}, proxy=PROXY.WEIBO, stream=True, **kwargs)})

    statistics = ""
    if like := glom(data, "attitudes_count", default=0):
        statistics += f"👍{readable_count(like)}"
    if comment := glom(data, "comments_count", default=0):
        statistics += f"💬{readable_count(comment)}"
    if share := glom(data, "reposts_count", default=0):
        statistics += f"↗️{readable_count(share)}"

    info["post_id"] = glom(data, "id", default=post_id)
    info["author"] = glom(data, "user.screen_name", default="")
    info["author_url"] = f"https://m.weibo.cn/detail/{post_id}"  # for weibo post, use post url as author url
    info["region"] = (data.get("region_name") or "").removeprefix("发布于").strip()
    info["dt"] = ""
    # Best effort: a missing or unexpected timestamp leaves ``dt`` empty.
    with contextlib.suppress(Exception):
        dt = datetime.strptime(data["created_at"], "%a %b %d %H:%M:%S %z %Y").astimezone(ZoneInfo(TZ))
        info["dt"] = f"{dt:%Y-%m-%d %H:%M:%S}"
    info["device"] = data.get("source", "")
    info["texts"] = soup_to_text(BeautifulSoup(data.get("text") or "", "html.parser"))
    info["reply_data"] = data.get("retweeted_status", {})
    info["statistics"] = statistics
    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
    media = await download_media(media, **kwargs)
    # De-duplicate media by downloaded path; entries without a photo or video
    # value are dropped.
    media_paths = set()
    final_media = []
    for x in media:
        if x.get("photo") and x["photo"] not in media_paths:
            final_media.append(x)
            media_paths.add(x["photo"])
        elif x.get("video") and x["video"] not in media_paths:
            final_media.append(x)
            media_paths.add(x["video"])
    info["media"] = final_media
    return info
248
249
@cache.memoize(ttl=120)
async def weibo_vid_to_postid(post_id: str) -> str:
    """Resolve a ``weibovideo<fid>`` identifier to the post's ``mid``.

    Args:
        post_id (str): Identifier starting with ``weibovideo``.

    Returns:
        str: The ``mid`` found in the API response as a string, or ``""``
        when ``post_id`` lacks the prefix or the response has no ``mid``.
    """
    prefix = "weibovideo"
    if not post_id.startswith(prefix):
        return ""

    mid_path = "data.data.Component_Play_Playinfo.mid"
    video_page = f"https://video.weibo.com/show?fid={post_id.removeprefix(prefix)}"
    response = await hx_req(
        f"{API.TIKHUB_WEIBO_VIDEO}{quote_plus(video_page)}",
        headers={"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"},
        proxy=PROXY.WEIBO,
        check_kv={"data.msg": "succ"},
        check_keys=[mid_path],
    )
    return str(glom(response, mid_path, default=""))
259
260
@cache.memoize(ttl=30)
async def parse_weibo_comments(post_id: str) -> str:
    """Fetch comments of a weibo post and render them as one blockquote.

    Requests up to 10 comments (``count``) from the ``buildComments`` endpoint.

    Args:
        post_id (str): ID of the post, sent as the ``id`` query parameter.

    Returns:
        str: ``""`` when ``post_id`` is empty, the request failed, or no
        comment has text; otherwise ``blockquote`` applied to a header line
        followed by one line per comment.
    """
    if not post_id:
        return ""
    headers = {
        "cookie": await get_weibo_cookies(),
        "accept": "application/json, text/plain, */*",
        "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
        "cache-control": "no-cache",
        "client-version": "v2.47.17",
        "dnt": "1",
        "pragma": "no-cache",
        "priority": "u=1, i",
        "referer": "https://weibo.com",
        "sec-ch-ua": '"Not;A=Brand";v="24", "Chromium";v="128"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Linux"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "server-version": "v2024.12.30.2",
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
        "x-requested-with": "XMLHttpRequest",
    }
    params = {
        "id": post_id,
        "is_show_bulletin": 2,
        "is_mix": 0,
        "count": 10,
        "fetch_level": 0,
        "locale": "zh-CN",
        "max_id": 0,
    }
    api = "https://weibo.com/ajax/statuses/buildComments"
    resp = await hx_req(api, headers=headers, params=params, proxy=PROXY.WEIBO, check_kv={"ok": 1}, max_retry=1)
    if resp.get("hx_error"):
        logger.error(f"Weibo Comments API failed: {resp}")
        return ""

    lines = []
    # ``or`` guards against keys that are present but null.
    for info in resp.get("data") or []:
        text = info.get("text")
        if not text:
            continue
        cmt = ""
        uid = glom(info, "user.id", default="")
        author = glom(info, "user.screen_name", default="")
        if author and uid:
            cmt += f"💬**[{author}](https://weibo.com/u/{uid})**"
        elif author:
            cmt += f"💬**{author}**"
        if region := (info.get("source") or "").removeprefix("来自"):
            cmt += f"({region})"
        cmt += f": {soup_to_text(BeautifulSoup(text, 'html.parser'))}"
        lines.append(emojify(cmt))
    if not lines:
        # Nothing to show: do not wrap an empty string in a blockquote.
        return ""
    body = "\n".join(["💬**点击展开评论**:", *lines])
    return blockquote(body.strip())
321
322
def real_weibo_post_id(post_id: str) -> str:
    """Convert weibo post ID from base62 to decimal format.

    These are the same post:
    - https://m.weibo.cn/detail/Pdlnlnt0E
    - https://m.weibo.cn/status/5131804355593060

    This function converts: "Pdlnlnt0E" -> "5131804355593060"

    The base62 ID is split into groups of four characters counted from the
    end (the leading group may be shorter). Each group is converted to a
    decimal number; every group except the leading one is left-padded with
    zeros to seven digits before the groups are concatenated.

    Args:
        post_id (str): The base62 weibo post ID to convert. An ID made only
            of digits is treated as already decimal and returned unchanged.

    Returns:
        str: The decimal weibo post ID ("" for an empty input).

    Raises:
        KeyError: If ``post_id`` contains a character outside ``0-9a-zA-Z``.

    Reference:
        https://blog.csdn.net/steven30832/article/details/8292230
    """
    post_id = str(post_id)
    if post_id.isdigit():
        return post_id
    mapping = {c: i for i, c in enumerate("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")}

    def base62_to_b10(str_62):
        value = 0
        for s in str_62:
            value = value * 62 + mapping[s]
        return value

    parts = []
    end = len(post_id)
    while end > 0:
        start = max(end - 4, 0)  # four characters per group, walking backwards
        digits = str(base62_to_b10(post_id[start:end]))
        # Only the leading group keeps its natural width; dropping the zero
        # padding of the other groups would shorten (and corrupt) the ID.
        parts.append(digits.zfill(7) if start > 0 else digits)
        end = start
    return "".join(reversed(parts))
364
365
async def summarize_weibo(message: Message, this_info: dict, quote_info: dict, media_list: list[dict], model: str, url: str) -> Message:
    """Summarize a weibo post with AI and attach the summary link to ``message``.

    The parsed post and its quoted post are merged, stripped of empty values,
    serialized to JSON and passed to ``summarize`` together with every photo
    and video path found in ``media_list``.

    Args:
        message (Message): The sent message that should receive the summary URL.
        this_info (dict): Parsed fields of the post.
        quote_info (dict): Parsed fields of the quoted post (may be empty).
        media_list (list[dict]): Entries with a ``photo`` and/or ``video`` path.
        model (str): Model passed to ``summarize``.
        url (str): Link of the post, passed to ``summarize``.

    Returns:
        Message: The result of ``add_summary_url`` when a telegraph URL was
        produced and that result is truthy; otherwise ``message`` unchanged.
    """
    empty_values = ("", None, {})

    def prune(node):
        # Recursively drop empty strings, None and empty dicts from dicts/lists.
        if isinstance(node, list):
            return [prune(child) for child in node if child not in empty_values]
        if isinstance(node, dict):
            return {key: prune(value) for key, value in node.items() if value not in empty_values}
        return node

    payload = prune({"platform": "微博"} | this_info | {"quote_post": quote_info})

    sources = []
    has_video = False
    for item in media_list:
        if photo := item.get("photo"):
            sources.append({"type": "image", "path": photo})
        if video := item.get("video"):
            has_video = True
            sources.append({"type": "video", "path": video})
    sources.append({"type": "text", "text": json.dumps(payload, ensure_ascii=False)})

    # Posts with a video are always summarized (no text-length floor) but get
    # a minimum video duration; text/photo posts need a minimum text length.
    text_floor = None if has_video else 1000
    video_floor = 120 if has_video else None

    author_name = glom(payload, Coalesce("author", "quote_post.author"), default="Anonymous")
    title = glom(payload, Coalesce("texts", "quote_post.texts"), default="微博")
    created_at = glom(payload, Coalesce("dt", "quote_post.dt"), default=None)

    summary = await summarize(
        sources=sources,
        model=model,
        title=f"🧣{title}",
        author=author_name,
        url=url,
        date=created_at or nowstr(TZ),
        min_text_length=text_floor,
        min_video_duration=video_floor,
        max_video_duration=3600,  # skip long videos more than 1 hour
    )
    telegraph_url = summary.get("telegraph_url")
    if telegraph_url:
        return await add_summary_url(telegraph_url, message) or message
    return message