main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import re
4from io import BytesIO
5
6from glom import glom
7from loguru import logger
8from pyrogram.client import Client
9from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
10from pyrogram.types import Message, User
11
12from config import HISTORY, PREFIX, TZ, cache
13from database.d1 import query_d1
14from database.turso import turso_exec, turso_parse_resp
15from history.d1 import get_d1_chatinfo, save_chatinfo_to_d1
16from history.turso import get_turso_chatinfo, save_chatinfo_to_turso
17from history.utils import TURSO_KWARGS, filter_response, get_chat, get_user_from_chat, is_admin, keyword_query, list_chat_ids
18from messages.parser import parse_chat, parse_msg
19from messages.progress import modify_progress
20from messages.sender import send2tg
21from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
22from others.emoji import MTYPE_EMOJI
23from publish import publish_telegraph
24from utils import convert_html, myself, nowstr, slim_cid, strings_list, to_int
25
# Help text sent when the history command is used without any argument.
HELP = f"""🗣**查询当前对话聊天记录**
`/hist` 使用说明:
1.`/hist + 关键词`
2.`/hist + 日期 + 关键词`
3.`/hist + @用户名 + 关键词`
4.`/hist + 日期 + @用户名 + 关键词` (日期需放在最前面)
示例:
{BLOCKQUOTE_EXPANDABLE_DELIM}`/hist 你好`: 查询包含“你好”关键词的记录
{BLOCKQUOTE_EXPANDABLE_DELIM}`/hist 2025-01-01 你好`: 查询2025-01-01日包含“你好”的记录
{BLOCKQUOTE_EXPANDABLE_DELIM}`/hist @张三 你好`: 查询用户【张三】包含“你好”的记录
{BLOCKQUOTE_EXPANDABLE_DELIM}`/hist 2025 @张三 你好`: 查询2025年用户【张三】包含“你好”的记录
{BLOCKQUOTE_EXPANDABLE_DELIM}
{BLOCKQUOTE_EXPANDABLE_DELIM}注意:
{BLOCKQUOTE_EXPANDABLE_DELIM}- 用户名和关键词需要区分大小写
{BLOCKQUOTE_EXPANDABLE_DELIM}- 用户名可以为昵称 (Name)、用户名 (@username)、用户的TelegramUID
{BLOCKQUOTE_EXPANDABLE_DELIM}- 如果用户名中有空格, 请去除空格。例如: 想指定用户为John Doe请使用 `@JohnDoe`
`/history` 使用说明:
查询所有对话的聊天记录
但出于隐私考虑, 本命令会限制使用权限
`/history + #ChatID` + [日期]+[用户名]+[关键词]
`/history -l`: 列出所有ChatID
"""
49
50
async def query_chat_history(client: Client, message: Message, **kwargs):
    """Handle the chat-history query commands (`/hist` and `/history`).

    A bare command (not sent as a reply) gets the help text. `/history` is
    rejected for non-admins. Otherwise the query is parsed, the target chat is
    resolved (and registered if it was never synced), the history is searched,
    and the result is delivered either inline as a blockquote or as a txt
    document with an optional Telegraph link.

    Args:
        client: Pyrogram client used to talk to Telegram.
        message: The incoming command message.
        **kwargs: Forwarded to `send2tg` / `modify_progress`; the key
            `progress` is overwritten with the status message.
    """
    content = message.content
    if not startswith_prefix(content, prefix=PREFIX.HISTORY):
        return
    if equal_prefix(content, prefix=PREFIX.HISTORY) and not message.reply_to_message:
        await send2tg(client, message, texts=HELP, **kwargs)
        return

    info = parse_msg(message, silent=True, use_cache=False)
    if startswith_prefix(content, prefix="/history") and not is_admin(info["uid"]):
        await send2tg(client, message, texts="⚠️您无权使用此命令, 请使用 `/hist`", **kwargs)
        return

    if info["text"].strip() == "/history -l":
        await list_chat_ids(client, message, engine=HISTORY.QUERY_ENGINE)
        return

    # A bare command sent as a reply is rewritten into a query for the
    # replied-to user.
    if info["text"] in ["/hist", "/history"] and message.reply_to_message:
        info["text"] = f"/hist @{info['reply_uid']}"
    if startswith_prefix(info["text"].strip(), prefix="/history"):
        qtype = "history"
    else:
        qtype = "hist"

    chat_id, match_time, user, keyword, error = parse_queries(info["text"], qtype)
    if error:
        await send2tg(client, message, texts=error, **kwargs)
        return
    if qtype == "hist":
        # `/hist` always targets the chat the command was sent in.
        chat_id = slim_cid(info["cid"])

    use_turso = HISTORY.QUERY_ENGINE == "turso"
    if use_turso:
        chatinfo = await get_turso_chatinfo(chat_id)
    else:
        chatinfo = await get_d1_chatinfo(chat_id)
    if not chatinfo:
        # This chat was never synced: fetch it and store its info first.
        chat = await get_chat(client, int(chat_id))
        parsed_chat = parse_chat(chat)
        if use_turso:
            chatinfo = await save_chatinfo_to_turso(client, parsed_chat)
        else:
            chatinfo = await save_chatinfo_to_d1(client, parsed_chat)
    if not chatinfo:
        await message.reply(f"⚠️`{chat_id}`不是有效的ChatID", quote=True)
        return

    # Chat-info tags allow advanced filtering. One database may be read by
    # several Telegram accounts, so a tag of the form `{my_uid}_SKIP_QUERY`
    # makes the account `my_uid` skip queries for this chat.
    me = await myself(client)
    skip_tag = f"{me.id}_SKIP_QUERY"
    if skip_tag in strings_list(chatinfo.get("tags", "")):
        await message.reply("⚠️Tags已设置跳过查询此对话", quote=True)
        return

    chat_title = chatinfo["ctitle"]
    caption = "\n".join(
        [
            "📖**查询聊天记录**:",
            f"🆔会话: {chat_title}",
            f"🕒日期: {match_time}",
            f"👤用户: {user}",
            f"🔤关键词: {keyword}",
        ]
    )
    sent = await send2tg(client, message, texts=caption, **kwargs)
    status_msg = sent[0]
    kwargs["progress"] = status_msg

    results = await query_history(client, chatinfo, match_time, user, keyword, engine=HISTORY.QUERY_ENGINE)
    texts = results.get("texts", "")
    count = results.get("count", 0)

    if not texts:
        await modify_progress(text=caption + "\n⚠️未匹配任何记录", force_update=True, **kwargs)
        return

    # Short results that fit in one message are shown inline.
    if len(texts) < 20480 and len(await smart_split(texts)) == 1:
        await modify_progress(message=status_msg, text=blockquote(texts), force_update=True, **kwargs)
        return

    # Too long for a message: switch to the untrimmed texts and send a file.
    texts = results.get("full_texts", "")
    caption += f"\n#️⃣消息数: {count}"
    export_name = f"【{chat_title}】{user}{match_time} {keyword}"
    # Below 1,000,000 characters, also publish an instant-view page.
    if len(texts) < 1000000:
        telegraph_url = await publish_telegraph(
            title=export_name,
            html=convert_html(texts),
            author=user or chat_title,
            ttl="1d",
        )
        if telegraph_url:
            caption += f"\n⚡️[即时预览]({telegraph_url})"

    with BytesIO(texts.encode("utf-8")) as f:
        await client.send_document(info["cid"], f, file_name=f"{export_name}.txt", caption=caption)

    await modify_progress(message=status_msg, del_status=True, **kwargs)
