main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3
  4
  5import re
  6from datetime import timedelta
  7from pathlib import Path
  8
  9from httpx import AsyncClient
 10from loguru import logger
 11
 12from config import COOKIE, DOWNLOAD_DIR, PROXY, cache
 13from networking import hx_req
 14from utils import nowdt
 15
 16
 17@cache.memoize(ttl=3600)
 18async def cookie_cloud() -> dict:
 19    """Get cookies from cookie cloud.
 20
 21    https://github.com/easychen/CookieCloud
 22    """
 23    if not all([COOKIE.CLOUD_SERVER, COOKIE.CLOUD_KEY, COOKIE.CLOUD_PASS]):
 24        return {}
 25    url = f"{COOKIE.CLOUD_SERVER}/get/{COOKIE.CLOUD_KEY}"
 26    params = {"password": COOKIE.CLOUD_PASS}
 27    headers = {"content-type": "application/json"}
 28    resp = await hx_req(url, method="POST", json_data=params, headers=headers)
 29    if not resp.get("hx_error"):
 30        return resp
 31    return {}
 32
 33
 34async def cookie_cloud_weibo() -> str:
 35    data = await cookie_cloud()
 36    cookie_data = data.get("cookie_data", {})
 37    cookies = []
 38    for k, v in cookie_data.items():
 39        if "weibo" in k:
 40            cookies.extend(v)
 41    cookie = ""
 42    for x in cookies:
 43        if x.get("name") not in ["XSRF-TOKEN", "SUB", "SUBP", "WBPSESS"]:
 44            continue
 45        cookie += f"{x['name']}={x['value']}; "
 46    return cookie
 47
 48
 49@cache.memoize(ttl=7200)
 50async def cookie_cloud_bilibili() -> str:
 51    data = await cookie_cloud()
 52    cookie_data = data.get("cookie_data", {})
 53    cookies = []
 54    for k, v in cookie_data.items():
 55        if k in [".bilibili.com", "www.bilibili.com"]:
 56            cookies.extend(v)
 57    cookie = ""
 58    for x in cookies:
 59        cookie += f"{x['name']}={x['value']}; "
 60    return cookie
 61
 62
 63@cache.memoize(ttl=7200)
 64async def bilibili_cookie_dict() -> dict:
 65    data = await cookie_cloud()
 66    cookie_data = data.get("cookie_data", {})
 67    cookies = []
 68    for k, v in cookie_data.items():
 69        if k in [".bilibili.com", "www.bilibili.com"]:
 70            cookies.extend(v)
 71    cookie = {}
 72    keys = ["SESSDATA", "bili_jct", "buvid3", "buvid4", "dedeuserid", "ac_time_value"]
 73    for x in cookies:
 74        if x.get("name", "") in keys:
 75            cookie |= {x["name"]: x["value"]}
 76    return cookie
 77
 78
 79@cache.memoize(ttl=7200)
 80async def ytdlp_bilibili_cookie(save_path: str | Path = "") -> str:
 81    """Get bilibili cookie from cookie cloud and save it to file."""
 82    if not save_path:
 83        save_path = Path(DOWNLOAD_DIR) / "cookies" / "bilibili.txt"
 84        save_path.parent.mkdir(parents=True, exist_ok=True)
 85    now = nowdt()
 86    expiration = now + timedelta(hours=2)
 87    expiration_ts = expiration.timestamp()
 88    data = await cookie_cloud()
 89    cookie_data = data.get("cookie_data", {})
 90    cookie_string = "# Netscape HTTP Cookie File\n\n"
 91    for domain, cookies in cookie_data.items():
 92        if domain not in [".bilibili.com", "www.bilibili.com"]:
 93            continue
 94        for x in cookies:
 95            expire = round(x.get("expirationDate", expiration_ts))
 96            include_subdomains = str(not x.get("hostOnly", False)).upper()
 97            secure = str(x.get("secure", False)).upper()
 98            cookie_string += f"{x['domain']}\t{include_subdomains}\t{x['path']}\t{secure}\t{expire}\t{x['name']}\t{x['value']}\n"
 99    Path(save_path).write_text(cookie_string)
