Commit 9cf8900
Changed files (8)
src/preview/douyin.py
@@ -21,6 +21,7 @@ async def preview_douyin(
client: Client,
message: Message,
url: str = "",
+ db_key: str = "",
platform: str = "douyin",
douyin_extractor: str | None = None,
douyin_comments_extractor: str | None = None,
@@ -35,6 +36,7 @@ async def preview_douyin(
client (Client): The Pyrogram client.
message (Message): The trigger message object.
url (str, optional): The douyin or tiktok link.
+ db_key (str, optional): The cache key.
platform (str, optional): The platform name. Defaults to "douyin".
douyin_extractor (str, optional): The douyin extractor: "free" or "tikhub". Defaults to "free".
douyin_comments_extractor (str, optional): The douyin comments extractor: "free" or "tikhub". Defaults to "free".
@@ -44,7 +46,6 @@ async def preview_douyin(
if kwargs.get("show_progress") and "progress" not in kwargs:
res = await send2tg(client, message, texts=f"🔗正在解析抖音链接\n{url}", **kwargs)
kwargs["progress"] = res[0]
- db_key = url
if kv := await get_db(db_key):
logger.debug(f"{platform} preview {DB.ENGINE} cache hit for key={db_key}")
if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
src/preview/instagram.py
@@ -19,19 +19,19 @@ from networking import download_file, download_media, hx_req
@cache.memoize(ttl=30)
-async def preview_instagram(client: Client, message: Message, url: str = "", *, fallback: bool = True, **kwargs):
+async def preview_instagram(client: Client, message: Message, url: str = "", db_key: str = "", *, fallback: bool = True, **kwargs):
"""Preview instagram link in the message.
Args:
client (Client): The Pyrogram client.
message (Message): The trigger message object.
url (str, optional): The Instagram link.
+ db_key (str, optional): The cache key.
fallback (bool, optional): Fallback to other bots. Defaults to True.
"""
if kwargs.get("show_progress") and "progress" not in kwargs:
res = await send2tg(client, message, texts=f"🔗正在解析Instagram链接\n{url}", **kwargs)
kwargs["progress"] = res[0]
- db_key = url
if kv := await get_db(db_key):
logger.debug(f"Instagram preview {DB.ENGINE} cache hit for key={db_key}")
if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
src/preview/twitter.py
@@ -18,24 +18,23 @@ from utils import remove_none_values, split_parts, true
@cache.memoize(ttl=30)
-async def preview_twitter(client: Client, message: Message, domain: str = "", handle: str = "", post_id: str = "", twitter_extractor: str | None = None, **kwargs):
+async def preview_twitter(client: Client, message: Message, url: str = "", db_key: str = "", platform: str = "", twitter_extractor: str | None = None, **kwargs):
"""Preview twitter link in the message.
Args:
client (Client): The Pyrogram client.
message (Message): The trigger message object.
- domain (str): The domain of the link: twitter.com, x.com, fxtwitter.com, fixupx.com.
+ url (str, optional): The tweet link, e.g. https://x.com/{handle}/status/{post_id}.
+ db_key (str, optional): The cache key.
+ platform (str, optional): The platform of the link: twitter, x, fxtwitter, fixupx.
- handle (str): The twitter handle.
- post_id (str): The twitter post id.
twitter_extractor (str): The extractor to use: fxtwitter or tikhub. Defaults to "tikhub".
If skip_fxtwitter is set to True, and the domain is fxtwitter or fixupx, this function is skipped.
"""
- if true(kwargs.get("skip_fxtwitter")) and domain in ["fxtwitter", "fixupx"]:
+ if true(kwargs.get("skip_fxtwitter")) and platform in ["fxtwitter", "fixupx"]:
return
- db_key = f"https://x.com/{handle}/status/{post_id}"
if kwargs.get("show_progress") and "progress" not in kwargs:
- res = await send2tg(client, message, texts=f"🔗正在解析推特链接\n{db_key}", **kwargs)
+ res = await send2tg(client, message, texts=f"🔗正在解析推特链接\n{url}", **kwargs)
kwargs["progress"] = res[0]
if kv := await get_db(db_key):
logger.debug(f"Twitter preview {DB.ENGINE} cache hit for key={db_key}")
@@ -45,7 +44,7 @@ async def preview_twitter(client: Client, message: Message, domain: str = "", ha
succ = False
if twitter_extractor is None or twitter_extractor == "tikhub": # try tikhub first
try:
- this_info = await get_tweet_info_via_tikhub(post_id=post_id, **kwargs)
+ this_info = await get_tweet_info_via_tikhub(url=url, **kwargs)
if not this_info:
await modify_progress(text="❌推特解析失败", **kwargs)
await asyncio.sleep(1)
@@ -58,7 +57,7 @@ async def preview_twitter(client: Client, message: Message, domain: str = "", ha
if not succ: # try fxtwitter
try:
- this_info = await get_tweet_info_via_fxtwitter(handle=handle, post_id=post_id)
+ this_info = await get_tweet_info_via_fxtwitter(url=url)
if not this_info:
await modify_progress(text="❌推特解析失败", **kwargs)
await asyncio.sleep(1)
@@ -143,7 +142,7 @@ async def preview_twitter(client: Client, message: Message, domain: str = "", ha
msg += f"\n🕊**{this_tweet_type}内容:**"
if author := this_info.get("author"):
- msg += f"\n🕊[{author}](https://x.com/{handle}/status/{post_id})"
+ msg += f"\n🕊[{author}]({url})"
if time_str := this_info.get("time"):
msg += f"\n🕒{time_str}"
@@ -195,8 +194,13 @@ async def preview_twitter(client: Client, message: Message, domain: str = "", ha
@cache.memoize(ttl=30)
-async def get_tweet_info_via_tikhub(post_id: str = "", quote_info: dict | None = None, **kwargs) -> dict: # type: ignore
- """Get a single tweet info."""
+async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info: dict | None = None, **kwargs) -> dict: # type: ignore
+ """Get a single tweet info.
+
+ url: https://x.com/{handle}/status/{post_id}
+ """
+ if not post_id:
+ post_id = url.split("/")[-1]
api_url = f"{API.TIKHUB_TWITTER}{post_id}"
logger.info(f"Twitter preview via TikHub: {api_url}")
data = {}
@@ -263,8 +267,14 @@ async def get_tweet_info_via_tikhub(post_id: str = "", quote_info: dict | None =
@cache.memoize(ttl=30)
-async def get_tweet_info_via_fxtwitter(handle: str = "", post_id: str = "", quote_info: dict | None = None) -> dict: # type: ignore
- """Get a single tweet info."""
+async def get_tweet_info_via_fxtwitter(url: str = "", handle: str = "", post_id: str = "", quote_info: dict | None = None) -> dict: # type: ignore
+ """Get a single tweet info.
+
+ url: https://x.com/{handle}/status/{post_id}
+ """
+ if not handle or not post_id:
+ handle = url.split("/")[-3]
+ post_id = url.split("/")[-1]
data = {}
if quote_info:
data = copy.deepcopy(quote_info)
src/preview/xiaohongshu.py
@@ -19,20 +19,18 @@ from others.emoji import emojify
@cache.memoize(ttl=30)
-async def preview_xhs(client: Client, message: Message, xhslink: str = "", post_id: str = "", xsec_token: str = "", *, fallback: bool = True, **kwargs):
+async def preview_xhs(client: Client, message: Message, url: str = "", db_key: str = "", *, fallback: bool = True, **kwargs):
"""Preview xiaohongshu link in the message.
Args:
client (Client): The Pyrogram client.
message (Message): The trigger message object.
- xhslink (str, optional): xiaohongshu link in xhslink.com domain.
- post_id (str, optional): xhs post ID
- xsec_token (str, optional): xhs xsec_token. (This is mandatory for links are not xhslink.com)
+ url (str, optional): xiaohongshu link
+ db_key (str, optional): The cache key.
fallback (bool, optional): Fallback to other bots. Defaults to True.
"""
- db_key = f"https://www.xiaohongshu.com/explore/{post_id}"
if kwargs.get("show_progress") and "progress" not in kwargs:
- res = await send2tg(client, message, texts=f"🔗正在解析小红书链接\n{db_key}", **kwargs)
+ res = await send2tg(client, message, texts=f"🔗正在解析小红书链接\n{url}", **kwargs)
kwargs["progress"] = res[0]
if kv := await get_db(db_key):
logger.debug(f"Xiaohongshu preview {DB.ENGINE} cache hit for key={db_key}")
@@ -40,14 +38,13 @@ async def preview_xhs(client: Client, message: Message, xhslink: str = "", post_
return
await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
- if not xhslink and not xsec_token:
+ if "xhslink.com" not in url and "xsec_token" not in url:
msg = "链接格式错误: 缺少 xsec_token 参数, 请发送完整链接"
msg += "\n或者使用手机APP分享的链接 (xhslink.com域名)"
await send2tg(client, message, texts=msg, **kwargs)
await modify_progress(del_status=True, **kwargs)
return
- url = xhslink if xhslink else f"https://www.xiaohongshu.com/explore/{post_id}?xsec_token={xsec_token}"
logger.info(f"Xiaohongshu link preview for {url}")
xhs_info = await get_xhs_info(url)
note = xhs_info.get("note", {})
src/database.py
@@ -14,7 +14,7 @@ from httpx import AsyncClient, AsyncHTTPTransport
from loguru import logger
from config import DB, cache
-from utils import plain_url, stringfy
+from utils import bare_url, stringfy
async def get_db(key: str) -> dict:
@@ -93,7 +93,7 @@ async def get_cf_r2(key: str) -> dict:
logger.warning("SKIP GET CF-R2: Cloudflare R2 disabled")
return {}
- key = plain_url(unquote_plus(key)) # remove http(s):// prefix
+ key = bare_url(unquote_plus(key)) # remove http(s):// prefix
async with Session().client(
service_name="s3",
endpoint_url=f"https://{DB.CF_ACCOUNT_ID}.r2.cloudflarestorage.com",
@@ -178,7 +178,7 @@ async def set_cf_r2(key: str, data: dict | list | str | None = None, metadata: d
if not DB.CF_R2_ENABLED:
logger.warning("SKIP SET CF-R2: Cloudflare R2 disabled")
return True
- key = plain_url(unquote_plus(key)) # remove http(s):// prefix
+ key = bare_url(unquote_plus(key)) # remove http(s):// prefix
payload = {
"CacheControl": "no-cache",
"Bucket": DB.CF_R2_BUCKET_NAME,
@@ -246,7 +246,7 @@ async def del_cf_r2(key: str):
if not DB.CF_R2_ENABLED:
logger.warning("SKIP SET CF-R2: Cloudflare R2 disabled")
return
- key = plain_url(unquote_plus(key)) # remove http(s):// prefix
+ key = bare_url(unquote_plus(key)) # remove http(s):// prefix
async with Session().client(
service_name="s3",
endpoint_url=f"https://{DB.CF_ACCOUNT_ID}.r2.cloudflarestorage.com",
src/handler.py
@@ -146,7 +146,7 @@ async def handle_social_media(
await preview_douyin(client, message, **kwargs)
if instagram and matched["platform"] == "instagram" and ENABLE.INSTAGRAM:
await preview_instagram(client, message, **kwargs)
- if twitter and matched["platform"] == "twitter" and ENABLE.TWITTER:
+ if twitter and matched["platform"] in ["x", "twitter", "fxtwitter", "fixupx"] and ENABLE.TWITTER:
await preview_twitter(client, message, **kwargs)
if weibo and matched["platform"] == "weibo" and ENABLE.WEIBO:
await preview_weibo(client, message, **kwargs)
src/networking.py
@@ -16,7 +16,7 @@ from loguru import logger
from config import DOWNLOAD_DIR, PROXY, UA, cache, semaphore
from message_utils import modify_progress, summay_media
-from utils import https_url, readable_size
+from utils import bare_url, https_url, readable_size
# ruff: noqa: RUF001
MOBILE_HEADERS = {
@@ -286,8 +286,10 @@ async def match_social_media_link(text: str, *, flatten_first: bool = False) ->
text (str): The text to search for social media links.
Returns:
- dict: A dictionary containing the matched information. (must have a key named "platform")
-
+ dict: A dictionary containing the matched information. When a link is matched, the "platform", "url", and "db_key" keys are present; otherwise only "platform" is present (as an empty string).
+ platform: The social media platform name.
+ url: The matched URL (https form).
+ db_key: The bare URL (scheme stripped) used as the cache key.
#! TODO: Handle multiple links in one message.
"""
if flatten_first:
@@ -295,52 +297,52 @@ async def match_social_media_link(text: str, *, flatten_first: bool = False) ->
matched_info = {"platform": ""}
# https://www.douyin.com/video/7398813386827468041
if matched := re.search(r"(https?://)?(www\.)?douyin\.com/video/(\d+)", text):
- matched_info = {"url": https_url(matched.group(0)), "platform": "douyin"}
+ matched_info = {"url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "douyin"}
# https://www.douyin.com/user/MS4wLjABAAAAXgBuOEcyavDhrRBqnD8x7d4pj7RIL5QFRlLehCnem8couoAg8yXR-MGhUK0i4riF?modal_id=7451543857952492810
if matched := re.search(r"(https?://)?(www\.)?douyin\.com/user/(.*?)\?(.*?)modal_id=(\d+)", text):
- matched_info = {"url": f"https://www.douyin.com/video/{matched.group(5)}", "platform": "douyin"}
+ matched_info = {"url": f"https://www.douyin.com/video/{matched.group(5)}", "db_key": f"www.douyin.com/video/{matched.group(5)}", "platform": "douyin"}
# https://www.douyin.com/note/7458195074434846004
if matched := re.search(r"(https?://)?(www\.)?douyin\.com/note/(\d+)", text):
- matched_info = {"url": f"https://www.douyin.com/note/{matched.group(3)}", "platform": "douyin"}
+ matched_info = {"url": f"https://www.douyin.com/note/{matched.group(3)}", "db_key": f"www.douyin.com/note/{matched.group(3)}", "platform": "douyin"}
# https://www.tiktok.com/@baymermel/video/7460653893941267755\?_t\=ZS-8t8YbVWqv5k\&_r\=1
if matched := re.search(r"(https?://)?(www\.)?tiktok\.com/(.*?)/(\d+)", text):
- matched_info = {"url": https_url(matched.group(0)), "platform": "tiktok"}
+ matched_info = {"url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "tiktok"}
# https://www.instagram.com/p/C7P3jN8vmEN
# https://www.instagram.com/reel/DBBEGXpvwNF
if matched := re.search(r"(https?://)?(www\.)?instagram\.com/(:?|p|reel)/([^.。,,/\s]+)", text):
- matched_info = {"post_type": matched.group(3), "post_id": matched.group(4), "url": https_url(matched.group(0)), "platform": "instagram"}
+ matched_info = {"post_type": matched.group(3), "post_id": matched.group(4), "url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "instagram"}
# https://www.instagram.com/yifaer_chen/p/DEzv9x-vzOn/
if matched := re.search(r"(https?://)?(www\.)?instagram\.com/\w+/(:?|p|reel)/([^.。,,/\s]+)", text):
- matched_info = {"post_type": matched.group(3), "post_id": matched.group(4), "url": https_url(matched.group(0)), "platform": "instagram"}
+ matched_info = {"post_type": matched.group(3), "post_id": matched.group(4), "url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "instagram"}
# https://x.com/taylorswift13/status/1794805688696275131
# https://twitter.com/taylorswift13/status/1794805688696275131
# https://fixupx.com/taylorswift13/status/1794805688696275131
# https://fxtwitter.com/taylorswift13/status/1794805688696275131
if matched := re.search(r"(https?://)?(:?twitter|x|fxtwitter|fixupx)\.com\/(\w+)\/status/(\d+)", text):
- domain = matched.group(2)
+ platform = matched.group(2)
handle = matched.group(3)
post_id = matched.group(4)
url = f"https://x.com/{handle}/status/{post_id}"
- matched_info = {"domain": domain, "handle": handle, "post_id": post_id, "url": url, "platform": "twitter"}
+ matched_info = {"platform": platform, "handle": handle, "post_id": post_id, "url": url, "db_key": bare_url(url)}
# https://weibo.com/1736562685/P6lhSjRnI
if matched := re.search(r"(https?://)?(www\.)?weibo\.com/(.*?)/(\w+)", text):
- matched_info = {"post_id": matched.group(4), "url": https_url(matched.group(0)), "platform": "weibo"}
+ matched_info = {"post_id": matched.group(4), "url": https_url(matched.group(0)), "db_key": f"m.weibo.cn/detail/{matched.group(4)}", "platform": "weibo"}
# https://m.weibo.cn/detail/5113333048938691
# https://m.weibo.cn/status/5113333048938691
if matched := re.search(r"(https?://)?m\.weibo\.cn/(:?detail|status)/(\w+)", text):
- matched_info = {"post_id": matched.group(3), "url": https_url(matched.group(0)), "platform": "weibo"}
+ matched_info = {"post_id": matched.group(3), "url": https_url(matched.group(0)), "db_key": f"m.weibo.cn/detail/{matched.group(3)}", "platform": "weibo"}
# https://video.weibo.com/show?fid=1034:5123779299311660
if matched := re.search(r"(https?://)?video\.weibo\.(:?com|cn)/show\?fid=(\d+):(\d+)", text):
- matched_info = {"post_id": f"weibovideo{matched.group(3)}:{matched.group(4)}", "url": https_url(matched.group(0)), "platform": "weibo"}
+ matched_info = {"post_id": f"weibovideo{matched.group(3)}:{matched.group(4)}", "url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "weibo"}
# https://weibo.com/tv/show/1034:5123779299311660?from=old_pc_videoshow
if matched := re.search(r"(https?://)?(www\.)?weibo\.(:?com|cn)/tv/show/(\d+):(\d+)", text):
- matched_info = {"post_id": f"weibovideo{matched.group(4)}:{matched.group(5)}", "url": https_url(matched.group(0)), "platform": "weibo"}
+ matched_info = {"post_id": f"weibovideo{matched.group(4)}:{matched.group(5)}", "url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "weibo"}
# http://xhslink.com/a/Z3VPXAReU1Y1
xhs_pattern = r"(https?://)?xhslink\.com/(\w?/?)([^,,.。?\s]+)"
@@ -350,14 +352,14 @@ async def match_social_media_link(text: str, *, flatten_first: bool = False) ->
post_id = Path(base_url).stem
queries = parse_qs(urlparse(flatten).query)
xsec_token = queries.get("xsec_token", [""])[0]
- matched_info = {"url": https_url(matched.group(0)), "xhslink": https_url(matched.group(0)), "post_id": post_id, "xsec_token": xsec_token, "platform": "xiaohongshu"}
+ matched_info = {"url": https_url(matched.group(0)), "db_key": f"www.xiaohongshu.com/explore/{post_id}", "platform": "xiaohongshu"}
# https://www.xiaohongshu.com/explore/671a3dfe00000000240161db?xsec_token=ABY-b1JKuAlIm2dX1OSdIFHD7cQFHEdThv5aMyccvmbJo=
if matched := re.search(r"(https?://)?(www\.)?xiaohongshu\.com/([^.。,,\s]+)", text):
base_url = matched.group(0).split("?")[0]
post_id = Path(base_url).stem
queries = parse_qs(urlparse(matched.group(0)).query)
xsec_token = queries.get("xsec_token", [""])[0]
- matched_info = {"post_id": post_id, "xsec_token": xsec_token, "url": f"https://www.xiaohongshu.com/explore/{post_id}?xsec_token={xsec_token}", "platform": "xiaohongshu"}
+ matched_info = {"url": f"https://www.xiaohongshu.com/explore/{post_id}?xsec_token={xsec_token}", "db_key": f"www.xiaohongshu.com/explore/{post_id}", "platform": "xiaohongshu"}
# https://www.bilibili.com/video/BV1TC411J7PK
if matched := re.search(r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/([^,,.。\s]+)", str(text)):
@@ -366,17 +368,17 @@ async def match_social_media_link(text: str, *, flatten_first: bool = False) ->
queries = parse_qs(urlparse(matched.group(0)).query)
pid = queries.get("p", ["1"])[0]
url = f"https://www.bilibili.com/video/{bvid}?p={pid}".removesuffix("?p=1")
- matched_info = {"url": url, "bvid": bvid, "pid": pid, "platform": "bilibili"}
+ matched_info = {"url": url, "db_key": bare_url(url), "bvid": bvid, "pid": pid, "platform": "bilibili"}
# https://www.youtube.com/watch?v=D6aE2E0RHTc
if matched := re.search(r"(https?://)?(:?m\.|www\.)?youtube\.com/watch([^,,.。\s]+)", str(text)):
queries = parse_qs(urlparse(matched.group(0)).query)
if vid := queries.get("v", [""])[0]:
- matched_info = {"url": f"https://www.youtube.com/watch?v={vid}", "vid": vid, "platform": "youtube"}
+ matched_info = {"url": f"https://www.youtube.com/watch?v={vid}", "db_key": f"www.youtube.com/watch?v={vid}", "vid": vid, "platform": "youtube"}
# https://youtube.com/shorts/lFKHbluAlJw
if matched := re.search(r"(https?://)?(:?m\.|www\.)?youtube\.com/shorts/([^,,.。?\s]+)", str(text)):
vid = matched.group(3)
- matched_info = {"url": f"https://www.youtube.com/watch?v={vid}", "vid": vid, "platform": "youtube"}
+ matched_info = {"url": f"https://www.youtube.com/watch?v={vid}", "db_key": f"www.youtube.com/watch?v={vid}", "vid": vid, "platform": "youtube"}
if matched_info["platform"]:
logger.success(f"Matched: {matched_info}")
src/utils.py
@@ -180,7 +180,7 @@ def https_url(url: str) -> str:
return "https://" + str(url).removeprefix("https://").removeprefix("http://").lstrip("/").rstrip("/")
-def plain_url(url: str) -> str:
+def bare_url(url: str) -> str:
return str(url).removeprefix("https://").removeprefix("http://").lstrip("/").rstrip("/")