133
134
def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str, str]:
    """Parse a `/hist` or `/history` command into its query parts.

    The accepted order is: command, optional `#chat_id` (with or without the
    `-100` prefix), optional date (`YYYY`, `YYYY-MM` or `YYYY-MM-DD`),
    optional `@user`, then the keyword (everything that remains).

    A date token is only recognised when it is not directly followed by
    another digit, so a numeric keyword such as `10086` is kept whole instead
    of being split into a year plus a remainder.

    Args:
        texts: The raw command text.
        qtype: `"hist"` (current chat only) or `"history"` (needs a chat id).

    Returns:
        chat_id, match_time, user, keyword, error — `error` is an empty
        string when the query is valid.
    """
    chat_id = ""
    match_time = ""
    user = ""
    error = ""
    texts = re.sub(r"^/history", "/hist", texts, count=1)  # unify prefix

    # #chat_id
    if matched := re.match(r"^/hist\s+#(-100)?(\d+)", texts):
        chat_id = matched.group(2)
    texts = texts.removeprefix("/hist").lstrip()  # remove prefix
    texts = re.sub(rf"^#(-100)?{chat_id}", "", texts).lstrip()  # remove #chat_id

    # 2025-01-01 / 2025-01 / 2025 (longest form wins, must not run into more digits)
    if matched := re.match(r"(\d{4}(?:-\d{2}){0,2})(?!\d)", texts):
        match_time = matched.group(1)
        texts = texts[matched.end(1) :].lstrip()  # remove date

    # @user keyword
    # @user
    if matched := re.match(r"^@(\w+)", texts):
        user = matched.group(1)
    keyword = re.sub(rf"^@{user}", "", texts).lstrip()  # remove user

    if qtype == "hist":
        if not any((match_time, user, keyword)):
            error = f"查询格式有误, 请发送 `{PREFIX.HISTORY}` 命令查看帮助"
        if chat_id:
            error = "`/hist` 命令不支持指定ChatID, 仅支持查询当前对话聊天记录"
    elif qtype == "history" and not chat_id:
        # A missing chat id also covers the "nothing given at all" case.
        error = "`/history` 命令需要指定 ChatID\n`/history -l`: 列出所有ChatID"

    return chat_id, match_time, user, keyword, error