100    return Path(save_path).as_posix()
101
102
@cache.memoize(ttl=3600)
async def get_weibo_cookies() -> str:
    """Get Weibo visitor cookies, falling back to the cookie cloud cookies.

    https://github.com/Johnserf-Seed/f2/blob/32f00fe544920ed948e813c804759715352c428a/f2/apps/weibo/utils.py#L99

    Returns:
        ``name=value`` pairs joined with ``";"`` taken from the ``set-cookie`` header of the
        visitor endpoint. When the request fails, or the response carries no usable
        ``set-cookie`` header, the result of ``cookie_cloud_weibo()`` is returned instead.
    """
    url = "https://passport.weibo.com/visitor/genvisitor2"
    payload = {"cb": "visitor_gray_callback", "tid": "", "from": "weibo"}
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    try:
        async with AsyncClient(http2=True, proxy=PROXY.WEIBO) as client:
            response = await client.post(url, headers=headers, data=payload, follow_redirects=True, timeout=10)
            response.raise_for_status()
            set_cookie = response.headers.get("set-cookie", "")
            if not isinstance(set_cookie, str):
                logger.warning(f"set_cookie is not a string: {set_cookie}")
            elif set_cookie:
                # Split on ", " only when a letter follows, so the comma inside a date such as
                # "Expires=Wed, 21 Oct ..." (followed by a digit) does not split a cookie.
                # Presumably several Set-Cookie headers arrive folded into one comma-separated string.
                # Keep only the leading "name=value" part of each cookie.
                return ";".join(cookie.split(";")[0] for cookie in re.split(r", (?=[a-zA-Z])", set_cookie))
            else:
                # A successful response without cookies must not be returned as an empty
                # cookie string; use the fallback below instead.
                logger.warning("Weibo Visitor cookie response has no set-cookie header")
    except Exception as e:
        # Best-effort: any failure of the visitor request leads to the fallback.
        logger.warning(f"Weibo Visitor cookie failed: {e}")

    # fallback to cookie cloud
    return await cookie_cloud_weibo()
126
127
async def debug():
    """Manual check: request one Weibo status from m.weibo.cn with the Weibo cookies and print the result."""
    uid = 7357828611
    containerid = 1076037357828611
    bid = "OEJewzx4x"

    # Alternative endpoints kept for manual experiments; only ``status_url`` is requested.
    profile_url = f"https://m.weibo.cn/api/container/getIndex?type=uid&value={uid}"
    timeline_url = f"https://m.weibo.cn/api/container/getIndex?type=uid&value={uid}&containerid={containerid}"
    status_url = f"https://m.weibo.cn/statuses/show?id={bid}"

    request_headers = {
        "Referer": f"https://m.weibo.cn/u/{uid}",
        "MWeibo-Pwa": "1",
        "X-Requested-With": "XMLHttpRequest",
        "Cookie": await get_weibo_cookies(),
        "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1",
    }
    print(await hx_req(status_url, headers=request_headers))
145
146
async def bilibili_cmt(aid: str | int = "114097853038828"):
    """Fetch the replies (comments) of a bilibili video and print them.

    Args:
        aid: The ``oid`` sent to the reply API. The argument is now honoured; it used
            to be overwritten by a hard-coded value equal to the default.

    Returns:
        The ``data.replies`` list of the API response, or an empty list when no bilibili
        cookie is available or the response has no (or a null) ``data`` / ``replies`` entry.
    """
    # NOTE(review): the referer is fixed to one specific video regardless of ``aid``.
    link = "https://www.bilibili.com/video/BV1M19iY4Ec2"
    cookie = await cookie_cloud_bilibili()
    if not cookie:
        return []
    url = f"https://api.bilibili.com/x/v2/reply?type=1&oid={aid}&sort=1"
    response = await hx_req(url, headers={"referer": link, "cookie": cookie}, check_kv={"code": 0}, max_retry=0, timeout=3)
    # ``data`` or ``replies`` may be missing or null; normalise both to an empty list.
    data = (response.get("data") or {}).get("replies") or []
    print(data)
    return data
158
159
if __name__ == "__main__":
    import asyncio

    # Manual entry point: export the bilibili cookie file and print where it was written.
    # Other helpers that can be run the same way:
    #   print(asyncio.run(get_weibo_cookies()))
    #   print(asyncio.run(cookie_cloud_bilibili()))
    #   print(asyncio.run(bilibili_cmt()))
    #   print(asyncio.run(cookie_cloud_weibo()))
    #   print(asyncio.run(bilibili_cookie_dict()))
    #   asyncio.run(debug())
    cookie_file = asyncio.run(ytdlp_bilibili_cookie())
    print(cookie_file)
169    # asyncio.run(debug())