Commit 6a19c0b
Changed files (9)
src/price/binance.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import contextlib
+
+from config import API, PROXY, cache
+from networking import hx_req
+from price.chart import generate_chart
+from utils import number, ts_to_dt
+
+
+@cache.memoize(ttl=7200)
+async def get_binance_symbols() -> dict[str, str]:
+ """Get all symbols from Binance."""
+ res = {}
+ # um
+ with contextlib.suppress(Exception): # region restriction
+ url = f"{API.BINANCE_UM}/fapi/v1/exchangeInfo"
+ response = await hx_req(url, proxy=PROXY.CRYPTO, check_has_kv=["symbols"])
+ data = response.json()["symbols"]
+ res = {coin["symbol"]: "UM" for coin in data if coin["status"] == "TRADING"}
+ # spot
+ url = f"{API.BINANCE_SPOT}/api/v3/exchangeInfo"
+ params = {"showPermissionSets": False, "symbolStatus": "TRADING"}
+ response = await hx_req(url, params=params, proxy=PROXY.CRYPTO, check_has_kv=["symbols"])
+ data = response.json()["symbols"]
+ res |= {coin["symbol"]: "SPOT" for coin in data}
+ return res
+
+
+@cache.memoize(ttl=60)
+async def get_binance_price(coin: str, interval: str = "30m") -> dict:
+ """Get the price of a crypto asset from Binance."""
+ symbols = await get_binance_symbols()
+ symbol = coin.upper()
+ stablecoins = ["USDT", "USDC", "FDUSD", "TUSD"]
+ while symbol not in symbols and stablecoins:
+ symbol = f"{coin}{stablecoins.pop(0)}".upper()
+ if symbol not in symbols:
+ return {}
+
+ market = symbols[symbol]
+ if market == "SPOT":
+ url = f"{API.BINANCE_SPOT}/api/v3/klines?symbol={symbol}&interval={interval}&limit=49"
+ response = await hx_req(url, proxy=PROXY.CRYPTO)
+ elif market == "UM":
+ url = f"{API.BINANCE_UM}/fapi/v1/klines?symbol={symbol}&interval={interval}&limit=49"
+ response = await hx_req(url, proxy=PROXY.CRYPTO)
+ else:
+ return {}
+ klines = response.json()
+ if not klines:
+ return {"texts": f"Binance price failed: {coin}"}
+ high_price = max(float(number(x[2])) for x in klines)
+ low_price = min(float(number(x[3])) for x in klines)
+ open_price = float(number(klines[0][1]))
+ close_price = float(number(klines[-1][4]))
+ change_pct = (close_price - open_price) / open_price
+ amplitude = (high_price - low_price) / low_price
+ if close_price < open_price:
+ amplitude *= -1
+ title = f"{symbol}•{interval}•Binance"
+ subtitle = f"开: {open_price} 高: {high_price} 低: {low_price} 收: {close_price} 涨: {change_pct:+.2%} 振: {abs(amplitude):.2%}"
+ text = f"{title}\n"
+ text += f"时间段: {ts_to_dt(klines[0][0]):%m-%d %H:%M} - {ts_to_dt(klines[-1][0]):%m-%d %H:%M}\n"
+ text += f"开盘价: {open_price}\n"
+ text += f"最高价: {high_price}\n"
+ text += f"最低价: {low_price}\n"
+ text += f"收盘价: {close_price} \n"
+ text += f"涨跌幅: {change_pct:+.2%}\n"
+ text += f"振幅: {amplitude:.2%}"
+ chart = await generate_chart(klines, interval, title, subtitle)
+ return {"texts": text, "media": [{"photo": chart}]}
src/price/chart.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from pathlib import Path
+
+from quickchart import QuickChart
+
+from config import DOWNLOAD_DIR
+from networking import download_file
+from utils import number, rand_string, ts_to_dt
+
+
+async def generate_chart(klines: list[list], interval: str, title: str, subtitle: str) -> str:
+ """Generate a candlestick chart.
+
+ Docs: https://quickchart.io/documentation
+ """
+ qc = QuickChart()
+ qc.width = 3000 # max 3000
+ qc.height = 1080
+ qc.version = "3"
+ qc.format = "png"
+ qc.device_pixel_ratio = 1.0
+ data_str = [f"{{ x: new Date('{ts_to_dt(x[0]):%Y-%m-%dT%H:%M:%S}+08:00').getTime(), o: {number(x[1])}, h: {number(x[2])}, l: {number(x[3])}, c: {number(x[4])} }}" for x in klines]
+ config_str = f"""
+ {{
+ type: 'candlestick',
+ data: {{
+ datasets: [{{
+ data: [{",".join(data_str)}],
+ }}],
+ }},
+ options: {{
+ scales:{{
+ x: {{
+ adapters: {{ date: {{ zone: 'UTC+8' }} }},
+ type: 'time',
+ time: {{
+ unit: '{"minute" if interval[-1] in ["m", "h"] else "day"}',
+ stepSize: 1,
+ displayFormats: {{
+ minute: 'HH:mm',
+ hour: 'HH:mm',
+ day: 'MM-dd',
+ month: 'MM-dd',
+ year: 'MM-dd',
+ }}
+ }},
+ ticks: {{
+ autoSkip: false,
+ font: {{
+ size: 16,
+ weight: 'bold',
+ }},
+ }}
+ }},
+ y: {{
+ ticks: {{
+ font: {{
+ size: 24,
+ weight: 'bold',
+ }},
+ }}
+ }},
+ }},
+ plugins: {{
+ title: {{
+ display: true,
+ text: '{title}',
+ color: 'rgb(0, 0, 0)',
+ font: {{
+ size: 24,
+ weight: 'bold',
+ }},
+ }},
+ subtitle: {{
+ display: true,
+ text: '{subtitle}',
+ font: {{
+ size: 20,
+ weight: 'bold',
+ }},
+ }},
+ legend: {{
+ display: false,
+ }},
+ }},
+ }},
+ }}"""
+ qc.config = config_str.replace("\n", "").replace(" ", "") # type: ignore
+ path = Path(DOWNLOAD_DIR) / f"{rand_string()}.png"
+ try:
+ return await download_file(qc.get_url(), path=path)
+ except Exception:
+ return ""
src/price/coinmarketcap.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+from loguru import logger
+
+from config import PROXY, TOKEN, TZ, cache
+from networking import hx_req
+from utils import number
+
+
+@cache.memoize(ttl=7200)
+async def get_cmc_coins() -> dict:
+ """Get all coins from CoinMarketCap.
+
+ Returns:
+ dict: {coin: slug} (e.g. {"BTC": "bitcoin"})
+ All coins are in uppercase. The slugs are in lowercase.
+ """
+ if not TOKEN.CMC_API_KEY:
+ logger.warning("CoinMarketCap API key is not set.")
+ return {}
+ url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/map"
+ params = {"limit": "5000", "sort": "cmc_rank", "aux": "status"}
+ headers = {
+ "Accepts": "application/json",
+ "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY,
+ }
+ response = await hx_req(url, params=params, headers=headers, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
+ data = response.json()["data"]
+ return {coin["symbol"]: coin["slug"] for coin in data}
+
+
+@cache.memoize(ttl=60)
+async def get_cmc_price(coin: str) -> str:
+ """Get the price of a crypto asset from CoinMarketCap."""
+ cmc_coins = await get_cmc_coins()
+ fiat = "USD"
+ if coin.upper() in cmc_coins:
+ params = {"symbol": coin.upper(), "convert": fiat}
+ elif coin.lower() in cmc_coins.values():
+ params = {"slug": coin.lower(), "convert": fiat}
+ else:
+ return ""
+ url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest"
+ headers = {
+ "Accepts": "application/json",
+ "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY,
+ }
+ response = await hx_req(url, params=params, headers=headers, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
+ data = next(iter(response.json()["data"].values()), {})
+ if not data:
+ return f"CoinMarketCap price failed: {coin}"
+ stats = data["quote"][fiat]
+ precision = 2 if float(stats["price"]) > 1 else 6
+ emoji = lambda x: "🟢" if float(x) > 0 else "🔴"
+ msg = f"🪙**{data['symbol']}** ({data['name']})\n"
+ msg += f"💵价格: ${number(stats['price'], precision)}\n"
+ msg += f"{emoji(stats['percent_change_1h'])}1h涨跌: {stats['percent_change_1h']:+.2f}%\n"
+ msg += f"{emoji(stats['percent_change_24h'])}24h涨跌: {stats['percent_change_24h']:+.2f}%\n"
+ msg += f"{emoji(stats['percent_change_30d'])}30d涨跌: {stats['percent_change_30d']:+.2f}%\n"
+ msg += f"{emoji(stats['percent_change_60d'])}60d涨跌: {stats['percent_change_60d']:+.2f}%\n"
+ msg += f"{emoji(stats['percent_change_90d'])}90d涨跌: {stats['percent_change_90d']:+.2f}%\n"
+ date = datetime.fromisoformat(data["last_updated"]).astimezone(ZoneInfo(TZ))
+ msg += f"🕒{date:%Y-%m-%d %H:%M} [CoinMarketCap]\n"
+ return msg.strip()
src/price/entrypoint.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import ENABLE, PREFIX, cache
+from messages.parser import parse_msg
+from messages.sender import send2tg
+from messages.utils import equal_prefix, startswith_prefix
+from price.binance import get_binance_price
+from price.coinmarketcap import get_cmc_price
+
+HELP = f"""
+💵**查询价格**
+使用方式: `{PREFIX.PRICE}` + symbol
+示例: `{PREFIX.PRICE} BTC` 查询比特币价格 (大小写均可)
+"""
+
+
+@cache.memoize(ttl=60)
+async def get_asset_price(client: Client, message: Message, **kwargs) -> None:
+ """Get asset price."""
+ if not ENABLE.PRICE:
+ return
+ info = parse_msg(message)
+ # send docs if message == "/price"
+ if equal_prefix(info["text"], prefix=[PREFIX.PRICE]):
+ await send2tg(client, message, texts=HELP, **kwargs)
+ return
+
+ if not startswith_prefix(info["text"], prefix=[PREFIX.PRICE]):
+ return
+
+ # find the asset name
+ asset = info["text"].removeprefix(PREFIX.PRICE).strip()
+
+ if res := await get_binance_price(asset):
+ await send2tg(client, message, **res, **kwargs)
+ elif res := await get_cmc_price(asset):
+ await send2tg(client, message, texts=res, **kwargs)
+ else:
+ await send2tg(client, message, texts=f"不支持此Symbol: {asset}", **kwargs)
src/config.py
@@ -34,6 +34,7 @@ class ENABLE:
GPT = os.getenv("ENABLE_GPT", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
INSTAGRAM = os.getenv("ENABLE_INSTAGRAM", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
OCR = os.getenv("ENABLE_OCR", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+ PRICE = os.getenv("ENABLE_PRICE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
SUBTITLE = os.getenv("ENABLE_SUBTITLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
TIKTOK = os.getenv("ENABLE_TIKTOK", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
TWITTER = os.getenv("ENABLE_TWITTER", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
@@ -58,6 +59,7 @@ class PREFIX:
SUBTITLE = os.getenv("PREFIX_SUBTITLE", "/subtitle").lower()
WGET = os.getenv("PREFIX_WGET", "/wget").lower()
OCR = os.getenv("PREFIX_OCR", "/ocr").lower()
+ PRICE = os.getenv("PREFIX_PRICE", "/price").lower()
COMBINATION = os.getenv("PREFIX_COMBINATION", "/combine").lower()
@@ -77,6 +79,8 @@ class API:
TIKHUB_INSTAGRAM = os.getenv("TIKHUB_INSTAGRAM_API", "https://api.tikhub.io/api/v1/instagram/web_app/fetch_post_info_by_url?url=")
TIKHUB_TWITTER = os.getenv("TIKHUB_TWITTER_API", "https://api.tikhub.io/api/v1/twitter/web/fetch_post_comments?tweet_id=")
TIKHUB_WEIBO_VIDEO = os.getenv("TIKHUB_WEIBO_VIDEO_API", "https://api.tikhub.io/api/v1/weibo/web/fetch_short_video_data?share_text=")
+ BINANCE_SPOT = os.getenv("BINANCE_SPOT_API", "https://data-api.binance.vision")
+ BINANCE_UM = os.getenv("BINANCE_UM_API", "https://fapi.binance.com")
class PROVIDER: # default API provider
@@ -97,6 +101,7 @@ class TOKEN:
TENCENT_ASR_SECRET_ID = os.getenv("TENCENT_ASR_SECRET_ID", "")
TENCENT_ASR_SECRET_KEY = os.getenv("TENCENT_ASR_SECRET_KEY", "")
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY", "")
+ CMC_API_KEY = os.getenv("CMC_API_KEY", "")
class PROXY: # format: socks5://127.0.0.1:7890
@@ -106,6 +111,7 @@ class PROXY: # format: socks5://127.0.0.1:7890
TENCENT = os.getenv("TENCENT_PROXY", None) # Banned oversea IP, need a back to China proxy
GPT = os.getenv("GPT_PROXY", None)
SUBTITLE = os.getenv("SUBTITLE_PROXY", None)
+ CRYPTO = os.getenv("CRYPTO_PROXY", None)
DOWNLOAD = os.getenv("DOWNLOAD_PROXY", None)
WEIBO_COOKIE = os.getenv("WEIBO_COOKIE_PROXY", None) # Weibo visitor cookie
YTDLP = os.getenv("YTDLP_PROXY", None) # general proxy for ytdlp
src/handler.py
@@ -28,6 +28,7 @@ from preview.twitter import preview_twitter
from preview.weibo import preview_weibo
from preview.xiaohongshu import preview_xhs
from preview.ytdlp import ProxyError, preview_ytdlp
+from price.entrypoint import get_asset_price
from utils import to_int, true
@@ -45,6 +46,7 @@ async def handle_utilities(
subtitle: bool = True,
wget: bool = True,
ocr: bool = True,
+ price: bool = True,
summary: bool = True,
raw_img: bool = False,
show_progress: bool = True,
@@ -67,6 +69,7 @@ async def handle_utilities(
subtitle (bool, optional): Enable YouTube subtitle. Defaults to True.
wget (bool, optional): Enable WGET. Defaults to True.
ocr (bool, optional): Enable OCR. Defaults to True.
+ price (bool, optional): Enable Asset price. Defaults to True.
summary (bool, optional): Enable AI summary. Defaults to True.
raw_img (bool, optional): Enable convert raw image. Defaults to False.
show_progress (bool, optional): Show a progress message on Telegram. Defaults to True.
@@ -87,6 +90,8 @@ async def handle_utilities(
await download_url_in_message(client, message, **kwargs) # /wget
if ocr:
await send_to_ocr_bridge(client, message) # /ocr
+ if price:
+ await get_asset_price(client, message) # /ocr
if summary:
await ai_summary(client, message) # /summary
if raw_img:
src/utils.py
@@ -6,6 +6,7 @@ import random
import re
import string
from datetime import UTC, datetime
+from decimal import Decimal
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
@@ -24,6 +25,21 @@ def nowdt(tz: str = "UTC") -> datetime:
return datetime.now(ZoneInfo(tz))
+def number(n: float | str | Decimal, precision: int = -1, *, sign: bool = False) -> str:
+ """Normalize a number to its simplest decimal.
+
+ Example:
+ "1.2340000" -> "1.234"
+ 1.000000 -> "1"
+ """
+ n = Decimal(n)
+ if precision == -1: # auto precision (up to 8 decimal places)
+ return f"{n:.8f}".rstrip("0").rstrip(".")
+ if precision == 0:
+ return f"{n:.0f}"
+ return f"{n:+.{precision}f}" if sign else f"{n:.{precision}f}"
+
+
def split_parts(first: int = 0, middle: int = 0, last: int = 0) -> dict:
"""Split a list of items into three parts: first, middle, and last.
@@ -179,9 +195,19 @@ def bare_url(url: str) -> str:
def ts_to_dt(ts: str | float | None) -> datetime | None:
if not ts:
return None
+
+ try: # not number
+ ts = float(ts)
+ except ValueError:
+ return None
+
+ if 0 < float(ts) < 1:
+ return None
try:
- return datetime.fromtimestamp(float(ts), tz=UTC).astimezone(ZoneInfo(TZ))
+ return datetime.fromtimestamp(ts, tz=UTC).astimezone(ZoneInfo(TZ))
except Exception as e:
+ if "out of range" in str(e):
+ return ts_to_dt(ts / 1000)
logger.error(e)
return None
pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
"pytgcrypto>=1.2.9.2",
"python-ffmpeg>=2.0.12",
"pyyaml>=6.0.2",
+ "quickchart-io>=2.0.0",
"tiktoken>=0.8.0",
"uvloop>=0.21.0",
"youtube-transcript-api>=0.6.3",
uv.lock
@@ -196,14 +196,15 @@ wheels = [
[[package]]
name = "beautifulsoup4"
-version = "4.12.3"
+version = "4.13.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "soupsieve", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
+ { name = "typing-extensions", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/b3/ca/824b1195773ce6166d388573fc106ce56d4a805bd7427b624e063596ec58/beautifulsoup4-4.12.3.tar.gz", hash = "sha256:74e3d1928edc070d21748185c46e3fb33490f22f52a3addee9aee0f4f7781051", size = 581181 }
+sdist = { url = "https://files.pythonhosted.org/packages/f0/3c/adaf39ce1fb4afdd21b611e3d530b183bb7759c9b673d60db0e347fd4439/beautifulsoup4-4.13.3.tar.gz", hash = "sha256:1bd32405dacc920b42b83ba01644747ed77456a65760e285fbc47633ceddaf8b", size = 619516 }
wheels = [
- { url = "https://files.pythonhosted.org/packages/b1/fe/e8c672695b37eecc5cbf43e1d0638d88d66ba3a44c4d321c796f4e59167f/beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed", size = 147925 },
+ { url = "https://files.pythonhosted.org/packages/f9/49/6abb616eb3cbab6a7cca303dc02fdf3836de2e0b834bf966a7f5271a34d8/beautifulsoup4-4.13.3-py3-none-any.whl", hash = "sha256:99045d7d3f08f91f0d656bc9b7efbae189426cd913d830294a15eefa0ea4df16", size = 186015 },
]
[[package]]
@@ -227,6 +228,7 @@ dependencies = [
{ name = "pytgcrypto", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "python-ffmpeg", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "pyyaml", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
+ { name = "quickchart-io", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "tiktoken", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "uvloop", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
{ name = "youtube-transcript-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
@@ -256,6 +258,7 @@ requires-dist = [
{ name = "pytgcrypto", specifier = ">=1.2.9.2" },
{ name = "python-ffmpeg", specifier = ">=2.0.12" },
{ name = "pyyaml", specifier = ">=6.0.2" },
+ { name = "quickchart-io", specifier = ">=2.0.0" },
{ name = "tiktoken", specifier = ">=0.8.0" },
{ name = "uvloop", specifier = ">=0.21.0" },
{ name = "youtube-transcript-api", specifier = ">=0.6.3" },
@@ -1082,6 +1085,17 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/fe/0f/25911a9f080464c59fab9027482f822b86bf0608957a5fcc6eaac85aa515/PyYAML-6.0.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:68ccc6023a3400877818152ad9a1033e3db8625d899c72eacb5a668902e4d652", size = 751597 },
]
+[[package]]
+name = "quickchart-io"
+version = "2.0.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "requests", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" },
+]
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/17/46/4d2c1ce9457ddd90913084085be5add1cb040f9bb41717adb89554a9c9d9/quickchart_io-2.0.0-py3-none-any.whl", hash = "sha256:c44b5fb4d6e957fb85db0926e691684795e9fe5d6819d33f2daea795a0f6a36b", size = 5122 },
+]
+
[[package]]
name = "regex"
version = "2024.11.6"