Commit 6d1bfac

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-02-05 11:30:45
feat(price): support currency price conversion
1 parent d704339
Changed files (3)
src/price/coinmarketcap.py
@@ -9,6 +9,8 @@ from config import PROXY, TOKEN, TZ, cache
 from networking import hx_req
 from utils import number
 
+HEADERS = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
+
 
 @cache.memoize(ttl=7200)
 async def get_cmc_coins() -> dict:
@@ -23,20 +25,27 @@ async def get_cmc_coins() -> dict:
         return {}
     url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/map"
     params = {"limit": "5000", "sort": "cmc_rank", "aux": "status"}
-    headers = {
-        "Accepts": "application/json",
-        "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY,
-    }
-    response = await hx_req(url, params=params, headers=headers, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
+    response = await hx_req(url, params=params, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
     data = response.json()["data"]
     return {coin["symbol"]: coin["slug"] for coin in data}
 
 
+@cache.memoize(ttl=7200)
+async def get_cmc_fiat() -> list:
+    """Get all fiat from CoinMarketCap."""
+    if not TOKEN.CMC_API_KEY:
+        logger.warning("CoinMarketCap API key is not set.")
+        return []
+    url = "https://pro-api.coinmarketcap.com/v1/fiat/map"
+    response = await hx_req(url, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
+    data = response.json()["data"]
+    return [coin["symbol"] for coin in data]
+
+
 @cache.memoize(ttl=60)
-async def get_cmc_price(coin: str) -> str:
+async def get_cmc_price(coin: str, fiat: str = "USD") -> str:
     """Get the price of a crypto asset from CoinMarketCap."""
     cmc_coins = await get_cmc_coins()
-    fiat = "USD"
     if coin.upper() in cmc_coins:
         params = {"symbol": coin.upper(), "convert": fiat}
     elif coin.lower() in cmc_coins.values():
@@ -44,11 +53,7 @@ async def get_cmc_price(coin: str) -> str:
     else:
         return ""
     url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest"
-    headers = {
-        "Accepts": "application/json",
-        "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY,
-    }
-    response = await hx_req(url, params=params, headers=headers, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
+    response = await hx_req(url, params=params, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
     data = next(iter(response.json()["data"].values()), {})
     if not data:
         return f"CoinMarketCap price failed: {coin}"
@@ -63,5 +68,32 @@ async def get_cmc_price(coin: str) -> str:
     msg += f"{emoji(stats['percent_change_60d'])}60d涨跌: {stats['percent_change_60d']:+.2f}%\n"
     msg += f"{emoji(stats['percent_change_90d'])}90d涨跌: {stats['percent_change_90d']:+.2f}%\n"
     date = datetime.fromisoformat(data["last_updated"]).astimezone(ZoneInfo(TZ))
-    msg += f"🕒{date:%Y-%m-%d %H:%M} [CoinMarketCap]\n"
+    msg += f"🕒{date:%Y-%m-%d %H:%M}\n"
+    msg += "📡CoinMarketCap\n"
+    return msg.strip()
+
+
+async def cmc_convert_price(amount: float | str, base: str, quote: str) -> str:
+    """Convert asset price."""
+    cmc_fiat = await get_cmc_fiat()
+    cmc_coins = await get_cmc_coins()
+    all_coins = cmc_fiat + list(cmc_coins.keys())
+    base = base.upper()
+    quote = quote.upper()
+    if base not in all_coins or quote not in all_coins:
+        return f"不支持转换: {amount} {base} → {quote}"
+    url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
+    params = {"amount": float(amount), "symbol": base, "convert": quote}
+    response = await hx_req(url, params=params, headers=HEADERS, merge_headers=False, proxy=PROXY.CRYPTO, check_has_kv=["data"])
+    data = {}
+    try:
+        data = response.json()["data"][0]["quote"][quote]
+    except Exception as e:
+        logger.error(e)
+        return f"转换失败: {amount} {base} → {quote}\n{e}"
+    precision = 2 if float(data["price"]) > 1 else 6
+    msg = f"🔄{number(amount)} **{base}** = {number(data['price'], precision)} **{quote}**\n"
+    date = datetime.fromisoformat(data["last_updated"]).astimezone(ZoneInfo(TZ))
+    msg += f"🕒{date:%Y-%m-%d %H:%M}\n"
+    msg += "📡CoinMarketCap\n"
     return msg.strip()
src/price/entrypoint.py
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
+import re
+
 from pyrogram.client import Client
 from pyrogram.types import Message
 
@@ -9,12 +11,16 @@ from messages.parser import parse_msg
 from messages.sender import send2tg
 from messages.utils import equal_prefix, startswith_prefix
 from price.binance import get_binance_price
-from price.coinmarketcap import get_cmc_price
+from price.coinmarketcap import cmc_convert_price, get_cmc_price
 
 HELP = f"""
 💵**查询价格**
-使用方式: `{PREFIX.PRICE}` + symbol
-示例: `{PREFIX.PRICE} BTC` 查询比特币价格 (大小写均可)
+示例: (Symbol大小写均可)
+1. `{PREFIX.PRICE} BTC` 查询比特币价格
+2. `{PREFIX.PRICE} BTC CNY` 计算1枚BTC的CNY价值
+3. `{PREFIX.PRICE} USD CNY` 计算USD与CNY的汇率
+4. `{PREFIX.PRICE} 1.5 BTC` 计算1.5枚BTC的USD价值
+5. `{PREFIX.PRICE} 1.5 BTC CNY` 计算1.5枚BTC的CNY价值
 """
 
 
@@ -31,13 +37,33 @@ async def get_asset_price(client: Client, message: Message, **kwargs) -> None:
 
     if not startswith_prefix(info["text"], prefix=[PREFIX.PRICE]):
         return
+    text = info["text"].removeprefix(PREFIX.PRICE).strip()
 
-    # find the asset name
-    asset = info["text"].removeprefix(PREFIX.PRICE).strip()
+    # these patterns should use CoinMarketCap API
+    pattern_1 = r"^([\d.]+)\s+([A-Za-z]+)\s+([A-Za-z]+)"  # match "1.5 BTC CNY"
+    pattern_2 = r"^([\d.]+)\s+([A-Za-z]+)"  # match "1.5 BTC"
+    pattern_3 = r"^([A-Za-z]+)\s+([A-Za-z]+)"  # match "BTC CNY"
+    amount, base, quote = 0, "", ""
+    if matched := re.search(pattern_1, text, re.IGNORECASE):
+        amount = float(matched.group(1))
+        base = matched.group(2)
+        quote = matched.group(3)
+    elif matched := re.search(pattern_2, text, re.IGNORECASE):
+        amount = float(matched.group(1))
+        base = matched.group(2)
+        quote = "USD"
+    elif matched := re.search(pattern_3, text, re.IGNORECASE):
+        amount = 1
+        base = matched.group(1)
+        quote = matched.group(2)
+    if amount > 0 and base and quote and (msg := await cmc_convert_price(amount, base, quote)):
+        await send2tg(client, message, texts=msg, **kwargs)
+        return
 
-    if res := await get_binance_price(asset):
+    # match "BTC"
+    if res := await get_binance_price(text):
         await send2tg(client, message, **res, **kwargs)
-    elif res := await get_cmc_price(asset):
+    elif res := await get_cmc_price(text):
         await send2tg(client, message, texts=res, **kwargs)
     else:
-        await send2tg(client, message, texts=f"不支持此Symbol: {asset}", **kwargs)
+        await send2tg(client, message, texts=f"不支持此Symbol: {text}\n\n{HELP}", **kwargs)
src/handler.py
@@ -281,6 +281,8 @@ def get_social_media_help(cmd_prefix: list[str] | None = None, ignore_prefix: li
         msg += f"\n⏬**下载文件**: `{PREFIX.WGET}` + URL"
     if ENABLE.OCR:
         msg += f"\n🔤**图片转文字**: `{PREFIX.OCR}` 回复图片消息"
+    if ENABLE.PRICE:
+        msg += f"\n💵**查询价格**: `{PREFIX.PRICE}` + Symbol"
     if ENABLE.COMBINATION:
         msg += f"\n💬**合并历史**: `{PREFIX.COMBINATION} #N` 合并最近N条对话历史"
     if ENABLE.AI_SUMMARY: