main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4
5import re
6from datetime import timedelta
7from pathlib import Path
8
9from httpx import AsyncClient
10from loguru import logger
11
12from config import COOKIE, DOWNLOAD_DIR, PROXY, cache
13from networking import hx_req
14from utils import nowdt
15
16
@cache.memoize(ttl=3600)
async def cookie_cloud() -> dict:
    """Fetch the stored cookie payload from a CookieCloud server.

    Project page: https://github.com/easychen/CookieCloud

    The result is memoized through the project ``cache`` (``ttl=3600``).

    Returns:
        Whatever ``hx_req`` returns for the ``/get/<key>`` endpoint, or an
        empty dict when any CookieCloud setting is missing or the response
        carries a truthy ``hx_error`` entry.
    """
    server, key, password = COOKIE.CLOUD_SERVER, COOKIE.CLOUD_KEY, COOKIE.CLOUD_PASS
    # All three settings are required; without them there is nothing to query.
    if not (server and key and password):
        return {}
    payload = await hx_req(
        f"{server}/get/{key}",
        method="POST",
        json_data={"password": password},
        headers={"content-type": "application/json"},
    )
    return {} if payload.get("hx_error") else payload
32
33
async def cookie_cloud_weibo() -> str:
    """Build a Weibo ``Cookie`` header value from the CookieCloud payload.

    Cookies are gathered from every ``cookie_data`` entry whose key contains
    ``"weibo"``; only XSRF-TOKEN, SUB, SUBP and WBPSESS are kept.

    Returns:
        The kept cookies rendered as ``name=value; `` and concatenated (the
        trailing ``"; "`` is retained), or ``""`` when nothing matches.
    """
    wanted = ["XSRF-TOKEN", "SUB", "SUBP", "WBPSESS"]
    payload = await cookie_cloud()
    by_domain = payload.get("cookie_data", {})
    weibo_items = [item for domain, items in by_domain.items() if "weibo" in domain for item in items]
    return "".join(f"{item['name']}={item['value']}; " for item in weibo_items if item.get("name") in wanted)
47
48
@cache.memoize(ttl=7200)
async def cookie_cloud_bilibili() -> str:
    """Build a bilibili ``Cookie`` header value from the CookieCloud payload.

    Only the ``.bilibili.com`` and ``www.bilibili.com`` entries of
    ``cookie_data`` are used, and every cookie in them is included.

    Returns:
        All cookies rendered as ``name=value; `` and concatenated (the
        trailing ``"; "`` is retained), or ``""`` when none are available.
    """
    payload = await cookie_cloud()
    bilibili_items = []
    for domain, items in payload.get("cookie_data", {}).items():
        if domain not in (".bilibili.com", "www.bilibili.com"):
            continue
        bilibili_items.extend(items)
    return "".join(f"{item['name']}={item['value']}; " for item in bilibili_items)
61
62
@cache.memoize(ttl=7200)
async def bilibili_cookie_dict() -> dict:
    """Return selected bilibili cookies from CookieCloud as a name -> value dict.

    Only the ``.bilibili.com`` and ``www.bilibili.com`` entries of
    ``cookie_data`` are scanned. When the same cookie name appears more than
    once, the last occurrence wins.

    Returns:
        A dict holding whichever of the wanted cookie names were found;
        empty when none are present.
    """
    # NOTE(review): names are matched case-sensitively. The browser cookie is
    # presumably spelled "DedeUserID", in which case "dedeuserid" never
    # matches -- confirm against what the callers expect.
    wanted = ["SESSDATA", "bili_jct", "buvid3", "buvid4", "dedeuserid", "ac_time_value"]
    payload = await cookie_cloud()
    selected = {}
    for domain, items in payload.get("cookie_data", {}).items():
        if domain not in (".bilibili.com", "www.bilibili.com"):
            continue
        for item in items:
            if item.get("name", "") in wanted:
                selected[item["name"]] = item["value"]
    return selected
77
78
@cache.memoize(ttl=7200)
async def ytdlp_bilibili_cookie(save_path: str | Path = "") -> str:
    """Export the bilibili cookies from CookieCloud as a Netscape cookie file.

    Args:
        save_path: Destination file, as ``str`` or ``Path``. When empty, the
            file is written to ``DOWNLOAD_DIR/cookies/bilibili.txt``.

    Returns:
        The POSIX-style path of the written cookie file.
    """
    # Normalise to Path up front so str and Path arguments behave the same,
    # and create the parent directory for custom destinations too, not only
    # for the default location.
    target = Path(save_path) if save_path else Path(DOWNLOAD_DIR) / "cookies" / "bilibili.txt"
    target.parent.mkdir(parents=True, exist_ok=True)

    # Cookies without an "expirationDate" get an expiry two hours from now.
    fallback_expiry = (nowdt() + timedelta(hours=2)).timestamp()

    payload = await cookie_cloud()
    lines = ["# Netscape HTTP Cookie File\n\n"]
    for domain, cookies in payload.get("cookie_data", {}).items():
        if domain not in (".bilibili.com", "www.bilibili.com"):
            continue
        for x in cookies:
            expire = round(x.get("expirationDate", fallback_expiry))
            # The Netscape format expects the literal flags TRUE / FALSE.
            include_subdomains = str(not x.get("hostOnly", False)).upper()
            secure = str(x.get("secure", False)).upper()
            lines.append(
                f"{x['domain']}\t{include_subdomains}\t{x['path']}\t{secure}\t{expire}\t{x['name']}\t{x['value']}\n"
            )
    # Explicit encoding so the output does not depend on the platform locale.
    target.write_text("".join(lines), encoding="utf-8")
    return target.as_posix()
