Commit ec1a431
Changed files (17)
src/others/subtitle.py
@@ -119,13 +119,13 @@ async def fetch_subtitle_tikhub(video_id: str) -> dict:
logger.info(f"Fetch Subtitle for {video_id=}")
api_url = f"{API.TIKHUB}/api/v1/youtube/web/get_video_subtitles?video_id={video_id}"
headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
- resp = await hx_req(api_url, headers=headers, check_has_kv=["data"], check_kv={"code": 200})
- if resp.status_code != 200:
- logger.warning(f"Subtitle API failed: {resp}")
+ resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0)
+ if resp.get("hx_error"):
+ logger.warning(f"Subtitle API failed: {resp['hx_error']}")
return {}
- if subtitles := resp.json()["data"].get("subtitles", []):
+ if subtitles := resp["data"].get("subtitles", []):
return to_webvtt(subtitles)
- if error := resp.json()["data"].get("detail", []):
+ if error := resp["data"].get("detail", []):
return {"error": error}
return {}
src/preview/douyin.py
@@ -56,23 +56,24 @@ async def preview_douyin(
logger.info(f"{platform} link preview for {url}")
succ = False
+ data = {}
if douyin_provider == "free": # try free first
api_url = f"{API.TIKHUB_FREE}/api/hybrid/video_data?url={url}"
headers = {"accept": "application/json"} if douyin_provider == "tikhub" else {}
try:
- resp = await hx_req(api_url, headers=headers, check_has_kv=["data"], check_kv={"code": 200})
- data = resp.json()["data"]
+ resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
+ data = resp["data"]
succ = True
except Exception:
- logger.warning(f"{platform} API [free] failed: {resp}")
+ logger.warning(f"{platform} API [free] failed")
if not succ: # try tikhub
api_url = f"{API.TIKHUB}/api/v1/hybrid/video_data?url={url}"
headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
try:
- resp = await hx_req(api_url, headers=headers, check_has_kv=["data"], check_kv={"code": 200})
- data = resp.json()["data"]
+ resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200})
+ data = resp["data"]
except Exception:
- logger.warning(f"{platform} API [tikhub] failed: {resp}")
+ logger.warning(f"{platform} API [tikhub] failed")
if fallback:
await modify_progress(text="❌抖音解析失败, 尝试第三方Bot...", **kwargs)
await send_to_social_media_bridge(client, message, url, platform, **kwargs)
@@ -134,23 +135,24 @@ async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comm
"tiktok_free": f"{API.TIKHUB_FREE}/api/tiktok/web/fetch_post_comment?aweme_id={aweme_id}",
}
succ = False
+ data = []
if douyin_comments_provider == "free": # try free first
api_url = api_urls.get(f"{platform}_free")
headers = {"accept": "application/json"}
try:
- resp = await hx_req(api_url, headers=headers, check_has_kv=["data"], check_kv={"code": 200})
- data = resp.json()["data"].get("comments", [])
+ resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200}, max_retry=0, timeout=3)
+ data = resp["data"].get("comments", [])
succ = True
except Exception:
- logger.warning(f"{platform} comments API [free] failed: {resp}")
+ logger.warning(f"{platform} comments API [free] failed")
if not succ: # try tikhub
api_url = api_urls.get(f"{platform}_tikhub")
headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
try:
- resp = await hx_req(api_url, headers=headers, check_has_kv=["data"], check_kv={"code": 200})
- data = resp.json()["data"].get("comments", [])
+ resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200})
+ data = resp["data"].get("comments", [])
except Exception:
- logger.warning(f"{platform} comments API [tikhub] failed: {resp}")
+ logger.warning(f"{platform} comments API [tikhub] failed")
return []
try:
for node in data:
src/preview/instagram.py
@@ -55,13 +55,13 @@ async def preview_instagram(
api_url = API.TIKHUB_INSTAGRAM + url
logger.info(f"Preview Instagram TikHub for {api_url}")
headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
- resp = await hx_req(api_url, headers=headers, check_has_kv=["data"], check_kv={"code": 200})
- if resp.status_code != 200:
- await modify_progress(text="❌Instagram解析失败, 使用DDInstagram预览", **kwargs)
+ resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200})
+ if resp.get("hx_error"):
+ await modify_progress(text=f"❌Instagram解析失败, 使用DDInstagram预览\n{resp['hx_error']}", **kwargs)
await preview_ddinstagram(client, message, fallback=fallback, **kwargs)
return
- data = resp.json()["data"]
+ data = resp["data"]
# parse media
media = []
@@ -127,13 +127,13 @@ async def preview_ddinstagram(client: Client, message: Message, url: str, post_t
api_url = f"{API.DDINSTAGRAM}/{post_type}/{post_id}"
logger.info(f"Instagram link preview for {api_url}")
headers = {"user-agent": UA.TELEGRAM}
- resp = await hx_req(api_url, headers=headers)
- if not resp.text:
+ resp = await hx_req(api_url, headers=headers, rformat="text")
+ if not resp.get("text"):
if fallback:
await send_to_social_media_bridge(client, message, url, **kwargs)
return
- soup = BeautifulSoup(resp.text, "html.parser")
+ soup = BeautifulSoup(resp["text"], "html.parser")
logger.trace(soup.prettify())
texts = ""
src/preview/twitter.py
@@ -60,6 +60,9 @@ async def preview_twitter(
return
await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
succ = False
+ master_info = {}
+ this_info = {}
+ quote_info = {}
if twitter_provider == "tikhub": # try tikhub first
try:
this_info = await get_tweet_info_via_tikhub(url=url, **kwargs)
@@ -68,7 +71,9 @@ async def preview_twitter(
await modify_progress(text=error, **kwargs)
raise APIError(error) # noqa: TRY301
quote_info = await get_tweet_info_via_tikhub(quote_info=this_info["quote_info"], **kwargs) if this_info["has_quote"] else {}
- master_info = await get_tweet_info_via_tikhub(post_id=this_info["master_thread_id"], **kwargs) if this_info["has_master"] else {}
+ params = copy.deepcopy(kwargs)
+ params.pop("post_id", None)
+ master_info = await get_tweet_info_via_tikhub(post_id=this_info["master_thread_id"], **params) if this_info["has_master"] else {}
succ = True
except Exception as e:
logger.warning(f"Twitter API [tikhub] failed: {e}")
@@ -231,11 +236,11 @@ async def get_tweet_info_via_tikhub(url: str = "", post_id: str = "", quote_info
await modify_progress(text="✅正在解析引用推文...", **kwargs)
else:
headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
- resp = await hx_req(api_url, headers=headers, check_has_kv=["data.author.screen_name"], check_kv={"data": {"id": post_id}})
- if resp.status_code != 200:
+ resp = await hx_req(api_url, headers=headers, check_keys=["data.author.screen_name"], check_kv={"data.id": post_id})
+ if resp.get("hx_error"):
logger.error("Failed to get tweet info via TikHub")
return {}
- data: dict = resp.json()["data"]
+ data: dict = resp["data"]
await modify_progress(text=f"✅推文{post_id}解析成功, 正在处理...", **kwargs)
data = remove_none_values(data)
@@ -305,10 +310,10 @@ async def get_tweet_info_via_fxtwitter(url: str = "", handle: str = "", post_id:
logger.info(f"Twitter preview via fxtwitter: {api_url}")
headers = {"user-agent": UA.TELEGRAM}
resp = await hx_req(api_url, headers=headers)
- if str(resp.json().get("tweet", {}).get("id")) != str(post_id):
+ if resp.get("hx_error") or str(resp.get("tweet", {}).get("id")) != str(post_id):
logger.error("Failed to get tweet info via fxtwitter")
return {}
- data: dict = resp.json()["tweet"]
+ data: dict = resp["tweet"]
info = {"handle": data.get("author", {}).get("screen_name", handle), "post_id": data.get("id", post_id)}
media = data.get("media", {}).get("all", [])
src/preview/weibo.py
@@ -133,10 +133,13 @@ async def parse_weibo_info(post_id: str, data: dict | None = None, **kwargs) ->
logger.info(f"Weibo link preview for {weibo_url}")
headers = {"referer": "https://m.weibo.cn"}
try:
- resp = await hx_req(weibo_url, headers=headers)
- if not (matched := re.search(r"var \$render_data = (\[.*?\])\[0\]", str(resp.text), re.DOTALL)):
+ resp = await hx_req(weibo_url, headers=headers, rformat="text")
+ if not resp.get("text"):
+ info["error_msg"] = f"Weibo webpage not found: {weibo_url}"
+ return info
+ if not (matched := re.search(r"var \$render_data = (\[.*?\])\[0\]", str(resp["text"]), re.DOTALL)):
info["error_msg"] = "Weibo API empty response"
- if "微博不存在" in str(resp.text) or "暂无查看权限" in str(resp.text):
+ if "微博不存在" in str(resp["text"]) or "暂无查看权限" in str(resp["text"]):
info["error_msg"] = "微博不存在或暂无查看权限!"
info["fallback"] = False
logger.error(info["error_msg"])
@@ -208,8 +211,11 @@ async def parse_weibo_video(post_id: str, **kwargs) -> dict:
url = f"https://video.weibo.com/show?fid={post_id.removeprefix('weibovideo')}"
api_url = f"{API.TIKHUB_WEIBO_VIDEO}{quote_plus(url)}"
headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
- resp = await hx_req(api_url, headers=headers, check_kv={"code": 200}, check_has_kv=["data.data.Component_Play_Playinfo"])
- data = resp.json()["data"]["data"]["Component_Play_Playinfo"]
+ resp = await hx_req(api_url, headers=headers, check_kv={"data.msg": "succ"}, check_keys=["data.data.Component_Play_Playinfo"])
+ if resp.get("hx_error"):
+ info["error_msg"] = resp["hx_error"]
+ return info
+ data = resp["data"]["data"]["Component_Play_Playinfo"]
urls = [https_url(x) for x in data.get("urls", {}).values()]
info["media"] = [{"video": await download_first_success_urls(urls, suffix=".mp4", **kwargs)}]
info["dt"] = ""
@@ -268,13 +274,13 @@ async def parse_weibo_comments(post_id: str) -> list[str]:
"max_id": 0,
}
api = "https://weibo.com/ajax/statuses/buildComments"
- resp = await hx_req(api, headers=headers, params=params, check_kv={"ok": 1})
- if resp.status_code != 200:
+ resp = await hx_req(api, headers=headers, params=params, check_kv={"ok": 1}, max_retry=1)
+ if resp.get("hx_error"):
logger.error(f"Weibo Comments API failed: {resp}")
return []
comments = ["\n**> 💬**点此展开评论区**:"]
- for info in resp.json().get("data", []):
+ for info in resp.get("data", []):
if not info.get("text"):
continue
cmt = ""
src/preview/xiaohongshu.py
@@ -127,23 +127,25 @@ async def get_xhs_info(url: str, ua: str = UA.CHROME, retry: int = 0) -> dict:
return UA.IPHONE if ua == UA.CHROME else UA.CHROME
headers = {"user-agent": ua, "referer": "https://www.xiaohongshu.com/"}
- if retry > 4:
+ if retry > 3:
+ logger.error(f"XHS parsing response failed after 3 retries: {url}")
return {}
data = {}
try:
- resp = await hx_req(url, headers=headers, cookies=None, proxy=PROXY.XHS)
- soup = BeautifulSoup(resp.text, "html.parser")
+ resp = await hx_req(url, headers=headers, cookies=None, proxy=PROXY.XHS, rformat="text")
+ if not resp.get("text"):
+ logger.warning(f"XHS webpage not found: {url}, Retrying: {retry + 1}")
+ return await get_xhs_info(url, ua=switch_ua(ua), retry=retry + 1)
+ soup = BeautifulSoup(resp["text"], "html.parser")
data["soup"] = soup
script_info = next((str(x.text).removeprefix("window.__INITIAL_STATE__=") for x in soup.find_all("script") if str(x.text).startswith("window.__INITIAL_STATE__=")), "{}")
info = yaml.safe_load(script_info)
if not info:
- retry += 1
- logger.warning(f"XHS empty response, maybe need to adjust the proxy. Retrying: {retry}")
- return await get_xhs_info(url, ua=switch_ua(ua), retry=retry)
+ logger.warning(f"XHS failed: {url}, Retrying: {retry + 1}")
+ return await get_xhs_info(url, ua=switch_ua(ua), retry=retry + 1)
except Exception as e:
- logger.error(f"XHS parsing response failed: {e}, Retrying: {retry}")
- retry += 1
- return await get_xhs_info(url, ua=switch_ua(ua), retry=retry)
+ logger.warning(f"XHS failed: {e}, Retrying: {retry + 1}")
+ return await get_xhs_info(url, ua=switch_ua(ua), retry=retry + 1)
# XHS has two different return formats
if notes := list(info.get("note", {}).get("noteDetailMap", {}).values()):
@@ -153,9 +155,8 @@ async def get_xhs_info(url: str, ua: str = UA.CHROME, retry: int = 0) -> dict:
if note := info.get("noteData", {}).get("data", {}).get("noteData", {}):
data["note"] = note
return data
- retry += 1
- logger.error(f"Parsed info has no post, Retrying: {retry}")
- return await get_xhs_info(url, ua=switch_ua(ua), retry=retry)
+ logger.error(f"Parsed info has no post, Retrying: {retry + 1}")
+ return await get_xhs_info(url, ua=switch_ua(ua), retry=retry + 1)
def get_xhs_comments(soup: BeautifulSoup | None) -> list[str]:
src/preview/ytdlp.py
@@ -457,23 +457,24 @@ async def get_bilibili_comments(bvid: str | None, provider: str = PROVIDER.BILIB
return []
succ = False
+ data = []
if provider == "free":
try:
api = f"{API.TIKHUB_FREE}/api/bilibili/web/fetch_video_comments?bv_id={bvid}"
headers = {"accept": "application/json"}
- resp = await hx_req(api, headers={"accept": "application/json"}, check_has_kv=["data.data"], check_kv={"code": 200})
- data = resp.json()["data"]["data"].get("replies", [])
+ resp = await hx_req(api, headers={"accept": "application/json"}, check_keys=["data.data"], check_kv={"code": 200}, max_retry=0)
+ data = resp["data"]["data"].get("replies", [])
succ = True
except Exception:
- logger.warning(f"Bilibili comments API [free] failed: {resp}")
+ logger.warning("Bilibili comments API [free] failed")
if not succ: # try tikhub
api_url = f"{API.TIKHUB}/api/v1/bilibili/web/fetch_video_comments?bv_id={bvid}"
headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
try:
- resp = await hx_req(api_url, headers=headers, check_has_kv=["data.data"], check_kv={"code": 200})
- data = resp.json()["data"]["data"].get("replies", [])
+ resp = await hx_req(api_url, headers=headers, check_keys=["data.data"], check_kv={"code": 200})
+ data = resp["data"]["data"].get("replies", [])
except Exception:
- logger.warning(f"Bilibili comments API [tikhub] failed: {resp}")
+ logger.warning("Bilibili comments API [tikhub] failed")
return []
comments = []
@@ -508,11 +509,11 @@ async def get_youtube_comments(vid: str | None, provider: str = PROVIDER.YOUTUBE
params = {"key": TOKEN.YOUTUBE_API_KEY, "maxResults": 100, "textFormat": "plainText", "part": "snippet", "videoId": vid}
comments = []
try:
- resp = await hx_req(api, proxy=get_ytdlp_proxy(platform="youtube"), params=params, check_has_kv=["items"])
- if resp.status_code != 200:
- logger.warning(f"YouTube Comments API failed: {resp}")
+ resp = await hx_req(api, proxy=get_ytdlp_proxy(platform="youtube"), params=params, check_keys=["items"], check_kv={"code": 200})
+ if resp.get("hx_error"):
+ logger.warning(f"YouTube Comments API failed: {resp['hx_error']}")
return []
- data = resp.json().get("items", [])
+ data = resp["items"]
for idx, x in enumerate(data):
name = x.get("snippet", {}).get("topLevelComment", {}).get("snippet", {}).get("authorDisplayName", "匿名")
name = name.removeprefix("@")
src/price/binance.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
-import contextlib
+from loguru import logger
from config import API, PROXY, cache
from networking import hx_req
@@ -9,22 +8,24 @@ from price.chart import generate_chart
from utils import number, ts_to_dt
-@cache.memoize(ttl=7200)
+@cache.memoize(ttl=3600)
async def get_binance_symbols() -> dict[str, str]:
"""Get all symbols from Binance."""
+ logger.info("Fetching Binance symbols...")
res = {}
# um
- with contextlib.suppress(Exception): # region restriction
- url = f"{API.BINANCE_UM}/fapi/v1/exchangeInfo"
- response = await hx_req(url, proxy=PROXY.CRYPTO, check_has_kv=["symbols"])
- data = response.json()["symbols"]
+ url = f"{API.BINANCE_UM}/fapi/v1/exchangeInfo" # region restriction
+ response = await hx_req(url, proxy=PROXY.CRYPTO, check_keys=["symbols"], max_retry=0, timeout=5, silent=True)
+ if not response.get("hx_error"):
+ data = response["symbols"]
res = {coin["symbol"]: "UM" for coin in data if coin["status"] == "TRADING"}
# spot
url = f"{API.BINANCE_SPOT}/api/v3/exchangeInfo"
params = {"showPermissionSets": False, "symbolStatus": "TRADING"}
- response = await hx_req(url, params=params, proxy=PROXY.CRYPTO, check_has_kv=["symbols"])
- data = response.json()["symbols"]
- res |= {coin["symbol"]: "SPOT" for coin in data}
+ response = await hx_req(url, params=params, proxy=PROXY.CRYPTO, check_keys=["symbols"], silent=True)
+ if not response.get("hx_error"):
+ data = response["symbols"]
+ res |= {coin["symbol"]: "SPOT" for coin in data}
return res
@@ -36,9 +37,8 @@ async def get_binance_price(coin: str, interval: str | None = None) -> dict:
# Binance interval unit: m, h, d, w, M
if interval.endswith(("H", "D", "W")):
interval = interval.lower()
- if interval not in ["1m", "3m", "5m", "15m", "30m", "1h", "2h", "6h", "8h", "12h", "1d", "3d", "1w", "1M"]:
+ if interval not in ["1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "8h", "12h", "1d", "3d", "1w", "1M"]:
interval = "30m"
-
symbols = await get_binance_symbols()
symbol = coin.upper()
stablecoins = ["USDT", "USDC", "FDUSD", "TUSD"]
@@ -50,14 +50,13 @@ async def get_binance_price(coin: str, interval: str | None = None) -> dict:
market = symbols[symbol]
if market == "SPOT":
url = f"{API.BINANCE_SPOT}/api/v3/klines?symbol={symbol}&interval={interval}&limit=49"
- response = await hx_req(url, proxy=PROXY.CRYPTO)
+ klines = await hx_req(url, proxy=PROXY.CRYPTO, silent=True)
elif market == "UM":
url = f"{API.BINANCE_UM}/fapi/v1/klines?symbol={symbol}&interval={interval}&limit=49"
- response = await hx_req(url, proxy=PROXY.CRYPTO)
+ klines = await hx_req(url, proxy=PROXY.CRYPTO, silent=True)
else:
return {}
- klines = response.json()
- if not klines:
+ if isinstance(klines, dict) and klines.get("hx_error"):
return {"texts": f"Binance price failed: {coin}"}
klines = sorted(klines, key=lambda x: x[0])
high_price = max(float(number(x[2])) for x in klines)
@@ -78,5 +77,5 @@ async def get_binance_price(coin: str, interval: str | None = None) -> dict:
text += f"收盘价: {close_price} \n"
text += f"涨跌幅: {change_pct:+.2%}\n"
text += f"振幅: {amplitude:.2%}"
- chart = await generate_chart(klines, interval, title, subtitle)
+ chart = await generate_chart(klines, interval, title, subtitle) # type:ignore
return {"texts": text, "media": [{"photo": chart}]}
src/price/chart.py
@@ -1,6 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
from pathlib import Path
from quickchart import QuickChart
src/price/coinmarketcap.py
@@ -12,7 +12,7 @@ from utils import number
HEADERS = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
-@cache.memoize(ttl=7200)
+@cache.memoize(ttl=28800) # 8 hours
async def get_cmc_coins() -> dict:
"""Get all coins from CoinMarketCap.
@@ -23,10 +23,13 @@ async def get_cmc_coins() -> dict:
if not TOKEN.CMC_API_KEY:
logger.warning("CoinMarketCap API key is not set.")
return {}
+ logger.info("Fetching CoinMarketCap coins...")
url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/map"
params = {"limit": "5000", "sort": "cmc_rank", "aux": "status"}
- response = await hx_req(url, params=params, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
- data = response.json()["data"]
+ response = await hx_req(url, params=params, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0}, silent=True)
+ if response.get("hx_error"):
+ return {}
+ data = response["data"]
return {coin["symbol"]: coin["slug"] for coin in data}
@@ -37,8 +40,10 @@ async def get_cmc_fiat() -> list:
logger.warning("CoinMarketCap API key is not set.")
return []
url = "https://pro-api.coinmarketcap.com/v1/fiat/map"
- response = await hx_req(url, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
- data = response.json()["data"]
+ response = await hx_req(url, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0}, silent=True)
+ if response.get("hx_error"):
+ return []
+ data = response["data"]
return [coin["symbol"] for coin in data]
@@ -62,8 +67,8 @@ async def get_cmc_price(coin: str, fiat: str = "USD") -> str:
slug_index = list(cmc_coins.values()).index(coin.lower())
params = {"symbol": coin.upper(), "convert": fiat} if symbol_index <= slug_index else {"slug": coin.lower(), "convert": fiat}
url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest"
- response = await hx_req(url, params=params, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
- data = next(iter(response.json()["data"].values()), {})
+ response = await hx_req(url, params=params, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0})
+ data = next(iter(response["data"].values()), {})
if not data:
return f"CoinMarketCap price failed: {coin}"
stats = data["quote"][fiat]
@@ -95,10 +100,10 @@ async def cmc_convert_price(amount: float | str, base: str, quote: str) -> str:
return f"不支持转换: {amount} {base} → {quote}\n支持的法币:\n{', '.join(sorted(cmc_fiat))}"
url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
params = {"amount": float(amount), "symbol": base, "convert": quote}
- response = await hx_req(url, params=params, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
+ response = await hx_req(url, params=params, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0})
data = {}
try:
- data = response.json()["data"][0]["quote"][quote]
+ data = response["data"][0]["quote"][quote]
except Exception as e:
logger.error(e)
return f"转换失败: {amount} {base} → {quote}\n{e}"
src/price/entrypoint.py
@@ -1,6 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
import re
from pyrogram.client import Client
@@ -10,10 +9,10 @@ from config import ENABLE, PREFIX, cache
from messages.parser import parse_msg
from messages.sender import send2tg
from messages.utils import equal_prefix, startswith_prefix
-from price.binance import get_binance_price
-from price.coinmarketcap import cmc_convert_price, get_cmc_price
-from price.okx import get_okx_price
-from price.tradingview import get_tradingview_price
+from price.binance import get_binance_price, get_binance_symbols
+from price.coinmarketcap import cmc_convert_price, get_cmc_coins, get_cmc_price
+from price.okx import get_okx_price, get_okx_symbols
+from price.tradingview import get_tradingview_price, get_tradingview_symbols
HELP = f"""
💵**查询价格**
@@ -26,7 +25,7 @@ HELP = f"""
K线Interval (可选):
- 加密货币(默认30m)
-1m,3m,5m,15m,30m,1h,2h,6h,8h,12h,1D,3D,1W,1M
+1m,3m,5m,15m,30m,1h,2h,4h,6h,8h,12h,1D,3D,1W,1M
- 股票&指数(默认15m)
1m,3m,5m,15m,30m,45m,1h,2h,3h,4h,1D,1W,1M,3M,6M,1Y
@@ -118,3 +117,11 @@ async def get_asset_price(client: Client, message: Message, **kwargs):
return None
return await send2tg(client, message, texts=f"不支持此Symbol: {text}\n{HELP}", **kwargs)
+
+
+async def prefetch_price_symbols():
+ if ENABLE.PRICE:
+ await get_binance_symbols()
+ await get_okx_symbols()
+ await get_tradingview_symbols()
+ await get_cmc_coins()
src/price/okx.py
@@ -1,20 +1,25 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+from loguru import logger
+
from config import API, PROXY, cache
from networking import hx_req
from price.chart import generate_chart
from utils import number, ts_to_dt
-@cache.memoize(ttl=7200)
+@cache.memoize(ttl=3600)
async def get_okx_symbols() -> dict[str, str]:
"""Get all symbols from OKX."""
+ logger.info("Fetching OKX symbols...")
res = {}
url = f"{API.OKX}/api/v5/public/instruments"
for instType in ["SWAP", "SPOT"]:
params = {"instType": instType}
- response = await hx_req(url, params=params, proxy=PROXY.CRYPTO, check_has_kv=["data"])
- data = response.json()["data"]
+ response = await hx_req(url, params=params, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"code": "0"}, silent=True)
+ if response.get("hx_error"):
+ return {}
+ data = response["data"]
res |= {coin["instId"].replace("-", ""): coin["instId"] for coin in data if coin["state"] == "live"}
return res
@@ -27,6 +32,8 @@ async def get_okx_price(coin: str, interval: str | None = None) -> dict:
# OKX interval unit: m, H, D, W, M
if interval.endswith(("h", "d", "w")):
interval = interval.upper()
+ if interval == "8H": # OKX does not support 8H interval
+ interval = "6H"
if interval not in ["1m", "3m", "5m", "15m", "30m", "1H", "2H", "4H", "6H", "12H", "1D", "2D", "3D", "1W", "1M", "3M"]:
interval = "30m"
@@ -40,10 +47,12 @@ async def get_okx_price(coin: str, interval: str | None = None) -> dict:
return {}
inst_id = symbols[symbol]
url = f"{API.OKX}/api/v5/market/candles?instId={inst_id}&bar={interval}&limit=49"
- response = await hx_req(url, proxy=PROXY.CRYPTO, check_kv={"code": "0"})
- klines = response.json()["data"] # ts, o, h, l, c
+ response = await hx_req(url, proxy=PROXY.CRYPTO, check_kv={"code": "0"}, silent=True)
+ if response.get("hx_error"):
+ return {"texts": f"OKX price for {coin} failed: {response['hx_error']}"}
+ klines = response["data"] # ts, o, h, l, c
if not klines:
- return {"texts": f"OKX price failed: {coin}"}
+ return {"texts": f"OKX price for {coin} failed: {response['hx_error']}"}
klines = sorted(klines, key=lambda x: x[0])
high_price = max(float(number(x[2])) for x in klines)
low_price = min(float(number(x[3])) for x in klines)
src/price/tradingview.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
-
+from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Message
@@ -10,7 +9,7 @@ from config import PROXY, cache
from networking import hx_req
-@cache.memoize(ttl=7200)
+@cache.memoize(ttl=43200) # 12 hours
async def get_tradingview_symbols() -> dict[str, str]:
"""Get all symbols from TradingView.
@@ -19,11 +18,14 @@ async def get_tradingview_symbols() -> dict[str, str]:
"NASDAQ:AAPL": "NASDAQ:AAPL", # (full symbol)
}
"""
+ logger.info("Fetching TradingView symbols...")
full = {}
for region in ["hongkong", "china", "america", "cfd"]: # always put cfd at the last
url = f"https://scanner.tradingview.com/{region}/scan"
- response = await hx_req(url, proxy=PROXY.CRYPTO, check_has_kv=["data"])
- data = response.json()["data"]
+ response = await hx_req(url, proxy=PROXY.CRYPTO, check_keys=["data"], silent=True)
+ if response.get("hx_error"):
+ continue
+ data = response["data"]
full |= {coin["s"]: coin["s"] for coin in data}
simple = {k.split(":")[-1]: v for k, v in full.items()}
return simple | full
src/main.py
@@ -24,6 +24,7 @@ from bridge.social import forward_social_media_results
from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TID, TOKEN, TZ, cache
from handler import handle_social_media, handle_utilities
from messages.parser import parse_msg
+from price.entrypoint import prefetch_price_symbols
from utils import cleanup_old_files, nowdt, to_int
# ruff: noqa: RUF001
@@ -124,6 +125,7 @@ async def main():
async def scheduling(client: Client):
cache.evict() # delete expired cache
cleanup_old_files()
+ await prefetch_price_symbols()
# custom crontab jobs
now = nowdt(TZ)
src/networking.py
@@ -2,22 +2,20 @@
# -*- coding: utf-8 -*-
import asyncio
-import copy
-import inspect
import json
import re
-import time
from pathlib import Path
+from typing import Any
from urllib.parse import parse_qs, quote_plus, urlparse
import anyio
-from httpx import AsyncClient, AsyncHTTPTransport, HTTPStatusError, RequestError, Response
+from httpx import AsyncClient, AsyncHTTPTransport, HTTPStatusError, RequestError
from loguru import logger
from config import DOWNLOAD_DIR, PROXY, UA, cache, semaphore
from messages.progress import modify_progress
from messages.utils import summay_media
-from utils import bare_url, https_url, is_supported_by_ytdlp, match_urls, readable_size
+from utils import bare_url, check_data, https_url, is_supported_by_ytdlp, match_urls, readable_size
# ruff: noqa: RUF001
MOBILE_HEADERS = {
@@ -35,60 +33,6 @@ MOBILE_HEADERS = {
}
-def retry(max_retries=3, delay=2):
- def decorator(func):
- async def async_wrapper(*args, **kwargs):
- retries = 0
- resp = Response(500)
- while retries < max_retries:
- try:
- return await func(*args, **kwargs)
- except RequestError as e:
- msg = f"RequestError: {e.request.url!r} {e} (Retrying {retries}/{max_retries})"
- resp.extensions = {"exception": e, "msg": msg}
- except HTTPStatusError as e:
- msg = f"HTTPStatusError: {e.response.status_code} while requesting {e.request.url!r} {e} (Retrying {retries}/{max_retries})"
- resp.extensions = {"exception": e, "msg": msg}
- except Exception as e:
- msg = f"{type(e).__name__}: {e} (Retrying {retries}/{max_retries})"
- resp.extensions = {"exception": e, "msg": msg}
- logger.error(msg)
- retries += 1
- await asyncio.sleep(delay)
- logger.error(f"Failed after {max_retries} retries")
- return resp
-
- def sync_wrapper(*args, **kwargs):
- retries = 0
- resp = Response(500)
- while retries < max_retries:
- try:
- return func(*args, **kwargs)
- except RequestError as e:
- msg = f"RequestError: {e.request.url!r} {e} (Retrying {retries}/{max_retries})"
- resp.extensions = {"exception": e, "msg": msg}
- except HTTPStatusError as e:
- msg = f"HTTPStatusError: {e.response.status_code} while requesting {e.request.url!r} {e} (Retrying {retries}/{max_retries})"
- resp.extensions = {"exception": e, "msg": msg}
- except Exception as e:
- msg = f"{type(e).__name__}: {e} (Retrying {retries}/{max_retries})"
- resp.extensions = {"exception": e, "msg": msg}
- logger.error(msg)
- retries += 1
- time.sleep(delay)
- logger.error(f"Failed after {max_retries} retries")
- return resp
-
- def wrapper(*args, **kwargs):
- if inspect.iscoroutinefunction(func):
- return async_wrapper(*args, **kwargs)
- return sync_wrapper(*args, **kwargs)
-
- return wrapper
-
- return decorator
-
-
async def log_req(request):
logger.debug(f"{request.method} {request.url}")
@@ -98,7 +42,6 @@ async def log_resp(response):
logger.debug(f"[{response.status_code}] {request.method} {request.url}")
-@retry()
async def hx_req(
url,
method: str = "GET",
@@ -107,67 +50,78 @@ async def hx_req(
merge_headers: bool = True,
cookies: dict | None = None,
params: dict | None = None,
- post_data: dict | None = None,
post_json: dict | None = None,
transport: AsyncHTTPTransport | None = None,
proxy: str | None = None,
follow_redirects: bool = True,
- check_has_kv: list[str] | None = None,
+ check_keys: list[str] | None = None,
check_kv: dict | None = None,
timeout: int = 10, # noqa: ASYNC109
-) -> Response:
+ retry: int = 0,
+ max_retry: int = 2,
+ silent: bool = False,
+ rformat: str = "json", # "json", "text"
+) -> dict[str, Any]:
+ """Request the given URL with the given method and return the response as a dictionary.
+
+ Args:
+ url (str): The URL to request.
+ method (str): The method to use for the request.
+ headers (dict, optional): The headers to use for the request.
+ merge_headers (bool, optional): Whether to merge the default headers with the given headers.
+ cookies (dict, optional): The cookies to use for the request.
+ params (dict, optional): The parameters to use for the request.
+ post_json (dict, optional): The JSON data to use for the request.
+ transport (AsyncHTTPTransport, optional): The transport to use for the request.
+ proxy (str, optional): The proxy to use for the request.
+ follow_redirects (bool, optional): Whether to follow redirects.
+ check_keys (list[str], optional): The keys to check in the response.
+ check_kv (dict, optional): The key-value pairs to check in the response.
+        timeout (int, optional): The request timeout in seconds.
+        retry (int, optional): Current retry attempt (internal counter); the request is retried until it exceeds max_retry.
+ silent (bool, optional): Whether to suppress the logs.
+
+ Returns:
+        dict: The parsed JSON response (or {rformat: text} when rformat != "json"), or {"hx_error": msg} on failure.
+ """
+ if retry > max_retry:
+ error = f"[{method}] Failed after {retry} retries: {url}"
+ logger.error(error)
+ return {"hx_error": error}
# headers
if headers is None:
headers = MOBILE_HEADERS
elif merge_headers:
headers = MOBILE_HEADERS | headers
if transport is None:
- transport = AsyncHTTPTransport(proxy=proxy, http2=True, retries=3)
- logger.trace(f"Headers: {headers}, Params: {params}")
- async with AsyncClient(http2=True, proxy=proxy, transport=transport, event_hooks={"request": [log_req], "response": [log_resp]}) as client:
- if method == "GET":
- response = await client.get(url, cookies=cookies, headers=headers, params=params, follow_redirects=follow_redirects, timeout=timeout)
- elif method == "POST":
- response = await client.post(url, cookies=cookies, headers=headers, data=post_data, json=post_json, params=params, follow_redirects=follow_redirects, timeout=timeout)
- data = response.text
- try:
- logger.trace(json.loads(data))
- except json.JSONDecodeError:
- logger.trace(data)
- response.raise_for_status()
- if check_has_kv:
- data = json.loads(response.text)
- for key in check_has_kv:
- if "." not in key:
- if not data.get(key):
- msg = f"Value of Key={key} not found in json response"
- logger.error(data)
- raise ValueError(msg)
- else:
- keys = key.split(".")
- data_copy = copy.deepcopy(data)
- error_msg = f"Value of Key={key} not found in json response"
- for k in keys:
- if not isinstance(data_copy, dict):
- raise TypeError(error_msg)
- data_copy = data_copy.get(k, {})
- if not data_copy:
- raise ValueError(error_msg)
-
- if check_kv:
- data = json.loads(response.text)
- for key, value in check_kv.items():
- if isinstance(value, dict):
- for k, v in value.items():
- if str(data.get(key, {}).get(k)) != str(v): # convert to str to compare
- msg = f"Key={key}.{k} got {data.get(key, {}).get(k)} in response, but required: {v}"
- logger.error(data)
- raise ValueError(msg)
- elif str(data.get(key)) != str(value): # convert to str to compare
- msg = f"Key={key} got {data.get(key)} in response, but required: {value}"
- logger.error(data)
- raise ValueError(msg)
- return response
+ transport = AsyncHTTPTransport(proxy=proxy, http2=True, retries=retry)
+
+ if silent:
+ client = AsyncClient(http2=True, proxy=proxy, transport=transport, follow_redirects=follow_redirects, timeout=timeout)
+ else:
+ logger.trace(f"Url: {url}, Headers: {headers}, Params: {params}")
+ client = AsyncClient(http2=True, proxy=proxy, transport=transport, follow_redirects=follow_redirects, timeout=timeout, event_hooks={"request": [log_req], "response": [log_resp]})
+
+ if method not in ["GET", "POST"]:
+ error = f"Invalid method: {method}"
+ logger.error(error)
+ return {"hx_error": error}
+ try:
+ async with client:
+ if method == "GET":
+ response = await client.get(url, cookies=cookies, headers=headers, params=params)
+ else:
+ response = await client.post(url, cookies=cookies, headers=headers, json=post_json, params=params)
+ response.raise_for_status()
+ data = response.text
+ check_data(data, check_keys=check_keys, check_kv=check_kv)
+ res = json.loads(data) if rformat == "json" else {rformat: data}
+ if not silent:
+ logger.trace(res)
+ return res
+ except Exception as e:
+ logger.error(f"{type(e).__name__}[{retry + 1}/{max_retry + 1}]: Failed to request {url}, {e}")
+ return await hx_req(url, method, headers=headers, merge_headers=merge_headers, cookies=cookies, params=params, post_json=post_json, transport=transport, proxy=proxy, follow_redirects=follow_redirects, check_keys=check_keys, check_kv=check_kv, timeout=timeout, retry=retry + 1, max_retry=max_retry, silent=silent, rformat=rformat) # fmt: off
async def download_file(
@@ -348,19 +302,20 @@ async def match_social_media_link(text: str, *, flatten_first: bool = False) ->
url = f"https://x.com/{handle}/status/{post_id}"
return {"platform": platform, "handle": handle, "post_id": post_id, "url": url, "db_key": bare_url(url)}
- # https://weibo.com/1736562685/P6lhSjRnI
- if matched := re.search(r"(https?://)?(www\.)?weibo\.com/(.*?)/(\w+)", text):
- return {"post_id": matched.group(4), "url": https_url(matched.group(0)), "db_key": f"m.weibo.cn/detail/{matched.group(4)}", "platform": "weibo"}
- # https://m.weibo.cn/detail/5113333048938691
- # https://m.weibo.cn/status/5113333048938691
- if matched := re.search(r"(https?://)?m\.weibo\.cn/(:?detail|status)/(\w+)", text):
- return {"post_id": matched.group(3), "url": https_url(matched.group(0)), "db_key": f"m.weibo.cn/detail/{matched.group(3)}", "platform": "weibo"}
+ # weibo video first, then weibo post
# https://video.weibo.com/show?fid=1034:5123779299311660
if matched := re.search(r"(https?://)?video\.weibo\.(:?com|cn)/show\?fid=(\d+):(\d+)", text):
return {"post_id": f"weibovideo{matched.group(3)}:{matched.group(4)}", "url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "weibo"}
# https://weibo.com/tv/show/1034:5123779299311660?from=old_pc_videoshow
if matched := re.search(r"(https?://)?(www\.)?weibo\.(:?com|cn)/tv/show/(\d+):(\d+)", text):
return {"post_id": f"weibovideo{matched.group(4)}:{matched.group(5)}", "url": https_url(matched.group(0)), "db_key": bare_url(matched.group(0)), "platform": "weibo"}
+ # https://m.weibo.cn/detail/5113333048938691
+ # https://m.weibo.cn/status/5113333048938691
+ if matched := re.search(r"(https?://)?m\.weibo\.cn/(:?detail|status)/(\w+)", text):
+ return {"post_id": matched.group(3), "url": https_url(matched.group(0)), "db_key": f"m.weibo.cn/detail/{matched.group(3)}", "platform": "weibo"}
+ # https://weibo.com/1736562685/P6lhSjRnI
+ if matched := re.search(r"(https?://)?(www\.)?weibo\.com/(.*?)/(\w+)", text):
+ return {"post_id": matched.group(4), "url": https_url(matched.group(0)), "db_key": f"m.weibo.cn/detail/{matched.group(4)}", "platform": "weibo"}
# http://xhslink.com/a/Z3VPXAReU1Y1
xhs_pattern = r"(https?://)?xhslink\.com/(\w?/?)([^,,.。?\s]+)"
@@ -467,7 +422,7 @@ async def flatten_rediercts(texts: str | None = None, pattern: str | None = None
except Exception as e:
logger.error(f"Failed to parse redirect for {url}: {e}")
return texts
- rediercted_url = str(resp.url)
+ rediercted_url = str(resp.url) # type: ignore
logger.info(f"Flatten redirect: {url} -> {rediercted_url}")
return texts.replace(url, rediercted_url)
@@ -475,15 +430,16 @@ async def flatten_rediercts(texts: str | None = None, pattern: str | None = None
if __name__ == "__main__":
import asyncio
- asyncio.run(match_social_media_link("https://www.douyin.com/video/7398813386827468041"))
- asyncio.run(match_social_media_link("https://www.iesdouyin.com/share/note/7454527270925946138/"))
- asyncio.run(match_social_media_link("https://www.instagram.com/yifaer_chen/p/DEzv9x-vzOn/"))
+ check_data(json.dumps({"foo": "bar", "baz": {"qux": "quux"}, "lst": ["1", "2", "3"]}), check_keys=["baz.qux"], check_kv={"foo": "bar", "baz.qux": "quux", "lst": ["1", "2", "3"]})
+ # asyncio.run(match_social_media_link("https://www.douyin.com/video/7398813386827468041"))
+ # asyncio.run(match_social_media_link("https://www.iesdouyin.com/share/note/7454527270925946138/"))
+ # asyncio.run(match_social_media_link("https://www.instagram.com/yifaer_chen/p/DEzv9x-vzOn/"))
# asyncio.run(flatten_rediercts("http://t.cn/A6ukIuVn"))
# asyncio.run(flatten_rediercts("shorturl.at/fuyrt"))
# asyncio.run(flatten_rediercts("https://v.douyin.com/CeiJfJMQG/"))
# asyncio.run(flatten_rediercts("https://t.co/Wwo3x69CQz"))
- # res = asyncio.run(hx_req("https://httpbin.org/delay/5000"))
- # asyncio.run(hx_req("https://httpbin.org/get", check_kv={"url": "https://httpbin.org/"}))
+ # res = asyncio.run(hx_req("https://httpbin.org/delay/10"))
+ asyncio.run(hx_req("https://httpbin.org/get", check_kv={"url": "https://httpbin.org/get", "headers.Pragma": "no-cache1"}, max_retry=1))
# resp = asyncio.run(hx_req("https://httpbin.org/get", check_kv={"headers": {"Accept-Language": "en-US,en;q=0.8"}}))
# resp = asyncio.run(hx_req("https://httpbin.org/status/404"))
# asyncio.run(download_file("https://httpbin.org/image/jpeg", suffix=".jpg"))
src/utils.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import copy
import json
import random
import re
@@ -11,7 +12,7 @@ from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
-from bs4 import PageElement
+from bs4.element import PageElement
from loguru import logger
from pyrogram.client import Client
from yt_dlp.extractor import gen_extractors
@@ -253,6 +254,75 @@ def ascii_to_unicode(text: str) -> str:
return bytes(str(text), "ascii").decode("unicode_escape")
+def check_data(text: str, check_keys: list[str] | None = None, check_kv: dict | None = None):
+    """Check that the JSON string *text* contains the required keys and key-value pairs (raises ValueError/TypeError on mismatch).
+
+ Example data:
+ {
+ "foo": "bar",
+ "baz": {
+ "qux": "quux"
+ },
+ "lst": ["1", "2", "3"]
+ }
+
+ check_keys: ["foo", "baz.qux", "lst"]
+ check_kv: {"foo": "bar", "baz.qux": "quux", "lst": ["1", "2", "3"]}
+ """
+ if not check_keys and not check_kv: # no need to check
+ return
+ try:
+ data = json.loads(text)
+ except json.JSONDecodeError:
+ logger.error(f"Failed to parse data as json: {text}")
+ raise
+
+ # ["foo", "baz.qux", "lst"]
+ if check_keys:
+ for key in check_keys:
+ msg = f"{key=} not found: {data}"
+ if "." not in key:
+ if not data.get(key):
+ logger.error(msg)
+ raise ValueError
+ else:
+ keys = key.split(".")
+ data_copy = copy.deepcopy(data) # do not modify the original data
+ for k in keys:
+ if not isinstance(data_copy, dict):
+ logger.error(msg)
+ raise TypeError
+ data_copy = data_copy.get(k, {})
+ if not data_copy:
+ logger.error(msg)
+ raise ValueError
+
+ # {"foo": "bar", "baz.qux": "quux", "lst": ["1", "2", "3"]}
+ if check_kv:
+ for key, required_value in check_kv.items():
+ if "." not in key:
+ value = data.get(key)
+ else:
+ keys = key.split(".")
+ msg = f"{key=} not found: {data}"
+ value = copy.deepcopy(data)
+ for k in keys:
+ if not isinstance(value, dict):
+ msg = f"{key=} not found: {data}"
+ logger.error(msg)
+ raise TypeError
+ value = value.get(k, {})
+ if value is None:
+ msg = f"{key=} not found: {data}"
+ logger.error(msg)
+ raise ValueError
+
+ if str(value) != str(required_value): # convert to str to compare
+ msg = f"{data=}, {key=}, {value=}, but required: {required_value}"
+ logger.error(msg)
+ raise ValueError
+
+
def cleanup_old_files(root: Path | str | None = None, duration: int = 7200) -> None:
"""Clean up files older than duration seconds."""
if root is None: