main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3from collections import defaultdict
  4from datetime import datetime, timedelta
  5from decimal import Decimal
  6
  7import anyio
  8from glom import glom
  9from loguru import logger
 10
 11from config import DOWNLOAD_DIR, TZ, cache, cutter
 12from danmu.utils import TURSO_KWARGS
 13from database.turso import turso_exec, turso_parse_resp
 14from messages.progress import modify_progress
 15from others.emoji import CURRENCY
 16from preview.youtube import get_youtube_channel_name_by_handle
 17from utils import nowstr, number
 18
 19
 20async def query_turso(match_time: str, user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, *, parse_handle: bool = True, **kwargs) -> dict:
 21    """从Turso获取记录.
 22
 23    Returns:
 24        {"paths": list[str], "count": int}
 25    """
 26    begin = "1970-01-01 00:00:00"
 27    end = nowstr(TZ)
 28    if match_time:
 29        if len(match_time) == 4:  # 2025
 30            begin = f"{match_time}-01-01 00:00:00"
 31            end = f"{match_time}-12-31 23:59:59"
 32        elif len(match_time) == 7:  # 2025-01
 33            begin = f"{match_time}-01 00:00:00"
 34            end = f"{match_time}-31 23:59:59"
 35        elif len(match_time) == 10:  # 2025-01-01
 36            begin = f"{match_time} 00:00:00"
 37            end = f"{match_time} 23:59:59"
 38    texts_to_match = ""
 39    if keyword:
 40        segmented = " ".join(cutter.cutword(keyword))
 41        texts_to_match = keyword if segmented == keyword else f'"{keyword}" OR "{segmented}"'  # must use double quotes for inner part
 42
 43    if qtype == "发言":  # 发言必须指定keyword
 44        sql = f"SELECT T.time, T.content FROM 发言 AS T JOIN fts_发言 AS FTS ON T.rowid = FTS.rowid WHERE FTS.segmented MATCH '{texts_to_match}'"
 45        if match_time:
 46            sql += f" AND T.time >= '{begin}' AND T.time <= '{end}'"
 47        logger.info(sql)
 48        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
 49        parsed = await parse_from_turso(turso_parse_resp(resp), user, keyword, super_chats, qtype)
 50        count = parsed.get("count", 0)
 51        await modify_progress(text=caption + f"\n⏳已匹配 {count}{qtype}", force_update=True, **kwargs)
 52        texts = parsed.get("texts", "")
 53        save_path = f"{DOWNLOAD_DIR}/{user}-{match_time}-{keyword}-{qtype}.txt".replace("--", "-")
 54        async with await anyio.open_file(save_path, "w") as f:
 55            await f.write(texts.strip())
 56    else:
 57        conditions = []
 58        if match_time:
 59            conditions.append(f"T.time >= '{begin}' AND T.time <= '{end}'" if keyword else f"time >= '{begin}' AND time <= '{end}'")
 60        if user:
 61            conditions.append(f"T.fullname = '{user}'" if keyword else f"fullname = '{user}'")
 62        if keyword:
 63            sql = f"SELECT T.time,T.fullname,T.content,T.superchat,T.uid FROM 弹幕 AS T JOIN fts_弹幕 AS FTS ON T.rowid = FTS.rowid WHERE FTS.segmented MATCH '{texts_to_match}'"
 64            if conditions:
 65                sql += " AND " + " AND ".join(conditions)
 66        else:
 67            cond = " AND ".join(conditions)
 68            sql = f"SELECT time,fullname,content,superchat,uid FROM 弹幕 WHERE {cond}"
 69        logger.info(sql)
 70        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
 71        parsed = await parse_from_turso(turso_parse_resp(resp), user, keyword, super_chats, qtype)
 72        count = parsed.get("count", 0)
 73        if count == 0 and user and parse_handle and (channel_name := await get_youtube_channel_name_by_handle(user)):
 74            await modify_progress(text=caption + f"\n⚠️未匹配到“{user}”的{qtype}\n🔍尝试使用“{channel_name}”查询", force_update=True, **kwargs)
 75            return await query_turso(match_time, channel_name, keyword, caption, super_chats, qtype, parse_handle=False, **kwargs)
 76
 77        await modify_progress(text=caption + f"\n⏳已匹配 {count}{qtype}", force_update=True, **kwargs)
 78        texts = parsed.get("texts", "")
 79        save_path = f"{DOWNLOAD_DIR}/{user}-{match_time}-{keyword}-{qtype}.txt".replace("--", "-")
 80        async with await anyio.open_file(save_path, "w") as f:
 81            await f.write(texts.strip())
 82    return {"paths": [save_path], "count": count, "user": user}
 83
 84
 85async def parse_from_turso(data: list[dict], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
 86    """解析从Turso获取的记录.
 87
 88    日期从新到旧, 数据从旧到新
 89
 90    COLUMNS = {
 91    "发言": "time TEXT, content TEXT, segmented TEXT",
 92    "弹幕": "time TEXT, fullname TEXT, content TEXT, superchat TEXT, uid TEXT, segmented TEXT",
 93    }
 94    """
 95    # ruff: noqa: PLW2901
 96    # group by dates
 97    grouped_data = defaultdict(list)  # {date: list[dict]}
 98    for x in data:
 99        grouped_data[x["time"][:10]].append(x)
100
101    texts = ""
102    count = 0
103    for date, items in sorted(grouped_data.items(), reverse=True):  # 日期从新到旧
104        if keyword:
105            items = [x for x in items if keyword in x.get("content", "")]
106        if user and qtype == "弹幕":
107            items = [x for x in items if x.get("fullname", "") == user]
108
109        # deduplicate
110        added = set()
111        deduplicated = []
112        for x in items:
113            if f"{x['time']}{x['content']}{x.get('fullname')}" not in added:
114                deduplicated.append(x)
115                added.add(f"{x['time']}{x['content']}{x.get('fullname')}")
116        for idx, x in enumerate(sorted(deduplicated, key=lambda x: x["time"])):  # 数据从旧到新
117            # only show the day once
118            day = f"\n开播日期: {await live_date(date)}\n" if idx == 0 else ""
119            if qtype == "发言":
120                texts += f"\n{day}{x['time'][11:]}: {x['content'].strip()}"
121            else:
122                hide_name = bool(user)  # 当指定过滤user时, 隐藏用户名
123                sc_amount = ""
124                if super_chat := x.get("superchat"):
125                    currency, amount = super_chat.split(" ")
126                    super_chats[currency] += Decimal(amount)
127                    sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
128                msg = x.get("content", "")
129                username = "" if hide_name else "|" + x.get("fullname", "")
130                texts += f"\n{day}{x['time'][11:]}{username}{sc_amount}: {msg}"
131            count += 1
132    return {"texts": texts.rstrip(), "count": count}
133
134
@cache.memoize(ttl=60)
async def get_liveinfo() -> dict:
    """Load every row of the ``liveinfo`` table, keyed by the row's ``liveDate`` value.

    The result is memoized through ``cache.memoize(ttl=60)``.
    NOTE(review): the ttl unit is defined by the cache backend -- presumably seconds.
    """
    statement = {"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}
    response = await turso_exec([statement], silent=True, **TURSO_KWARGS)
    by_live_date = {}
    for row in turso_parse_resp(response):
        by_live_date[row["liveDate"]] = row
    return by_live_date
139
140
async def live_date(date: str) -> str:
    """Resolve a message date to a live-stream date using a fuzzy match.

    Messages sent after midnight can be recorded under the following day, so
    when the exact date has no live info the neighbouring dates are tried:
    first the day before, then the day after.

    Args:
        date: The date to match, in the format of "YYYY-MM-DD".

    Returns:
        str: The matched date followed by one ``[title](url)`` line per live
        entry of that date, or ``date`` unchanged when nothing matches.
    """
    liveinfo = await get_liveinfo()
    base = datetime.strptime(date, "%Y-%m-%d")  # noqa: DTZ007
    candidates = [date]
    for offset in (-1, 1):
        candidates.append((base + timedelta(days=offset)).strftime("%Y-%m-%d"))

    for candidate in candidates:
        # liveinfo keys are compared on their first 10 characters only.
        links = [f"[{info.get('title', '')}]({info.get('url', '')})" for key, info in liveinfo.items() if key[:10] == candidate]
        if links:
            return (candidate + "\n" + "\n".join(links)).rstrip()
    return date
164
165
async def get_uids_by_name(name: str, queried_names: set[str] | None = None) -> list[str]:
    """Collect the uids recorded for ``name``, following alias names transitively.

    When ``name`` maps to more than one uid, every other fullname recorded for
    those uids is looked up as well, and the uids found that way are expanded
    in turn. A name with zero or one uid is returned as-is without expansion.

    Args:
        name: The ``fullname`` to look up in ``userinfo``.
        queried_names: Names that were already looked up; shared across the
            recursive calls and updated in place. Callers normally omit it.

    Returns:
        The matching uids, without duplicates, in discovery order.
    """
    logger.debug(f"Querying name: {name}, queried_names: {queried_names}")
    if queried_names is None:
        queried_names = set()
    # Mark the name BEFORE any recursion: every uid found for this name maps
    # back to it, so marking it afterwards lets the lookup recurse forever.
    queried_names.add(name)

    safe_name = name.replace("'", "''")  # escape for the single-quoted SQL literal
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT uid FROM userinfo WHERE fullname = '{safe_name}';"}}], silent=True, **TURSO_KWARGS)
    uids = glom(turso_parse_resp(resp), "*.uid")
    if len(uids) <= 1:
        logger.success(f"Found uid: {uids} for name: {name}")
        return uids
    logger.info(f"Found uids: {uids} for name: {name}")

    # Worklist expansion: matched_uids grows while it is walked, so uids that
    # are discovered through an alias get their own names looked up too.
    matched_uids = list(dict.fromkeys(uids))
    seen_uids = set(matched_uids)
    cursor = 0
    while cursor < len(matched_uids):
        uid = matched_uids[cursor]
        cursor += 1
        logger.debug(f"Querying names for uid: {uid}")
        safe_uid = str(uid).replace("'", "''")
        resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT fullname FROM userinfo WHERE uid = '{safe_uid}';"}}], silent=True, **TURSO_KWARGS)
        names = glom(turso_parse_resp(resp), "*.fullname")
        logger.info(f"Found names: {names} for uid: {uid}")
        for n in names:
            if n in queried_names:
                continue
            logger.debug(f"Querying uid for name: {n}")
            for found in await get_uids_by_name(n, queried_names):
                if found not in seen_uids:
                    seen_uids.add(found)
                    matched_uids.append(found)
    return matched_uids
189    return matched_uids