main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import re
4from collections import defaultdict
5from decimal import Decimal
6from io import BytesIO
7from pathlib import Path
8
9from pyrogram.client import Client
10from pyrogram.errors.exceptions.bad_request_400 import MediaCaptionTooLong
11from pyrogram.types import InputMediaDocument, Message
12
13from config import DANMU, DOWNLOAD_DIR, PREFIX, TZ
14from danmu.r2 import get_username_history, query_r2
15from danmu.turso import query_turso
16from danmu.utils import count_entities, to_usd
17from messages.parser import parse_msg
18from messages.progress import modify_progress
19from messages.sender import send2tg
20from messages.utils import blockquote, delete_message, equal_prefix, smart_split, startswith_prefix
21from others.emoji import CURRENCY
22from publish import publish_telegraph
23from utils import convert2html, nowdt, number, strings_list
24
# Usage text for the danmu / fayan query commands. `query_danmu` sends it when
# the message consists of the bare command prefix only. The configured command
# prefixes, timezone and streamer name are interpolated once, at import time.
HELP = f"""📖**查询直播合订本**
`{PREFIX.DANMU}` 使用说明:
1.`{PREFIX.DANMU} + 日期`
2.`{PREFIX.DANMU} + @用户名` (区分大小写)
3.`{PREFIX.DANMU} + 关键词` (区分大小写)
4.以上可组合使用, 但日期必须放前面, 关键词必须放后面
示例:
- `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
- `{PREFIX.DANMU} @张三`: 查询用户【张三】的弹幕
- `{PREFIX.DANMU} 你好`: 查询包含“你好”关键词的弹幕
- `{PREFIX.DANMU} 2025-01 @张三`: 查询2025年1月份用户【张三】的弹幕
- `{PREFIX.DANMU} 2025-01-01 你好`: 查询2025-01-01日包含“你好”的弹幕
- `{PREFIX.DANMU} 2025 @张三 你好`: 查询2025年用户【张三】包含“你好”的弹幕

🕒**日期说明:**
- 发言记录的时间点是开播时长
- 弹幕记录的时间点是真实时间,时区为 **{TZ}**
- 指定弹幕查询日期时,采用的是30小时制(凌晨0—6点属于前一天)
- 例如 `{PREFIX.DANMU} 2025-01-01` 查询的是2025-01-01日6点至次日6点的弹幕

👤**用户名说明:**
- 如果用户名中有空格, 请用**引号**包住用户名 (单双引号、中英引号皆可)。
- 例如: 想指定用户为John Doe请使用 `@"John Doe"`
- 用户名也支持指定YouTube的 **ChannelID**, 例如 `{PREFIX.DANMU} @UC...`
- 查询ChannelID方法:在YouTube用户页面的简介处点击“更多” -> 点击最下方的“分享频道” -> 点击“复制频道ID”

`{PREFIX.FAYAN}` 用法类似, 但查询的是**【{DANMU.STREAMER}】**直播语录。
( `@用户名` 对于 `{PREFIX.FAYAN}` 命令无效)
"""

