main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import re
4from collections import defaultdict
5from decimal import Decimal
6from pathlib import Path
7
8from pyrogram.client import Client
9from pyrogram.types import Message
10
11from config import DANMU, PREFIX, TZ
12from danmu.r2 import query_r2
13from danmu.turso import query_turso
14from danmu.utils import file_bytes, merge_txt_files, to_usd
15from messages.parser import parse_msg
16from messages.progress import modify_progress
17from messages.sender import send2tg
18from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
19from others.emoji import CURRENCY
20from publish import publish_telegraph
21from utils import convert_html, nowdt, number, strings_list
22
23HELP = f"""📖**查询直播合订本**
24`{PREFIX.DANMU}` 使用说明:
251.`{PREFIX.DANMU} + 日期`
262.`{PREFIX.DANMU} + @用户名` (区分大小写)
273.`{PREFIX.DANMU} + 关键词` (区分大小写)
284.以上可组合使用, 但日期必须放前面, 关键词必须放后面
29示例:
30- `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
31- `{PREFIX.DANMU} @张三`: 查询用户【张三】的弹幕
32- `{PREFIX.DANMU} 你好`: 查询包含“你好”关键词的弹幕
33- `{PREFIX.DANMU} 2025-01 @张三`: 查询2025年1月份用户【张三】的弹幕
34- `{PREFIX.DANMU} 2025-01-01 你好`: 查询2025-01-01日包含“你好”的弹幕
35- `{PREFIX.DANMU} 2025 @张三 你好`: 查询2025年用户【张三】包含“你好”的弹幕
36
37⚠️注意:
38- 日期为开播日期, 不是弹幕发送日期 (过了凌晨也算前一天)
39- 如果用户名中有空格, 请用**引号**包住用户名 (单双引号、中英引号皆可)。
40- 例如: 想指定用户为John Doe请使用 `@"John Doe"`
41
42`{PREFIX.FAYAN}` 用法类似, 但查询的是**【{DANMU.STREAMER}】**直播语录。
43( `@用户名` 对于 `{PREFIX.FAYAN}` 命令无效)
44"""
45
46DANMU_TIPS = f"时间点为{TZ}时区"
47FAYAN_TIPS = "时间点并非真实时间, 而是相对开播时长"
48
49
50async def query_danmu(client: Client, message: Message, **kwargs):
51 info = parse_msg(message)
52 if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
53 return
54 if equal_prefix(message.text, prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
55 await send2tg(client, message, texts=HELP, **kwargs)
56 return
57 if not DANMU.BASE_URL:
58 await send2tg(client, message, texts="⚠️请联系管理员配置API地址", **kwargs)
59 return
60
61 qtype = "弹幕" if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else "发言"
62 match_time, user, keyword, error = parse_queries(info["text"], qtype)
63 if error:
64 await send2tg(client, message, texts=error, **kwargs)
65 return
66
67 user = user if qtype == "弹幕" else DANMU.STREAMER
68
69 caption = f"📖**{qtype}记录**:"
70 if match_time:
71 caption += f"\n🕒日期: {match_time}"
72 caption += f"\n👤用户: {user}"
73 caption += f"\n🔤关键词: {keyword}"
74
75 status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0] if kwargs.get("show_progress") else None
76 kwargs["progress"] = status_msg
77
78 super_chats = defaultdict(Decimal) # {"currency": amount}
79 if DANMU.QUERY_METHOD.lower() == "turso":
80 resp = await query_turso(match_time, user, keyword, caption, super_chats, qtype, **kwargs)
81 count = resp.get("count", 0)
82 paths = resp.get("paths", [])
83 user = resp.get("user", user)
84 else:
85 query_dates = await get_query_dates(match_time, qtype)
86 resp = await query_r2(query_dates, user, keyword, caption, super_chats, qtype, **kwargs)
87 paths = resp.get("paths", [])
88 count = resp.get("count", 0)
89 if count == 0:
90 await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
91 return
92 header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
93 profit = ""
94 profit_usd = 0
95 for currency, amount in sorted(super_chats.items()):
96 profit += f"\n{CURRENCY[currency]}**{currency}**: {number(amount)}" if currency in CURRENCY else ""
97 profit_usd += amount * (await to_usd(currency)) if currency in CURRENCY else Decimal()
98 # if only "USD" ccy, do not include total USD
99 super_chats.pop("USD", None) # remove "USD"
100 if profit_usd > 0 and super_chats:
101 profit += f"\n💵**总计**: {profit_usd:.2f} USD"
102
103 tips = f"{DANMU_TIPS}\n" if qtype == "弹幕" else f"{FAYAN_TIPS}\n"
104 if file_bytes(paths) < 20480: # 20 KB, short length, try send as message directly
105 danmu = tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
106 final = f"{header}{profit}\n{blockquote(danmu)}"
107 if len(await smart_split(final)) == 1:
108 await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
109 return
110 caption += f"\n#️⃣{qtype}数: {count}"
111 caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
112 merged_paths = paths if DANMU.QUERY_METHOD.lower() == "turso" else merge_txt_files(paths, query_dates, user, keyword, qtype, tips)
113 media = [{"document": path} for path in sorted(merged_paths)]
114 # less than 200K, add instant view
115 if file_bytes(merged_paths) < 204800:
116 texts = "\n".join([Path(path).read_text() for path in sorted(merged_paths, reverse=True)]).strip()
117 if telegraph_url := await publish_telegraph(title=f"【{qtype}】{user}{match_time} {keyword}", html=convert_html(texts), author=user, ttl="1d"):
118 caption += f"\n⚡️[即时预览]({telegraph_url})"
119 await send2tg(client, message, texts=caption, media=media, **kwargs)
120 [Path(path).unlink(missing_ok=True) for path in paths]
121 [Path(path).unlink(missing_ok=True) for path in merged_paths]
122 await modify_progress(message=status_msg, del_status=True, **kwargs)
123
124
125def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
126 """Parse from users' query.
127
128 Returns:
129 match_time, user, keyword, error
130 """
131 # ruff: noqa: RUF001
132 match_time = ""
133 user = ""
134 texts = texts.replace(PREFIX.FAYAN, PREFIX.DANMU) # unify prefix
135 # 2025-01-01
136 if matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2}-\d{2})(\s+)?", texts): # noqa: SIM114
137 match_time = matched.group(1)
138 # 2025-01
139 elif matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2})(\s+)?", texts): # noqa: SIM114
140 match_time = matched.group(1)
141 # 2025
142 elif matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4})(\s+)?", texts):
143 match_time = matched.group(1)
144 if not match_time.startswith("20"):
145 match_time = ""
146 # remove prefix + date
147 texts = re.sub(rf"^{PREFIX.DANMU}\s+{match_time}", "", texts).lstrip()
148 # @张三 你好
149 # @张三
150 # @"Zhang San"
151 if texts.startswith("@"): # user is specified
152 texts = texts.lstrip("@")
153 if texts.startswith('"'):
154 if matched := re.match(r'^"(.*?)"', texts):
155 user = matched.group(1)
156 texts = re.sub(r'^"(.*)"', "", texts)
157 else:
158 return "", "", "", "查询格式有误, 用户名右侧缺失英文双引号"
159 elif texts.startswith("“"):
160 if matched := re.match(r"^“(.*?)”", texts):
161 user = matched.group(1)
162 texts = re.sub(r"^“(.*?)”", "", texts)
163 else:
164 return "", "", "", "查询格式有误, 用户名右侧缺失中文双引号"
165 elif texts.startswith("'"):
166 if matched := re.match(r"^'(.*?)'", texts):
167 user = matched.group(1)
168 texts = re.sub(r"^'(.*?)'", "", texts)
169 else:
170 return "", "", "", "查询格式有误, 用户名右侧缺失英文单引号"
171 elif texts.startswith("‘"):
172 if matched := re.match(r"^‘(.*?)’", texts):
173 user = matched.group(1)
174 texts = re.sub(r"^‘(.*?)’", "", texts)
175 else:
176 return "", "", "", "查询格式有误, 用户名右侧缺失中文单引号"
177 elif matched := re.match(r"^(\S*)", texts):
178 user = matched.group(1)
179 texts = re.sub(rf"^{user}", "", texts)
180
181 keyword = texts.lstrip()
182
183 if qtype == "发言":
184 user = ""
185
186 if not any((match_time, user, keyword)):
187 return "", "", "", f"查询格式有误, 请发送{PREFIX.DANMU}或{PREFIX.FAYAN}命令查看帮助"
188 return match_time, user, keyword, ""
189
190
191async def get_query_dates(match_time: str, qtype: str = "弹幕") -> list[str]:
192 """获取查询日期.
193
194 Returns:
195 list[str]: [date-1, date-2, ...]
196 """
197 now = nowdt(TZ)
198 allowed_years = strings_list(DANMU.SYNC_DANMU_YEARS) if qtype == "弹幕" else strings_list(DANMU.SYNC_FAYAN_YEARS)
199 allowed_years.append(str(now.year))
200 allowed_years = sorted(set(allowed_years))
201 # YYYY-MM-DD, YYYY-MM, YYYY
202 if match_time:
203 return [match_time]
204
205 return allowed_years