Commit 590c6bc

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-08-28 07:01:52
fix(xhs): handle xiaohongshu share links from app
1 parent 6a17b43
Changed files (2)
src/preview/xiaohongshu.py
@@ -28,6 +28,7 @@ async def preview_xhs(
     db_key: str = "",
     xsec: str = "",
     *,
+    is_xhs_link: bool = False,
     xhs_provider: str = PROVIDER.XHS,
     **kwargs,
 ):
@@ -39,6 +40,7 @@ async def preview_xhs(
         url (str, optional): xiaohongshu link
         db_key (str, optional): The cache key.
         xsec (str, optional): The xsec token.
+        is_xhs_link (bool, optional): Whether the link is a share link from APP.
         xhs_provider (str, optional): The xiaohongshu provider.
     """
     if kwargs.get("show_progress") and "progress" not in kwargs:
@@ -50,11 +52,10 @@ async def preview_xhs(
             return
         await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
 
-    if "xhslink.com" not in url and "xsec_token" not in url:
+    if not is_xhs_link and "xsec_token" not in url:
         msg = "链接格式错误: 缺少 xsec_token 参数, 请发送完整链接"
         msg += "\n或者使用手机APP分享的链接 (xhslink.com域名)"
-        await send2tg(client, message, texts=msg, **kwargs)
-        await modify_progress(del_status=True, **kwargs)
+        await modify_progress(text=msg, **kwargs)
         return
 
     logger.info(f"Xiaohongshu link preview for {url}")
@@ -63,7 +64,7 @@ async def preview_xhs(
     if not note:
         if "bridge" in xhs_provider:
             await modify_progress(text="❌小红书解析失败, 尝试第三方Bot...", **kwargs)
-            full_url = f"https://{db_key}?xsec_token={xsec}"
+            full_url = f"https://{db_key}?xsec_token={xsec}" if xsec else url
             kwargs |= {"target_mid": message.id}
             await send_to_social_media_bridge(client, message, full_url, **kwargs)
         else:
src/networking.py
@@ -322,19 +322,26 @@ async def match_social_media_link(text: str, *, flatten_first: bool = True) -> d
     # http://xhslink.com/a/Z3VPXAReU1Y1
     xhs_pattern = r"(https?://)?xhslink\.com/(\w?/?)([^,,.。?\s]+)"
     if matched := re.search(xhs_pattern, text):
-        flatten = await flatten_rediercts(https_url(matched.group(0)), pattern=xhs_pattern, proxy=PROXY.XHS)
+        transport = AsyncCurlTransport(proxy=PROXY.XHS, impersonate="safari_ios", default_headers=True, curl_options={CurlOpt.FRESH_CONNECT: True})
+        flatten = await flatten_rediercts(https_url(matched.group(0)), transport=transport, pattern=xhs_pattern, proxy=PROXY.XHS, method="GET")
         base_url = flatten.split("?")[0]
         post_id = Path(base_url).stem
         queries = parse_qs(urlparse(flatten).query)
         xsec = queries.get("xsec_token", [""])[0]
-        return {"url": https_url(matched.group(0)), "db_key": f"www.xiaohongshu.com/explore/{post_id}", "xsec": xsec, "platform": "xiaohongshu"}
+        return {"url": https_url(matched.group(0)), "db_key": f"www.xiaohongshu.com/explore/{post_id}", "xsec": xsec, "is_xhs_link": True, "platform": "xiaohongshu"}
     # https://www.xiaohongshu.com/explore/671a3dfe00000000240161db?xsec_token=ABY-b1JKuAlIm2dX1OSdIFHD7cQFHEdThv5aMyccvmbJo=
     if matched := re.search(r"(https?://)?(www\.)?xiaohongshu\.com/([^。,,\s]+)", text):
         base_url = matched.group(0).split("?")[0]
         post_id = Path(base_url).stem
         queries = parse_qs(urlparse(matched.group(0)).query)
         xsec = queries.get("xsec_token", [""])[0]
-        return {"url": f"https://www.xiaohongshu.com/explore/{post_id}?xsec_token={xsec}", "db_key": f"www.xiaohongshu.com/explore/{post_id}", "xsec": xsec, "platform": "xiaohongshu"}
+        return {
+            "url": f"https://www.xiaohongshu.com/explore/{post_id}?xsec_token={xsec}",
+            "db_key": f"www.xiaohongshu.com/explore/{post_id}",
+            "is_xhs_link": False,
+            "xsec": xsec,
+            "platform": "xiaohongshu",
+        }
 
     # https://www.bilibili.com/video/BV1TC411J7PK
     if matched := re.search(r"(https?://)?(:?m\.|www\.)?bilibili\.com/video/(\w+)", text):
@@ -422,10 +429,16 @@ async def match_social_media_link(text: str, *, flatten_first: bool = True) -> d
 
 
 @cache.memoize(ttl=60)
-async def flatten_rediercts(texts: str | None = None, pattern: str | None = None, headers: dict | None = None, proxy: str | None = None, method: str = "HEAD") -> str:
+async def flatten_rediercts(
+    texts: str | None = None,
+    pattern: str | None = None,
+    headers: dict | None = None,
+    proxy: str | None = None,
+    method: str = "HEAD",
+    transport: AsyncCurlTransport | AsyncHTTPTransport | None = None,
+) -> str:
     if not texts:
         return ""
-
     url = ""
     # v.douyin.com
     if matched := re.search(r"(https?://)?v\.douyin\.com/([^.。,,?&/\s]+)", texts):
@@ -467,20 +480,19 @@ async def flatten_rediercts(texts: str | None = None, pattern: str | None = None
     # custom pattern
     if pattern and (matched := re.search(pattern, texts)):
         url = matched.group(0)
-
     if not url:
         return texts
     # parse redirect
     rediercted_url = https_url(url)
     with contextlib.suppress(Exception):
         if method == "HEAD":
-            async with AsyncClient(http2=True, proxy=proxy, follow_redirects=True, event_hooks={"request": [log_req], "response": [log_resp]}) as hx:
+            async with AsyncClient(http2=True, proxy=proxy, follow_redirects=True, transport=transport, event_hooks={"request": [log_req], "response": [log_resp]}) as hx:
                 resp = await hx.head(https_url(url), headers=headers, timeout=3)
                 rediercted_url = str(resp.url)
         elif method == "GET":
             status_code = 302
             while str(status_code).startswith("3"):
-                async with AsyncClient(http2=True, proxy=proxy, follow_redirects=False, event_hooks={"request": [log_req], "response": [log_resp]}) as hx:
+                async with AsyncClient(http2=True, proxy=proxy, follow_redirects=False, transport=transport, event_hooks={"request": [log_req], "response": [log_resp]}) as hx:
                     resp = await hx.get(rediercted_url, headers=headers, timeout=3)
                     status_code = resp.status_code
                     rediercted_url = resp.headers.get("Location", rediercted_url)
@@ -507,6 +519,7 @@ if __name__ == "__main__":
     # asyncio.run(flatten_rediercts("https://v.douyin.com/CeiJfJMQG/"))
     # asyncio.run(flatten_rediercts("https://www.tiktok.com/t/ZT2mcMA7f/"))
     # asyncio.run(flatten_rediercts("https://t.co/Wwo3x69CQz"))
+    print(asyncio.run(match_social_media_link("https://xhslink.com/n/6xKqXFjpjO1")))
     print(asyncio.run(match_social_media_link("https://www.youtube.com/watch?v=D6aE2E0RHTc")))
     print(asyncio.run(match_social_media_link("https://youtube.com/shorts/lFKHbluAlJw")))
     print(asyncio.run(match_social_media_link("https://youtu.be/vOiP3kfFlrE?si=zPd-Bt1GO03jxpI_")))