183
184
async def query_history(
    client: Client,
    cinfo: dict[str, str],
    match_time: str,
    user: str,
    keyword: str,
    engine: str = HISTORY.QUERY_ENGINE,
) -> dict:
    """Query chat history from the configured engine (Turso or D1).

    LIKE queries scan the whole table, which is slow and quickly burns the
    read quota, so FTS5 search is used instead. FTS5 does not match Chinese
    text and the remote database has no ICU tokenizer, so the text is
    tokenized manually at insert time (based on the `cutword` library). The
    tokenization may be imprecise but covers most needs; the returned rows
    are filtered once more for an exact match.

    Args:
        client: Pyrogram client, used to resolve `user` to a UID.
        cinfo: Chat info row; reads `cid`, `ctype`, `tablename` and the
            optional `chandle`.
        match_time: `YYYY`, `YYYY-MM`, `YYYY-MM-DD` or empty.
        user: User handle / name / UID as text, or empty.
        keyword: Keyword to search for, or empty.
        engine: `"turso"` selects Turso; anything else selects D1.

    Returns:
        A dict with `texts` (entries with long messages trimmed),
        `full_texts` (untrimmed entries) and `count` (entries in `texts`).
    """
    chandle = cinfo.get("chandle")

    conditions = []
    if keyword:
        conditions.append(keyword_query(keyword))

    if match_time:
        begin = "1970-01-01 00:00:00"
        end = nowstr(TZ)
        if len(match_time) == 4:  # 2025
            begin = f"{match_time}-01-01 00:00:00"
            end = f"{match_time}-12-31 23:59:59"
        elif len(match_time) == 7:  # 2025-01
            begin = f"{match_time}-01 00:00:00"
            # Day 31 is used for every month as the upper bound.
            end = f"{match_time}-31 23:59:59"
        elif len(match_time) == 10:  # 2025-01-01
            begin = f"{match_time} 00:00:00"
            end = f"{match_time} 23:59:59"
        conditions.append(f"T.time >= '{begin}' AND T.time <= '{end}'")

    if user:
        # Usernames can change, so matching by UID is preferred.
        if chandle:
            real_cid = chandle
        elif cinfo["ctype"] in ["BOT", "PRIVATE"]:
            real_cid = cinfo["cid"]
        else:
            real_cid = f"-100{cinfo['cid']}"
        if uid := await get_uid_by_username(client, real_cid, user, engine):
            conditions.append(f"T.uid = {uid}")
        else:
            # Double single quotes so the value stays inside the SQL literal.
            escaped_user = user.replace("'", "''")
            conditions.append(f"T.user = '{escaped_user}'")

    sql = f"SELECT T.mid, T.mtype, T.time, T.fullname, T.content FROM '{cinfo['tablename']}' AS T JOIN fts_{cinfo['cid']} AS FTS ON T.mid = FTS.rowid"
    if conditions:
        sql += f" WHERE {' AND '.join(conditions)}"
    sql += " ORDER BY T.mid DESC"
    limit = 99999 if keyword or match_time else 200
    sql += f" LIMIT {limit}"
    logger.info(sql)

    # Without a date or keyword (no filter at all, or only a user) the
    # result is capped to what fits into a single message.
    limit_to_single_msg = not match_time and not keyword

    if engine == "turso":
        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
        filterd = filter_response(turso_parse_resp(resp), keyword)
    else:
        resp = await query_d1(sql, db_name=HISTORY.D1_DATABASE, silent=True)
        filterd = filter_response(glom(resp, "result.0.results", default=[]), keyword)

    full_parts: list[str] = []
    short_parts: list[str] = []  # long messages are trimmed here
    count = 0
    for row in filterd:
        content: str = row["content"]
        url = f"https://t.me/{chandle}/{row['mid']}" if chandle else f"https://t.me/c/{cinfo['cid']}/{row['mid']}"
        username = row["fullname"] or "消息链接"
        emoji = MTYPE_EMOJI[row["mtype"]] if row["mtype"] != "text" else ""
        header = f"\n👤[{username}]({url}) {row['time']}{emoji}:\n"
        full_parts.append(f"{header}{content}\n")

        # Trim long messages to a window around the keyword.
        excerpt = content
        if len(content) > 100:
            # `find` returns -1 when the keyword is absent; anchor at 0 then.
            idx = max(content.find(keyword), 0)
            start = max(0, idx - 45)
            stop = min(len(content), idx + len(keyword) + 45)
            lead = "..." if start > 0 else ""
            tail = "..." if stop != len(content) else ""
            excerpt = f"{lead}{content[start:stop]}{tail}"
        entry = f"{header}{excerpt}\n"

        if limit_to_single_msg:
            # Stop as soon as one more entry would no longer fit one message.
            candidate = "".join(short_parts) + entry
            if len(await smart_split(candidate)) != 1:
                break
        short_parts.append(entry)
        count += 1

    return {
        "texts": "".join(short_parts).strip(),
        "full_texts": "".join(full_parts).strip(),
        "count": count,
    }
