main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from collections import defaultdict
4from datetime import datetime, timedelta
5from decimal import Decimal
6
7import anyio
8from glom import glom
9from loguru import logger
10
11from config import DOWNLOAD_DIR, TZ, cache, cutter
12from danmu.utils import TURSO_KWARGS
13from database.turso import turso_exec, turso_parse_resp
14from messages.progress import modify_progress
15from others.emoji import CURRENCY
16from preview.youtube import get_youtube_channel_name_by_handle
17from utils import nowstr, number
18
19
def _sql_literal(value: str) -> str:
    """Escape ``value`` for embedding inside a single-quoted SQL string literal.

    SQL escapes an embedded single quote by doubling it, so a user name such as
    "O'Neil" no longer terminates the literal early (or injects SQL).
    """
    return str(value).replace("'", "''")


def _fts_phrase(text: str) -> str:
    """Wrap ``text`` in double quotes as an FTS5 string, doubling embedded double quotes."""
    return '"' + text.replace('"', '""') + '"'


def _time_range(match_time: str) -> tuple[str, str]:
    """Return the inclusive ``(begin, end)`` timestamps selected by ``match_time``.

    ``match_time`` may be "YYYY", "YYYY-MM" or "YYYY-MM-DD"; anything else (including an
    empty value) yields 1970-01-01 00:00:00 up to now. Month ranges always end on day 31:
    ``time`` is a TEXT column compared as a string, so "2025-02-31 23:59:59" still sorts
    after every real February timestamp and before March.
    """
    if match_time:
        if len(match_time) == 4:  # 2025
            return f"{match_time}-01-01 00:00:00", f"{match_time}-12-31 23:59:59"
        if len(match_time) == 7:  # 2025-01
            return f"{match_time}-01 00:00:00", f"{match_time}-31 23:59:59"
        if len(match_time) == 10:  # 2025-01-01
            return f"{match_time} 00:00:00", f"{match_time} 23:59:59"
    return "1970-01-01 00:00:00", nowstr(TZ)


def _build_save_path(user: str, match_time: str, keyword: str, qtype: str) -> str:
    """Return ``DOWNLOAD_DIR/<name>.txt`` where ``<name>`` joins the non-empty parts with "-".

    Path separators are replaced with "_" so a user name or keyword containing "/" cannot
    point into a non-existent sub-directory (or outside DOWNLOAD_DIR).
    """
    filename = "-".join(str(part) for part in (user, match_time, keyword, qtype) if part)
    filename = filename.replace("/", "_").replace("\\", "_")
    return f"{DOWNLOAD_DIR}/{filename}.txt"


async def _fetch_and_parse(sql: str, user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
    """Run ``sql`` against Turso and render the returned rows with ``parse_from_turso``."""
    logger.info(sql)
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
    return await parse_from_turso(turso_parse_resp(resp), user, keyword, super_chats, qtype)


async def query_turso(match_time: str, user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, *, parse_handle: bool = True, **kwargs) -> dict:
    """Fetch records from Turso, write them to a text file and report progress.

    Args:
        match_time: "YYYY", "YYYY-MM" or "YYYY-MM-DD" restricting the time range; empty for none.
        user: fullname to filter 弹幕 by (not used to filter 发言); empty for all users.
        keyword: full-text keyword; required when ``qtype == "发言"``.
        caption: prefix of the progress message.
        super_chats: per-currency super chat totals, updated in place by ``parse_from_turso``.
        qtype: "发言" or "弹幕" — which table to query.
        parse_handle: when True and a 弹幕 query for ``user`` matches nothing, retry once with
            the YouTube channel name resolved from ``user`` as a handle.
        **kwargs: forwarded to ``modify_progress``.

    Returns:
        {"paths": list[str], "count": int, "user": str} — ``user`` is the name actually
        queried (the resolved channel name after a handle retry).
    """
    begin, end = _time_range(match_time)
    texts_to_match = ""
    if keyword:
        segmented = " ".join(cutter.cutword(keyword))
        # Match the raw keyword or its segmented form. Each alternative is an FTS string in
        # double quotes; the whole expression sits inside a single-quoted SQL literal.
        texts_to_match = keyword if segmented == keyword else f"{_fts_phrase(keyword)} OR {_fts_phrase(segmented)}"
    match_clause = f"FTS.segmented MATCH '{_sql_literal(texts_to_match)}'"

    if qtype == "发言":  # 发言 must be queried with a keyword (always goes through the FTS index)
        sql = f"SELECT T.time, T.content FROM 发言 AS T JOIN fts_发言 AS FTS ON T.rowid = FTS.rowid WHERE {match_clause}"
        if match_time:
            sql += f" AND T.time >= '{_sql_literal(begin)}' AND T.time <= '{_sql_literal(end)}'"
        parsed = await _fetch_and_parse(sql, user, keyword, super_chats, qtype)
        count = parsed.get("count", 0)
    else:
        col = "T." if keyword else ""  # columns are table-qualified only in the FTS join
        conditions = []
        if match_time:
            conditions.append(f"{col}time >= '{_sql_literal(begin)}' AND {col}time <= '{_sql_literal(end)}'")
        if user:
            conditions.append(f"{col}fullname = '{_sql_literal(user)}'")
        if keyword:
            sql = f"SELECT T.time,T.fullname,T.content,T.superchat,T.uid FROM 弹幕 AS T JOIN fts_弹幕 AS FTS ON T.rowid = FTS.rowid WHERE {match_clause}"
            if conditions:
                sql += " AND " + " AND ".join(conditions)
        else:
            # NOTE(review): with no keyword, user or match_time the WHERE clause is empty and the
            # statement is invalid; callers presumably always supply a filter — confirm.
            sql = f"SELECT time,fullname,content,superchat,uid FROM 弹幕 WHERE {' AND '.join(conditions)}"
        parsed = await _fetch_and_parse(sql, user, keyword, super_chats, qtype)
        count = parsed.get("count", 0)
        if count == 0 and user and parse_handle and (channel_name := await get_youtube_channel_name_by_handle(user)):
            # ``user`` may be a YouTube handle rather than a display name: retry once with it resolved.
            await modify_progress(text=caption + f"\n⚠️未匹配到“{user}”的{qtype}\n🔍尝试使用“{channel_name}”查询", force_update=True, **kwargs)
            return await query_turso(match_time, channel_name, keyword, caption, super_chats, qtype, parse_handle=False, **kwargs)

    await modify_progress(text=caption + f"\n⏳已匹配 {count} 条{qtype}", force_update=True, **kwargs)
    save_path = _build_save_path(user, match_time, keyword, qtype)
    # Explicit UTF-8: the rendered text is mostly CJK and must not depend on the platform locale.
    async with await anyio.open_file(save_path, "w", encoding="utf-8") as f:
        await f.write(parsed.get("texts", "").strip())
    return {"paths": [save_path], "count": count, "user": user}
83
84
async def parse_from_turso(data: list[dict], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
    """Render records fetched from Turso as text grouped by day.

    Days are listed newest first; records within a day oldest first. The first record of
    each day is preceded by a "开播日期" header built by ``live_date``.

    COLUMNS = {
        "发言": "time TEXT, content TEXT, segmented TEXT",
        "弹幕": "time TEXT, fullname TEXT, content TEXT, superchat TEXT, uid TEXT, segmented TEXT",
    }

    Args:
        data: rows as dicts keyed by column name; ``time`` is presumably
            "YYYY-MM-DD HH:MM:SS" (its first 10 chars are used as the day).
        user: when set, keep only 弹幕 rows from this fullname and omit the name per line.
        keyword: when set, keep only rows whose content contains it.
        super_chats: per-currency totals, incremented in place for every rendered super chat.
        qtype: "发言" or "弹幕".

    Returns:
        {"texts": str, "count": int} — the rendered text and the number of rendered rows.
        Duplicate rows (same time, content and fullname) are rendered once.
    """
    # ruff: noqa: PLW2901
    grouped_data: defaultdict[str, list[dict]] = defaultdict(list)  # {date: rows}
    for row in data:
        grouped_data[row["time"][:10]].append(row)

    hide_name = bool(user)  # when filtering by user, printing the name on every line is redundant
    chunks: list[str] = []
    count = 0
    for date, items in sorted(grouped_data.items(), reverse=True):  # newest day first
        if keyword:
            # NULL content is treated as empty instead of raising TypeError.
            items = [x for x in items if keyword in (x.get("content") or "")]
        if user and qtype == "弹幕":
            items = [x for x in items if x.get("fullname", "") == user]

        # Deduplicate on a tuple key: concatenating the fields into one string lets distinct
        # rows collide (content "ab" + name "c" vs content "a" + name "bc").
        seen: set[tuple] = set()
        unique: list[dict] = []
        for x in items:
            key = (x["time"], x.get("content"), x.get("fullname"))
            if key not in seen:
                seen.add(key)
                unique.append(x)
        if not unique:
            continue

        unique.sort(key=lambda x: x["time"])  # oldest record first within the day
        day_header = f"\n开播日期: {await live_date(date)}\n"  # shown once, before the day's first record
        for idx, x in enumerate(unique):
            day = day_header if idx == 0 else ""
            content = x.get("content") or ""
            if qtype == "发言":
                chunks.append(f"\n{day}{x['time'][11:]}: {content.strip()}")
            else:
                sc_amount = ""
                if super_chat := x.get("superchat"):
                    # assumes "<CURRENCY> <amount>"; split/Decimal raise on any other shape
                    currency, amount = super_chat.split(" ")
                    super_chats[currency] += Decimal(amount)
                    sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
                username = "" if hide_name else "|" + (x.get("fullname") or "")
                chunks.append(f"\n{day}{x['time'][11:]}{username}{sc_amount}: {content}")
            count += 1
    return {"texts": "".join(chunks).rstrip(), "count": count}
133
134
@cache.memoize(ttl=60)
async def get_liveinfo() -> dict:
    """Load the whole ``liveinfo`` table, keyed by each row's ``liveDate`` value.

    Memoized through ``cache.memoize(ttl=60)`` (presumably 60 seconds). If several rows
    share a ``liveDate``, the last row returned by the query wins.
    """
    statement = {"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}
    resp = await turso_exec([statement], silent=True, **TURSO_KWARGS)
    by_date = {}
    for row in turso_parse_resp(resp):
        by_date[row["liveDate"]] = row
    return by_date
139
140
async def live_date(date: str) -> str:
    """Resolve ``date`` to the live stream(s) held on or right next to it.

    Messages sent after midnight may be filed under the following day, so when no stream
    is recorded on ``date`` itself, the previous day and then the next day are tried.

    Args:
        date: the day to look up, formatted "YYYY-MM-DD".

    Returns:
        str: the matched day followed by one markdown link ``[title](url)`` per stream of
        that day (one per line), or ``date`` unchanged when none of the three days match.
    """
    liveinfo = await get_liveinfo()
    base = datetime.strptime(date, "%Y-%m-%d")  # noqa: DTZ007
    neighbours = [(base + timedelta(days=delta)).strftime("%Y-%m-%d") for delta in (-1, 1)]
    for candidate in (date, *neighbours):
        streams = [info for key, info in liveinfo.items() if key[:10] == candidate]
        if not streams:
            continue
        links = "\n".join(f"[{info.get('title', '')}]({info.get('url', '')})" for info in streams)
        return f"{candidate}\n{links}".rstrip()
    return date
164
165
async def _select_userinfo(column: str, where_column: str, value: str) -> list:
    """Return ``column`` from every ``userinfo`` row whose ``where_column`` equals ``value``.

    ``column`` and ``where_column`` are trusted identifiers from this module; ``value`` is
    escaped (single quotes doubled) before being embedded in the SQL string literal.
    """
    literal = str(value).replace("'", "''")
    sql = f"SELECT {column} FROM userinfo WHERE {where_column} = '{literal}';"
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
    return glom(turso_parse_resp(resp), f"*.{column}")


async def get_uids_by_name(name: str, queried_names: set[str] | None = None) -> list[str]:
    """Get uids by name, following other names those uids have used.

    When ``name`` maps to at most one uid, that result is returned as-is. Otherwise every
    name used by each found uid is looked up recursively, and so are the uids it yields.

    Args:
        name: the fullname to look up.
        queried_names: names already expanded; shared and mutated across the recursive
            calls to prevent cycles. Callers normally omit it.

    Returns:
        The matched uids in first-seen order, without duplicates (when expansion happens).
    """
    logger.debug(f"Querying name: {name}, queried_names: {queried_names}")
    if queried_names is None:
        queried_names = set()
    queried_names.add(name)
    uids = await _select_userinfo("uid", "fullname", name)
    if len(uids) <= 1:
        logger.success(f"Found uid: {uids} for name: {name}")
        return uids
    logger.info(f"Found uids: {uids} for name: {name}")
    # 递归查询 (recursive lookup). Keep a separate de-duplicated list instead of extending the
    # list being iterated, which used to produce duplicate uids.
    matched_uids = list(dict.fromkeys(uids))
    seen_uids = set(matched_uids)
    idx = 0
    while idx < len(matched_uids):  # the list grows as new uids are discovered
        uid = matched_uids[idx]
        idx += 1
        logger.debug(f"Querying names for uid: {uid}")
        names = await _select_userinfo("fullname", "uid", uid)
        logger.info(f"Found names: {names} for uid: {uid}")
        for n in names:
            if n in queried_names:
                continue
            # Mark before recursing so the nested call (and any cycle back to n) skips it;
            # marking only afterwards allowed unbounded recursion.
            queried_names.add(n)
            logger.debug(f"Querying uid for name: {n}")
            for found in await get_uids_by_name(n, queried_names):
                if found not in seen_uids:
                    seen_uids.add(found)
                    matched_uids.append(found)
    return matched_uids