Commit faf89f7
src/danmu/server.py
@@ -9,7 +9,7 @@ import anyio
from loguru import logger
from config import DANMU, DOWNLOAD_DIR, TZ
-from danmu.utils import live_date
+from danmu.utils import get_bearer_token, live_date
from messages.progress import modify_progress
from networking import hx_req
from others.emoji import CURRENCY
@@ -29,6 +29,7 @@ async def query_server(dates: list[str], user: str, keyword: str, caption: str,
queried_dates = []
for date in sorted(dates, reverse=True):
api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
+ headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}
payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": date}
if user and qtype == "弹幕":
payload["authorName"] = user
@@ -36,7 +37,7 @@ async def query_server(dates: list[str], user: str, keyword: str, caption: str,
payload |= {"message": keyword} if qtype == "弹幕" else {"content": keyword}
payload["page"] = 1
logger.debug(f"Query {qtype}: {payload}")
- resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+ resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
queried_dates.append(date)
parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
count = parsed.get("count", 0)
@@ -48,7 +49,7 @@ async def query_server(dates: list[str], user: str, keyword: str, caption: str,
while len(resp.get("data", [])) == payload["limit"] and parsed.get("count", 0):
payload["page"] += 1
logger.debug(f"Query {qtype}: {payload}")
- resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+ resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
total_count += parsed.get("count", 0)
texts += parsed.get("texts", "")
src/danmu/sync.py
@@ -9,11 +9,11 @@ from glom import flatten, glom
from loguru import logger
from config import DANMU, TZ, cutter
-from danmu.utils import TURSO_KWARGS, merge_json, simplify_json
+from danmu.utils import TURSO_KWARGS, get_bearer_token, merge_json, simplify_json
from database.r2 import get_cf_r2, list_cf_r2, set_cf_r2
from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
from networking import hx_req
-from utils import nowdt
+from utils import nowdt, strings_list
# date为yyyy-mm-dd标准字符串. liveDate是服务器端原始日期, 可能有后缀 ("2025-04-14_01", "2025-04-14_02")
LIVEINFO_COLUMNS = "date TEXT, title TEXT, url TEXT, 发言已完成 INTEGER DEFAULT 0, 弹幕已完成 INTEGER DEFAULT 0, liveDate TEXT PRIMARY KEY"
@@ -55,13 +55,14 @@ async def sync_server_to_turso(qtype: str) -> None:
db_liveinfo = {x["liveDate"]: x for x in turso_parse_resp(resp)}
# 开始同步
- monitor_years = [x.strip() for x in DANMU.SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.SYNC_FAYAN_YEARS.split(",") if x.strip()]
+ monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
monitor_years.append(str(nowdt(TZ).year))
monitor_years = sorted(set(monitor_years))
for year in monitor_years:
+ headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
- liveinfo_list: list[dict] = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True) # type: ignore
+ liveinfo_list: list[dict] = await hx_req(api, headers=headers, proxy=DANMU.PROXY, params=params, silent=True) # type: ignore
if glom(liveinfo_list, "hx_error", default=""): # API server is down
return
for liveinfo in sorted(liveinfo_list, key=lambda x: x["liveDate"]):
@@ -70,7 +71,7 @@ async def sync_server_to_turso(qtype: str) -> None:
continue
results = []
payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date}
- resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+ resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
if resp.get("count", 0) == 0:
continue
logger.trace(f"Query {qtype} date: {live_date} - {resp['count']} results")
@@ -82,6 +83,7 @@ async def sync_server_to_turso(qtype: str) -> None:
hx_req(
api_url,
"POST",
+ headers=headers,
data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": live_date},
proxy=DANMU.PROXY,
check_kv={"code": 0},
@@ -210,8 +212,9 @@ async def sync_server_to_r2(qtype: str) -> None:
results = []
for date in sorted(server_dates):
logger.trace(f"Query {qtype} date: {date}")
+ headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}
payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date}
- resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+ resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
if resp.get("count", 0) == 0:
continue
logger.trace(f"Query {qtype} date: {date} - {resp['count']} results")
@@ -223,6 +226,7 @@ async def sync_server_to_r2(qtype: str) -> None:
hx_req(
api_url,
"POST",
+ headers=headers,
data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": date},
proxy=DANMU.PROXY,
check_kv={"code": 0},
@@ -245,16 +249,17 @@ async def sync_server_to_r2(qtype: str) -> None:
# 忽略晚间的直播弹幕 (实时录制, 应该等结束后再Sync)
return
- monitor_years = [x.strip() for x in DANMU.SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.SYNC_FAYAN_YEARS.split(",") if x.strip()]
+ monitor_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
monitor_years.append(str(now.year))
monitor_years = sorted(set(monitor_years))
r2 = await list_cf_r2(prefix=prefix)
r2_dates = [x.removeprefix(prefix) for x in glom(r2, "Contents.*.Key", default=[])]
has_update = False
for year in monitor_years:
+ headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
- resp = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True)
+ resp = await hx_req(api, headers=headers, proxy=DANMU.PROXY, params=params, silent=True)
new_dates_map = defaultdict(list)
# some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
# norm dates to YYYY-MM-DD
src/danmu/utils.py
@@ -185,11 +185,27 @@ def merge_txt_files(paths: list[str], dates: list[str], user: str, keyword: str,
return final_paths
+@cache.memoize(ttl=86400 * 7)
+async def get_bearer_token() -> str:
+ """获取bearer token."""
+ api_url = DANMU.BASE_URL + "/auth/login"
+ resp = await hx_req(
+ api_url,
+ "POST",
+ data={"userName": DANMU.AUTH_USER, "password": DANMU.AUTH_PASS},
+ proxy=DANMU.PROXY,
+ check_kv={"code": 200},
+ silent=True,
+ )
+ return resp.get("msg", "")
+
+
@cache.memoize(ttl=600)
async def get_live_info(year: str | int) -> dict:
params = {"liveDate": year}
api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
- resp: list[dict] = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True) # type: ignore
+ headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": year}
+ resp: list[dict] = await hx_req(api, headers=headers, proxy=DANMU.PROXY, params=params, silent=True) # type: ignore
if glom(resp, "hx_error", default=""): # API server is down
return {}
dates = {x["liveDate"][:10] for x in resp}
src/config.py
@@ -131,6 +131,8 @@ class DANMU:
BASE_URL = os.getenv("DANMU_BASE_URL", "") # Custom API, No docs
STREAMER = os.getenv("DANMU_STREAMER", "Streamer") # streamer name
PROXY = os.getenv("DANMU_PROXY", None) # socks5://127.0.0.1:7890
+ AUTH_USER = os.getenv("DANMU_AUTH_USER", "") # username for basic auth
+ AUTH_PASS = os.getenv("DANMU_AUTH_PASS", "") # password for basic auth
QUERY_METHOD = os.getenv("DANMU_QUERY_METHOD", "turso") # Turso or R2+API server
NUM_PER_QUERY = int(os.getenv("DANMU_NUM_PER_QUERY", "100")) # Number of items per query to API server
D1_DATABASE = os.getenv("DANMU_D1_DATABASE", "bennybot-danmu")