274
275
async def get_uid_by_username(client: Client, chat_id: str | int, username: str, engine: str = HISTORY.QUERY_ENGINE) -> int:
    """Get a Telegram user id by username, cached per chat and username.

    Supported formats of `username`:
        handle (a-z, A-Z, 0-9, _)

    Args:
        client: Pyrogram client used for the lookup.
        chat_id: Chat the user is looked up in.
        username: The user identifier typed in the query.
        engine: Query engine forwarded to `get_user`.

    Returns:
        The user id from `get_user`; 0 when no user was found.
    """
    # The same key must be used for reading and writing, otherwise the cache
    # never hits and entries from different chats overwrite each other.
    cache_key = f"get_uid_by_username-{chat_id}-{username}"
    if cached := cache.get(cache_key):
        return cached
    user = await get_user(client, to_int(username), chat_id, engine)
    cache.set(cache_key, user.id, ttl=0)
    return user.id
287
288
async def get_user(client: Client, uid: int | str, cid: int | str = "", engine: str = HISTORY.QUERY_ENGINE) -> User:
    """Resolve `uid` (a UID, handle or name) to a user that belongs to chat `cid`.

    Lookup order:
        1. Resolve `uid` through Telegram, then confirm membership in the chat.
        2. Look `uid` up directly in the chat.
        3. Look `uid` up in the stored userinfo table and confirm membership.

    Args:
        client: Pyrogram client.
        uid: User id, handle or name.
        cid: Chat the user must belong to.
        engine: `"turso"` selects the Turso userinfo table, otherwise D1.

    Returns:
        The matching user, or `User(id=0)` when nothing matched.
    """
    try:
        found = await client.get_users(to_int(uid))
        if not isinstance(found, User):
            return User(id=0)
        # Check that this user is really in this chat. This matters because
        # `uid` could be a full name like "Tom" while the handle "@Tom"
        # belongs to a different user.
        found = await get_user_from_chat(client, found.id, cid)
        if found.id != 0:
            return found
    except Exception as e:  # best effort: fall back to the other lookups
        logger.warning(e)

    user = await get_user_from_chat(client, uid, cid)
    if user.id != 0:
        # Found directly in the chat: return it instead of dropping it.
        return user

    # Not in this chat under that identifier: try the stored userinfo.
    if engine == "turso":
        users = await get_turso_userinfo_by_uid(uid, cid)
    else:
        users = await get_d1_userinfo_by_uid(uid, cid)
    for user_id, chat_id in users:  # check if this user is still in this chat
        found = await get_user_from_chat(client, user_id, chat_id)
        if found.id != 0:
            return found
    return User(id=0)
311
312
async def get_turso_userinfo_by_uid(uid: int | str, cid: int | str = "") -> list[tuple[int, int]]:
    """Get user info by UID, handle or name from Turso.

    Args:
        uid: User id; when `to_int` does not yield an int, the value is
            matched against the `handle` and `name` columns instead.
        cid: When given, only rows of this chat are kept.

    Returns:
        [(uid, cid)] where `cid` carries the `-100` prefix unless the chat
        type is PRIVATE or BOT. Rows whose chat info is missing are skipped.
    """
    uid = to_int(uid)
    if isinstance(uid, int):
        cond = f"uid = {uid}"
    else:
        # Double single quotes so the value cannot break out of the literal.
        escaped = str(uid).replace("'", "''")
        cond = f"handle = '{escaped}' OR name = '{escaped}'"
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT cid,uid FROM userinfo WHERE {cond};"}}], retry=2, silent=True, **TURSO_KWARGS)
    parsed = turso_parse_resp(resp)
    if cid:
        wanted_cid = slim_cid(cid)
        parsed = [x for x in parsed if slim_cid(x["cid"]) == wanted_cid]
    res = []
    for info in parsed:
        row_cid = int(info["cid"])
        if chat := await get_turso_chatinfo(row_cid):
            real_cid = row_cid if chat["ctype"] in ["PRIVATE", "BOT"] else int(f"-100{row_cid}")
            res.append((int(info["uid"]), real_cid))
    return res
332
333
async def get_d1_userinfo_by_uid(uid: int | str, cid: int | str = "") -> list[tuple[int, int]]:
    """Get user info by UID, handle or name from D1.

    Args:
        uid: User id; when `to_int` does not yield an int, the value is
            matched against the `handle` and `name` columns instead.
        cid: When given, only rows of this chat are kept.

    Returns:
        [(uid, cid)] where `cid` carries the `-100` prefix unless the chat
        type is PRIVATE or BOT. Rows whose chat info is missing are skipped.
    """
    uid = to_int(uid)
    if isinstance(uid, int):
        cond = f"uid = {uid}"
    else:
        # Double single quotes so the value cannot break out of the literal.
        escaped = str(uid).replace("'", "''")
        cond = f"handle = '{escaped}' OR name = '{escaped}'"

    resp = await query_d1(f"SELECT cid,uid FROM userinfo WHERE {cond};", db_name=HISTORY.D1_DATABASE, silent=True)
    parsed = glom(resp, "result.0.results", default=[])
    if cid:
        wanted_cid = slim_cid(cid)
        parsed = [x for x in parsed if slim_cid(x["cid"]) == wanted_cid]
    res = []
    for info in parsed:
        row_cid = int(info["cid"])
        if chat := await get_d1_chatinfo(row_cid):
            real_cid = row_cid if chat["ctype"] in ["PRIVATE", "BOT"] else int(f"-100{row_cid}")
            res.append((int(info["uid"]), real_cid))
    return res
353 return res