main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3from datetime import datetime
  4from zoneinfo import ZoneInfo
  5
  6from glom import Coalesce, glom
  7from loguru import logger
  8
  9from config import PROXY, TOKEN, TZ, cache
 10from networking import hx_req
 11from utils import number
 12
 13HEADERS = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
 14
 15
 16@cache.memoize(ttl=28800)  # 8 hours
 17async def get_cmc_coins() -> dict:
 18    """Get all coins from CoinMarketCap.
 19
 20    Returns:
 21        dict: {coin: slug} (e.g. {"BTC": "bitcoin"})
 22        All coins are in uppercase. The slugs are in lowercase.
 23    """
 24    if not TOKEN.CMC_API_KEY:
 25        logger.warning("CoinMarketCap API key is not set.")
 26        return {}
 27    logger.info("Fetching CoinMarketCap coins...")
 28    url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/map"
 29    params = {"limit": "5000", "sort": "cmc_rank", "aux": "status"}
 30    response = await hx_req(url, params=params, headers=HEADERS, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0}, silent=True)
 31    if response.get("hx_error"):
 32        return {}
 33    data = response["data"]
 34    return {coin["symbol"]: coin["slug"] for coin in data}
 35
 36
 37@cache.memoize(ttl=7200)
 38async def get_cmc_fiat() -> dict[str, tuple[str, str]]:
 39    """Get all fiat from CoinMarketCap.
 40
 41    Returns: {
 42                "USD": ("United States Dollar", "2781"),  # symbol: (name, ID)
 43            }
 44    """
 45    if not TOKEN.CMC_API_KEY:
 46        logger.warning("CoinMarketCap API key is not set.")
 47        return {}
 48    url = "https://pro-api.coinmarketcap.com/v1/fiat/map"
 49    response = await hx_req(url, headers=HEADERS, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0}, silent=True)
 50    if response.get("hx_error"):
 51        return {}
 52    data = response["data"]
 53    return {coin["symbol"]: (coin["name"], coin["id"]) for coin in data}
 54
 55
 56async def cmc_supported(coin: str, fiat: str = "USD") -> dict:
 57    """Check if the coin is supported by CoinMarketCap.
 58
 59    If supported, returns a dict which is needed to pass to CMC API.
 60    Some shitcoins use the common name as the symbol. (e.g. "bitcoin")
 61    So we determine wether we should use "symbol" or "slug" base on the coin rank
 62    """
 63    cmc_coins = await get_cmc_coins()
 64    if coin.upper() not in cmc_coins and coin.lower() not in cmc_coins.values():
 65        return {}
 66    symbol_index = 10e8
 67    slug_index = 10e8
 68    if coin.upper() in cmc_coins:
 69        symbol_index = list(cmc_coins.keys()).index(coin.upper())
 70    if coin.lower() in cmc_coins.values():
 71        slug_index = list(cmc_coins.values()).index(coin.lower())
 72    return {"symbol": coin.upper(), "convert": fiat} if symbol_index <= slug_index else {"slug": coin.lower(), "convert": fiat}
 73
 74
 75@cache.memoize(ttl=60)
 76async def get_cmc_price(coin: str, fiat: str = "USD") -> str:
 77    """Get the price of a crypto asset from CoinMarketCap."""
 78    params = await cmc_supported(coin, fiat)
 79    if not params:
 80        return ""
 81    url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest"
 82    response = await hx_req(url, params=params, headers=HEADERS, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0})
 83    data = glom(response, "data.*", default=[{}])[0]
 84    if not data:
 85        return f"CoinMarketCap price failed: {coin}"
 86    stats = data["quote"][fiat]
 87    precision = 2 if float(stats["price"]) > 1 else 6
 88    emoji = lambda x: "🟢" if float(x) > 0 else "🔴"
 89    msg = f"🪙**{data['symbol']}** ({data['name']})\n"
 90    msg += f"💵价格: ${number(stats['price'], precision)}\n"
 91    msg += f"{emoji(stats['percent_change_1h'])}1h涨跌: {stats['percent_change_1h']:+.2f}%\n"
 92    msg += f"{emoji(stats['percent_change_24h'])}24h涨跌: {stats['percent_change_24h']:+.2f}%\n"
 93    msg += f"{emoji(stats['percent_change_30d'])}30d涨跌: {stats['percent_change_30d']:+.2f}%\n"
 94    msg += f"{emoji(stats['percent_change_60d'])}60d涨跌: {stats['percent_change_60d']:+.2f}%\n"
 95    msg += f"{emoji(stats['percent_change_90d'])}90d涨跌: {stats['percent_change_90d']:+.2f}%\n"
 96    date = datetime.fromisoformat(data["last_updated"]).astimezone(ZoneInfo(TZ))
 97    msg += f"🕒{date:%Y-%m-%d %H:%M}\n"
 98    msg += f"📡[CoinMarketCap](https://coinmarketcap.com/currencies/{data['slug']})\n"
 99    return msg.strip()
