main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3from collections import defaultdict
  4from datetime import UTC, datetime
  5from decimal import Decimal
  6from zoneinfo import ZoneInfo
  7
  8import anyio
  9from loguru import logger
 10
 11from config import DANMU, DOWNLOAD_DIR, PROXY, TZ
 12from danmu.utils import get_bearer_token, live_date
 13from messages.progress import modify_progress
 14from networking import hx_req
 15from others.emoji import CURRENCY
 16from utils import number
 17
 18
 19async def query_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
 20    """从远程数据库获取记录.
 21
 22    Returns:
 23        {"paths": list[str], "count": int}
 24    """
 25    if not dates:
 26        return {}
 27    paths = []
 28    total_count = 0
 29    queried_dates = []
 30    for date in sorted(dates, reverse=True):
 31        api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
 32        headers = {"Authorization": f"Bearer {await get_bearer_token()}", "X-schema": date[:4]}
 33        payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": date}
 34        if user and qtype == "弹幕":
 35            payload["authorName"] = user
 36        if keyword:
 37            payload |= {"message": keyword} if qtype == "弹幕" else {"content": keyword}
 38        payload["page"] = 1
 39        logger.debug(f"Query {qtype}: {payload}")
 40        resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
 41        queried_dates.append(date)
 42        parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
 43        count = parsed.get("count", 0)
 44        if count == 0:
 45            continue
 46        total_count += count
 47        texts = parsed.get("texts", "")
 48        await modify_progress(text=caption + f"\n🔍查询时间: {''.join(queried_dates)}\n⏳匹配{qtype}数: {total_count}", force_update=True, **kwargs)
 49        while len(resp.get("data", [])) == payload["limit"] and parsed.get("count", 0):
 50            payload["page"] += 1
 51            logger.debug(f"Query {qtype}: {payload}")
 52            resp = await hx_req(api_url, "POST", headers=headers, data=payload, proxy=PROXY.DANMU, check_kv={"code": 0}, silent=True)
 53            parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
 54            total_count += parsed.get("count", 0)
 55            texts += parsed.get("texts", "")
 56            await modify_progress(text=caption + f"\n⏳已匹配 {total_count}{qtype}", **kwargs)
 57        del parsed
 58        save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
 59        async with await anyio.open_file(save_path, "w") as f:
 60            await f.write(texts.strip())
 61        paths.append(save_path)
 62    return {"paths": paths, "count": total_count}
 63
 64
 65async def parse_from_server(data: list[dict], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
 66    """解析从远程数据库获取的记录.
 67
 68    日期从新到旧, 数据从旧到新
 69    """
 70    texts = ""
 71    count = 0
 72    text_key = "message" if qtype == "弹幕" else "content"
 73    if keyword:
 74        data = [x for x in data if keyword in x.get(text_key, "")]
 75    # deduplicate
 76    added = set()
 77    deduplicated = []
 78    for x in data:
 79        if f"{x.get('message')}{x.get('content')}{x.get('liveDate')}{x.get('startTime')}{x.get('authorName')}" not in added:
 80            deduplicated.append(x)
 81            added.add(f"{x.get('message')}{x.get('content')}{x.get('liveDate')}{x.get('startTime')}{x.get('authorName')}")
 82    data = deduplicated
 83
 84    logged_date = []
 85    if qtype == "发言":
 86        data = [x for x in data if x.get("liveDate") and x.get("startTime")]
 87        # group data by dates, and normalize startTime
 88        grouped_data = defaultdict(list)
 89        for x in data:
 90            start_time = x["startTime"].split(",")[0]  # 3:53:23 or 03:53:23
 91            if len(start_time.split(":")[0]) == 1:
 92                x["startTime"] = f"0{start_time}"  # 03:53:23
 93            else:
 94                x["startTime"] = start_time
 95            grouped_data[x["liveDate"][:10]].append(x)
 96
 97        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # 日期从新到旧
 98            for x in sorted(date_data, key=lambda x: x["startTime"]):  # 数据从旧到新
 99                # only show the day once
100                if norm_date not in logged_date:
101                    stream_date = await live_date(norm_date)
102                    day = f"\n开播日期: {stream_date}\n"
103                    logged_date.append(norm_date)
104                else:
105                    day = ""
106                texts += f"\n{day}{x['startTime']}: {x['content'].strip()}"
107                count += 1
108    else:
109        hide_name = bool(user)  # 当指定过滤user时, 隐藏用户名
110        if user:
111            data = [x for x in data if x.get("authorName", "") == user]
112        if keyword:
113            data = [x for x in data if keyword in x.get("message", "")]
114        data = [x for x in data if x.get("timestamp")]  # ensure timestamp
115        # group data by dates, and normalize startTime
116        grouped_data = defaultdict(list)
117        for x in data:
118            grouped_data[x["liveDate"][:10]].append(x)
119
120        for norm_date, date_data in sorted(grouped_data.items(), reverse=True):  # 日期从新到旧
121            for x in sorted(date_data, key=lambda x: x["timestamp"]):  # 数据从旧到新
122                dt = datetime.fromtimestamp(x["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
123                sc_amount = ""
124                if super_chat := x.get("scAmount"):
125                    currency, amount = super_chat.split(" ")
126                    super_chats[currency] += Decimal(amount)
127                    sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
128                msg = x.get("message", "")
129                username = "" if hide_name else "|" + x.get("authorName", "")
130                # only show the day once
131                if norm_date not in logged_date:
132                    stream_date = await live_date(norm_date)
133                    day = f"\n开播日期: {stream_date}\n"
134                    logged_date.append(norm_date)
135                else:
136                    day = ""
137                texts += f"\n{day}{dt:%H:%M:%S}{username}{sc_amount}: {msg}"
138                count += 1
139
140    return {"texts": texts.rstrip(), "count": count}