101
102
@cache.memoize(ttl=3600)
async def get_weibo_cookies() -> str:
    """Obtain Weibo visitor cookies, falling back to CookieCloud on failure.

    Based on:
    https://github.com/Johnserf-Seed/f2/blob/32f00fe544920ed948e813c804759715352c428a/f2/apps/weibo/utils.py#L99

    Returns:
        The ``name=value`` parts of the visitor endpoint's ``set-cookie``
        header joined with ``";"``. Returns ``""`` if that header is not a
        string. If the request raises, the result of ``cookie_cloud_weibo()``
        is returned instead.
    """
    endpoint = "https://passport.weibo.com/visitor/genvisitor2"
    form = {"cb": "visitor_gray_callback", "tid": "", "from": "weibo"}
    form_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    try:
        async with AsyncClient(http2=True, proxy=PROXY.WEIBO) as client:
            response = await client.post(endpoint, headers=form_headers, data=form, follow_redirects=True, timeout=10)
            response.raise_for_status()
            raw = response.headers.get("set-cookie", "")
            if isinstance(raw, str):
                # Split the combined header into individual cookies at ", "
                # followed by a letter, then keep only each cookie's leading
                # "name=value" part.
                pairs = [chunk.split(";")[0] for chunk in re.split(", (?=[a-zA-Z])", raw)]
                return ";".join(pairs)
            logger.warning(f"set_cookie is not a string: {raw}")
            return ""
    except Exception as e:
        logger.warning(f"Weibo Visitor cookie failed: {e}")

    # The visitor request failed: fall back to the cookies stored in CookieCloud.
    return await cookie_cloud_weibo()
126
127
async def debug():
    """Manual check: request one m.weibo.cn URL with the Weibo cookies and print the result."""
    uid = 7357828611
    containerid = 1076037357828611
    bid = "OEJewzx4x"
    # Candidate URLs kept side by side for manual experiments; only the one
    # selected below is actually requested.
    candidates = {
        "index": f"https://m.weibo.cn/api/container/getIndex?type=uid&value={uid}",
        "index_container": f"https://m.weibo.cn/api/container/getIndex?type=uid&value={uid}&containerid={containerid}",
        "status_show": f"https://m.weibo.cn/statuses/show?id={bid}",
    }
    target = candidates["status_show"]
    weibo_cookie = await get_weibo_cookies()
    request_headers = {
        "Referer": f"https://m.weibo.cn/u/{uid}",
        "MWeibo-Pwa": "1",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": weibo_cookie,
        "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1",
    }
    result = await hx_req(target, headers=request_headers)
    print(result)
145
146
async def bilibili_cmt(aid: str | int = "114097853038828"):
    """Fetch and print the replies of a bilibili video.

    Args:
        aid: Value sent as the ``oid`` query parameter of the reply API.
            Previously this argument was overwritten by a hard-coded value
            and therefore ignored; it is now honoured.

    Returns:
        The ``data.replies`` list from the API response, or an empty list
        when no bilibili cookie is available or the response has no replies.
    """
    cookie = await cookie_cloud_bilibili()
    if not cookie:
        return []
    # Fixed referer sent with the request; it is not derived from ``aid``.
    link = "https://www.bilibili.com/video/BV1M19iY4Ec2"
    url = f"https://api.bilibili.com/x/v2/reply?type=1&oid={aid}&sort=1"
    response = await hx_req(url, headers={"referer": link, "cookie": cookie}, check_kv={"code": 0}, max_retry=0, timeout=3)
    # "data" or "replies" may be present but null; normalise both to empty.
    replies = (response.get("data") or {}).get("replies") or []
    print(replies)
    return replies
158
159
if __name__ == "__main__":
    import asyncio

    # Manual smoke test. Other coroutines in this module that can be swapped in:
    # get_weibo_cookies(), cookie_cloud_bilibili(), bilibili_cmt(),
    # cookie_cloud_weibo(), bilibili_cookie_dict(), debug()
    cookie_file = asyncio.run(ytdlp_bilibili_cookie())
    print(cookie_file)