100
101
async def cmc_convert_price(amount: float | str, base: str, quote: str) -> str:
    """Convert an amount of one asset into another via CoinMarketCap.

    Fiat currencies take precedence over crypto symbols. For example, "TWD" is
    some crypto's symbol, but it is also a fiat currency (NT$), so it is sent
    to the API by its fiat id.

    Args:
        amount: Amount of ``base`` to convert; must be convertible to float.
        base: Symbol of the source asset (case-insensitive).
        quote: Symbol of the target asset (case-insensitive).

    Returns:
        str: A formatted message with the converted amount, or a message
        explaining why the conversion is unsupported or failed.
    """
    cmc_fiat = await get_cmc_fiat()
    cmc_coins = await get_cmc_coins()
    base = base.upper()
    quote = quote.upper()

    def is_known(code: str) -> bool:
        """Return True when the code is a known fiat or crypto symbol."""
        return code in cmc_fiat or code in cmc_coins

    if not (is_known(base) and is_known(quote)):
        return f"不支持转换: {amount} {base}{quote}\n支持的法币:\n{', '.join(sorted(cmc_fiat))}"

    # Validate the amount up front so a malformed string yields a message
    # instead of an unhandled ValueError.
    try:
        value = float(amount)
    except (TypeError, ValueError) as e:
        logger.error(e)
        return f"转换失败: {amount} {base}{quote}\n{e}"

    params = {}
    if base in cmc_fiat:
        base_name = cmc_fiat[base][0]  # e.g. United States Dollar
        params["id"] = cmc_fiat[base][1]
    else:
        base_name = cmc_coins.get(base, "").capitalize()  # slug, e.g. Bitcoin
        params["symbol"] = base
    if quote in cmc_fiat:
        quote_name = cmc_fiat[quote][0]
        params["convert_id"] = cmc_fiat[quote][1]
    else:
        quote_name = cmc_coins.get(quote, "").capitalize()  # slug, e.g. Bitcoin
        params["convert"] = quote
    params["amount"] = value
    url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
    response = await hx_req(url, params=params, headers=HEADERS, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0})
    try:
        # if base and quote are both passed as id, the response["data"] is a dict, otherwise it's a list
        data = glom(response, Coalesce("data.quote.*", "data.0.quote.*"))[0]
        # Converted inside the try so a missing or null price is reported as a
        # failed conversion rather than raising.
        price = float(data["price"])
    except Exception as e:  # glom lookup errors, IndexError, KeyError, TypeError, ValueError
        logger.error(e)
        return f"转换失败: {amount} {base}{quote}\n{e}"
    precision = 2 if price > 1 else 6
    msg = f"🔄{number(amount)} **{base}** = {number(data['price'], precision)} **{quote}**\n"
    # Separate the two names; they were previously printed back to back.
    msg += f"🔄{base_name} → {quote_name}\n"
    updated = data["last_updated"]
    if updated.endswith("Z"):
        # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
        updated = updated[:-1] + "+00:00"
    date = datetime.fromisoformat(updated).astimezone(ZoneInfo(TZ))
    msg += f"🕒{date:%Y-%m-%d %H:%M}\n"
    msg += "📡CoinMarketCap\n"
    return msg.strip()
144    return msg.strip()