main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from datetime import datetime
4from zoneinfo import ZoneInfo
5
6from glom import Coalesce, glom
7from loguru import logger
8
9from config import PROXY, TOKEN, TZ, cache
10from networking import hx_req
11from utils import number
12
13HEADERS = {"Accepts": "application/json", "X-CMC_PRO_API_KEY": TOKEN.CMC_API_KEY}
14
15
16@cache.memoize(ttl=28800) # 8 hours
17async def get_cmc_coins() -> dict:
18 """Get all coins from CoinMarketCap.
19
20 Returns:
21 dict: {coin: slug} (e.g. {"BTC": "bitcoin"})
22 All coins are in uppercase. The slugs are in lowercase.
23 """
24 if not TOKEN.CMC_API_KEY:
25 logger.warning("CoinMarketCap API key is not set.")
26 return {}
27 logger.info("Fetching CoinMarketCap coins...")
28 url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/map"
29 params = {"limit": "5000", "sort": "cmc_rank", "aux": "status"}
30 response = await hx_req(url, params=params, headers=HEADERS, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0}, silent=True)
31 if response.get("hx_error"):
32 return {}
33 data = response["data"]
34 return {coin["symbol"]: coin["slug"] for coin in data}
35
36
37@cache.memoize(ttl=7200)
38async def get_cmc_fiat() -> dict[str, tuple[str, str]]:
39 """Get all fiat from CoinMarketCap.
40
41 Returns: {
42 "USD": ("United States Dollar", "2781"), # symbol: (name, ID)
43 }
44 """
45 if not TOKEN.CMC_API_KEY:
46 logger.warning("CoinMarketCap API key is not set.")
47 return {}
48 url = "https://pro-api.coinmarketcap.com/v1/fiat/map"
49 response = await hx_req(url, headers=HEADERS, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0}, silent=True)
50 if response.get("hx_error"):
51 return {}
52 data = response["data"]
53 return {coin["symbol"]: (coin["name"], coin["id"]) for coin in data}
54
55
56async def cmc_supported(coin: str, fiat: str = "USD") -> dict:
57 """Check if the coin is supported by CoinMarketCap.
58
59 If supported, returns a dict which is needed to pass to CMC API.
60 Some shitcoins use the common name as the symbol. (e.g. "bitcoin")
61 So we determine wether we should use "symbol" or "slug" base on the coin rank
62 """
63 cmc_coins = await get_cmc_coins()
64 if coin.upper() not in cmc_coins and coin.lower() not in cmc_coins.values():
65 return {}
66 symbol_index = 10e8
67 slug_index = 10e8
68 if coin.upper() in cmc_coins:
69 symbol_index = list(cmc_coins.keys()).index(coin.upper())
70 if coin.lower() in cmc_coins.values():
71 slug_index = list(cmc_coins.values()).index(coin.lower())
72 return {"symbol": coin.upper(), "convert": fiat} if symbol_index <= slug_index else {"slug": coin.lower(), "convert": fiat}
73
74
75@cache.memoize(ttl=60)
76async def get_cmc_price(coin: str, fiat: str = "USD") -> str:
77 """Get the price of a crypto asset from CoinMarketCap."""
78 params = await cmc_supported(coin, fiat)
79 if not params:
80 return ""
81 url = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest"
82 response = await hx_req(url, params=params, headers=HEADERS, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0})
83 data = glom(response, "data.*", default=[{}])[0]
84 if not data:
85 return f"CoinMarketCap price failed: {coin}"
86 stats = data["quote"][fiat]
87 precision = 2 if float(stats["price"]) > 1 else 6
88 emoji = lambda x: "🟢" if float(x) > 0 else "🔴"
89 msg = f"🪙**{data['symbol']}** ({data['name']})\n"
90 msg += f"💵价格: ${number(stats['price'], precision)}\n"
91 msg += f"{emoji(stats['percent_change_1h'])}1h涨跌: {stats['percent_change_1h']:+.2f}%\n"
92 msg += f"{emoji(stats['percent_change_24h'])}24h涨跌: {stats['percent_change_24h']:+.2f}%\n"
93 msg += f"{emoji(stats['percent_change_30d'])}30d涨跌: {stats['percent_change_30d']:+.2f}%\n"
94 msg += f"{emoji(stats['percent_change_60d'])}60d涨跌: {stats['percent_change_60d']:+.2f}%\n"
95 msg += f"{emoji(stats['percent_change_90d'])}90d涨跌: {stats['percent_change_90d']:+.2f}%\n"
96 date = datetime.fromisoformat(data["last_updated"]).astimezone(ZoneInfo(TZ))
97 msg += f"🕒{date:%Y-%m-%d %H:%M}\n"
98 msg += f"📡[CoinMarketCap](https://coinmarketcap.com/currencies/{data['slug']})\n"
99 return msg.strip()
100
101
102async def cmc_convert_price(amount: float | str, base: str, quote: str) -> str:
103 """Convert asset price.
104
105 In this function, we prefer to use fiat conversion.
106 For example, "TWD" is some crypto's symbol, but it's also a fiat currency (NT$).
107 """
108 cmc_fiat = await get_cmc_fiat()
109 cmc_coins = await get_cmc_coins()
110 all_coins = list(cmc_fiat.keys()) + list(cmc_coins.keys())
111 base = base.upper()
112 quote = quote.upper()
113 if base not in all_coins or quote not in all_coins:
114 return f"不支持转换: {amount} {base} → {quote}\n支持的法币:\n{', '.join(sorted(cmc_fiat))}"
115
116 params = {}
117 if base in cmc_fiat:
118 base_name = cmc_fiat[base][0] # e.g. United States Dollar
119 params["id"] = cmc_fiat[base][1]
120 else:
121 base_name = cmc_coins.get(base, "").capitalize() # slug, e.g. Bitcoin
122 params["symbol"] = base
123 if quote in cmc_fiat:
124 quote_name = cmc_fiat[quote][0]
125 params["convert_id"] = cmc_fiat[quote][1]
126 else:
127 quote_name = cmc_coins.get(quote, "").capitalize() # slug, e.g. Bitcoin
128 params["convert"] = quote
129 params["amount"] = float(amount)
130 url = "https://pro-api.coinmarketcap.com/v2/tools/price-conversion"
131 response = await hx_req(url, params=params, headers=HEADERS, proxy=PROXY.CRYPTO, check_keys=["data"], check_kv={"status.error_code": 0})
132 try:
133 # if base and quote are both passed as id, the response["data"] is a dict, otherwise it's a list
134 data = glom(response, Coalesce("data.quote.*", "data.0.quote.*"))[0]
135 except Exception as e:
136 logger.error(e)
137 return f"转换失败: {amount} {base} → {quote}\n{e}"
138 precision = 2 if float(data["price"]) > 1 else 6
139 msg = f"🔄{number(amount)} **{base}** = {number(data['price'], precision)} **{quote}**\n"
140 msg += f"🔄{base_name} → {quote_name}\n"
141 date = datetime.fromisoformat(data["last_updated"]).astimezone(ZoneInfo(TZ))
142 msg += f"🕒{date:%Y-%m-%d %H:%M}\n"
143 msg += "📡CoinMarketCap\n"
144 return msg.strip()