# Timezone note placed right after the header when danmu results are returned
# inline as a single message (not used for fayan queries).
DANMU_TIPS = f"时间点为{TZ}时区\n"
56
57
async def query_danmu(client: Client, message: Message, **kwargs):
    """Handle a danmu / fayan query command and reply with the matching records.

    Flow:
      1. Ignore messages that do not start with one of the two command prefixes.
      2. A bare prefix gets the HELP text; a missing ``DANMU.BASE_URL`` gets a
         configuration warning.
      3. Parse the date / user / keyword filters and post a status message.
      4. Run the query through Turso or R2, depending on ``DANMU.QUERY_METHOD``.
      5. Small results replace the status message inline; larger ones are
         published to Telegraph (if that succeeds) and attached as an HTML
         document.

    Args:
        client: Pyrogram client used to send replies.
        message: Incoming message that carries the command.
        **kwargs: Forwarded to the sender / progress helpers. ``progress`` is
            overwritten with the status message created here.
    """
    info = parse_msg(message)
    if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
        return
    # NOTE(review): this check reads message.text while every other check uses
    # info["text"]; confirm the two are meant to differ.
    if equal_prefix(message.text, prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
        await send2tg(client, message, texts=HELP, **kwargs)
        return
    if not DANMU.BASE_URL:
        await send2tg(client, message, texts="⚠️请联系管理员配置API地址", **kwargs)
        return

    qtype = "弹幕" if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else "发言"
    match_time, user, keyword, error = parse_queries(info["text"], qtype)
    if error:
        await send2tg(client, message, texts=error, **kwargs)
        return

    # Fayan queries always target the configured streamer.
    user = user if qtype == "弹幕" else DANMU.STREAMER

    caption = f"📖**{qtype}记录**:"
    if match_time:
        caption += f"\n🕒日期: {match_time}"
    caption += f"\n👤用户: {user}"
    caption += f"\n🔤关键词: {keyword}"

    status_msg = await message.reply_text(caption, quote=True)
    kwargs["progress"] = status_msg

    # Filled in by the query backends: {"currency": amount}
    super_chats = defaultdict(Decimal)
    if DANMU.QUERY_METHOD.lower() == "turso":
        resp = await query_turso(match_time, user, keyword, caption, super_chats, qtype, **kwargs)
        count = resp.get("count", 0)
        paths = resp.get("paths", [])
        try:
            texts = "\n\n".join(Path(path).read_text() for path in sorted(paths, reverse=True)).strip()
        finally:
            # Always remove the backend's temporary files, even if reading failed.
            for path in paths:
                Path(path).unlink(missing_ok=True)
        user = resp.get("user", user)
    else:
        query_dates = await get_query_dates(match_time, qtype)
        resp = await query_r2(query_dates, user, keyword, caption, super_chats, qtype, **kwargs)
        texts = resp.get("texts", "")
        count = resp.get("count", 0)
    if count == 0:
        await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
        return

    header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
    username_history = await get_username_history(user) if qtype == "弹幕" and user else ""

    # Per-currency SuperChat summary plus a running total converted to USD.
    profit = ""
    profit_usd = Decimal()
    for currency, amount in sorted(super_chats.items()):
        if currency not in CURRENCY:
            continue
        profit += f"\n{CURRENCY[currency]}**{currency}**: {number(amount)}"
        profit_usd += amount * (await to_usd(currency))
    # If USD was the only currency, the total would just repeat it: drop "USD"
    # and only append the total when something else remains.
    super_chats.pop("USD", None)
    if profit_usd > 0 and super_chats:
        profit += f"\n💵**总计**: {profit_usd:.2f} USD"

    # Try to deliver everything inline as a single message first.
    tips = DANMU_TIPS if qtype == "弹幕" else ""
    final = f"{header}{tips}{profit}{username_history.strip()}\n\n{texts}"
    if (await count_entities(client, final)) <= 100 and len(await smart_split(final)) == 1:
        await modify_progress(message=status_msg, text=blockquote(final), force_update=True, **kwargs)
        return

    # Too large for one message: send a summary caption with an HTML attachment.
    caption += f"\n#️⃣{qtype}数: {count}"
    caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
    html = convert2html(texts)
    if telegraph_url := await publish_telegraph(title=f"【{qtype}】{user}{match_time} {keyword}", html=html, author=user, ttl="1d"):
        caption += f"\n⚡️[即时预览]({telegraph_url})"
    caption += blockquote(username_history)
    html = f'<!DOCTYPE html><html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>{qtype}查询结果</title><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css"></head><body><article>{html}</article></body></html>'
    try:
        with BytesIO(html.encode("utf-8")) as f:
            await status_msg.edit_media(media=InputMediaDocument(f, file_name=f"{qtype}查询结果.html", caption=caption))
    except MediaCaptionTooLong:
        # Caption does not fit on a media message: send it through send2tg with
        # the document on disk, then drop the status message.
        save_path = Path(DOWNLOAD_DIR).joinpath(f"{qtype}查询结果.html")
        # The document declares charset=UTF-8, so it must not be written in the
        # platform's locale encoding.
        save_path.write_text(html, encoding="utf-8")
        await send2tg(client, message, texts=caption, media=[{"document": save_path.as_posix()}], **kwargs)
        await delete_message(status_msg)
136
137
def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
    """Split a raw command string into its date, user and keyword parts.

    The expected layout is ``<prefix> [date] [@user] [keyword]``. The date is
    ``YYYY``, ``YYYY-MM`` or ``YYYY-MM-DD`` and is only accepted when it starts
    with "20"; otherwise the digits are left in place and end up in the
    keyword. The user may be wrapped in ASCII or CJK single/double quotes.

    Args:
        texts: Full message text, starting with the danmu or fayan prefix.
        qtype: "弹幕" or "发言"; for "发言" any parsed user is discarded.

    Returns:
        ``(match_time, user, keyword, error)``. On failure the first three
        items are empty strings and ``error`` holds a user-facing message; on
        success ``error`` is empty.
    """
    # ruff: noqa: RUF001
    # (opening quote, closing quote, quote name used in the error message)
    quote_pairs = (
        ('"', '"', "英文双引号"),
        ("“", "”", "中文双引号"),
        ("'", "'", "英文单引号"),
        ("‘", "’", "中文单引号"),
    )
    match_time = ""
    user = ""

    # Unify the prefix. Only the leading command is rewritten, so a keyword
    # that happens to contain the fayan prefix is left untouched.
    if texts.startswith(PREFIX.FAYAN):
        texts = PREFIX.DANMU + texts[len(PREFIX.FAYAN) :]
    # The prefix is configuration text, not a pattern: escape it before use.
    prefix = re.escape(PREFIX.DANMU)

    # 2025-01-01 | 2025-01 | 2025 (alternatives are tried longest first)
    date_pattern = rf"^{prefix}" + r"\s+(\d{4}-\d{2}-\d{2}|\d{4}-\d{2}|\d{4})"
    if matched := re.match(date_pattern, texts):
        match_time = matched.group(1)
    if not match_time.startswith("20"):
        match_time = ""
    # remove prefix + date
    texts = re.sub(rf"^{prefix}\s+{match_time}", "", texts).lstrip()

    # @张三 你好
    # @张三
    # @"Zhang San"
    if texts.startswith("@"):  # user is specified
        texts = texts.lstrip("@")
        for opening, closing, label in quote_pairs:
            if not texts.startswith(opening):
                continue
            # Lazy match: stop at the first closing quote so quotes inside the
            # keyword are not swallowed.
            matched = re.match(rf"^{opening}(.*?){closing}", texts)
            if not matched:
                return "", "", "", f"查询格式有误, 用户名右侧缺失{label}"
            user = matched.group(1)
            texts = texts[matched.end() :]
            break
        else:
            # Unquoted name: everything up to the first whitespace. Strip it by
            # length rather than re-using it as a regex, because names may
            # contain regex metacharacters.
            bare = re.match(r"\S*", texts)
            user = bare.group() if bare else ""
            texts = texts[len(user) :]

    keyword = texts.lstrip()

    if qtype == "发言":
        user = ""

    if not any((match_time, user, keyword)):
        return "", "", "", f"查询格式有误, 请发送{PREFIX.DANMU}或{PREFIX.FAYAN}命令查看帮助"
    return match_time, user, keyword, ""
202
203
async def get_query_dates(match_time: str, qtype: str = "弹幕") -> list[str]:
    """Resolve the list of date keys to query.

    Args:
        match_time: Date filter ("YYYY", "YYYY-MM" or "YYYY-MM-DD"), or an
            empty string when the user gave no date.
        qtype: "弹幕" selects ``DANMU.SYNC_DANMU_YEARS``; any other value
            selects ``DANMU.SYNC_FAYAN_YEARS``.

    Returns:
        ``[match_time]`` when a date was given. Otherwise the configured years
        plus the current year reported by ``nowdt(TZ)``, de-duplicated and
        sorted.
    """
    # YYYY-MM-DD, YYYY-MM, YYYY: an explicit date needs no year lookup at all.
    if match_time:
        return [match_time]

    configured = DANMU.SYNC_DANMU_YEARS if qtype == "弹幕" else DANMU.SYNC_FAYAN_YEARS
    # Build through a set so the list returned by strings_list is not mutated.
    return sorted({*strings_list(configured), str(nowdt(TZ).year)})
218 return allowed_years