main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import re
  4from io import BytesIO
  5
  6from glom import glom
  7from loguru import logger
  8from pyrogram.client import Client
  9from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
 10from pyrogram.types import Message, User
 11
 12from config import HISTORY, PREFIX, TZ, cache
 13from database.d1 import query_d1
 14from database.turso import turso_exec, turso_parse_resp
 15from history.d1 import get_d1_chatinfo, save_chatinfo_to_d1
 16from history.turso import get_turso_chatinfo, save_chatinfo_to_turso
 17from history.utils import TURSO_KWARGS, filter_response, get_chat, get_user_from_chat, is_admin, keyword_query, list_chat_ids
 18from messages.parser import parse_chat, parse_msg
 19from messages.progress import modify_progress
 20from messages.sender import send2tg
 21from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
 22from others.emoji import MTYPE_EMOJI
 23from publish import publish_telegraph
 24from utils import convert_html, myself, nowstr, slim_cid, strings_list, to_int
 25
 26HELP = f"""🗣**查询当前对话聊天记录**
 27`/hist` 使用说明:
 281.`/hist + 关键词`
 292.`/hist + 日期 + 关键词`
 303.`/hist + @用户名 + 关键词`
 314.`/hist + 日期 + @用户名 + 关键词` (日期需放在最前面)
 324.`/hist + 日期 + @用户名 + 关键词` (日期需放在最前面)
 33示例:
 34{BLOCKQUOTE_EXPANDABLE_DELIM}`/hist 你好`: 查询包含“你好”关键词的记录
 35{BLOCKQUOTE_EXPANDABLE_DELIM}`/hist 2025-01-01 你好`: 查询2025-01-01日包含“你好”的记录
 36{BLOCKQUOTE_EXPANDABLE_DELIM}`/hist @张三 你好`: 查询用户【张三】包含“你好”的记录
 37{BLOCKQUOTE_EXPANDABLE_DELIM}`/hist 2025 @张三 你好`: 查询2025年用户【张三】包含“你好”的记录
 38{BLOCKQUOTE_EXPANDABLE_DELIM}
 39{BLOCKQUOTE_EXPANDABLE_DELIM}注意:
 40{BLOCKQUOTE_EXPANDABLE_DELIM}- 用户名和关键词需要区分大小写
 41{BLOCKQUOTE_EXPANDABLE_DELIM}- 用户名可以为昵称 (Name)、用户名 (@username)、用户的TelegramUID
 42{BLOCKQUOTE_EXPANDABLE_DELIM}- 如果用户名中有空格, 请去除空格。例如: 想指定用户为John Doe请使用 `@JohnDoe`
 43`/history` 使用说明:
 44查询所有对话的聊天记录
 45但出于隐私考虑, 本命令会限制使用权限
 46`/history + #ChatID` + [日期]+[用户名]+[关键词]
 47`/history -l`: 列出所有ChatID
 48"""
 49
 50
 51async def query_chat_history(client: Client, message: Message, **kwargs):
 52    if not startswith_prefix(message.content, prefix=PREFIX.HISTORY):
 53        return
 54    if equal_prefix(message.content, prefix=PREFIX.HISTORY) and not message.reply_to_message:
 55        await send2tg(client, message, texts=HELP, **kwargs)
 56        return
 57    info = parse_msg(message, silent=True, use_cache=False)
 58    if startswith_prefix(message.content, prefix="/history") and not is_admin(info["uid"]):
 59        await send2tg(client, message, texts="⚠️您无权使用此命令, 请使用 `/hist`", **kwargs)
 60        return
 61
 62    if info["text"].strip() == "/history -l":
 63        await list_chat_ids(client, message, engine=HISTORY.QUERY_ENGINE)
 64        return
 65    # if use `/hist` reply to a message
 66    if info["text"] in ["/hist", "/history"] and message.reply_to_message:
 67        info["text"] = f"/hist @{info['reply_uid']}"
 68    qtype = "history" if startswith_prefix(info["text"].strip(), prefix="/history") else "hist"
 69
 70    chat_id, match_time, user, keyword, error = parse_queries(info["text"], qtype)
 71    if error:
 72        await send2tg(client, message, texts=error, **kwargs)
 73        return
 74    if qtype == "hist":
 75        chat_id = slim_cid(info["cid"])
 76    chatinfo = await get_turso_chatinfo(chat_id) if HISTORY.QUERY_ENGINE == "turso" else await get_d1_chatinfo(chat_id)
 77    if not chatinfo:  # this chat is never synced
 78        chat = await get_chat(client, int(chat_id))
 79        chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat)) if HISTORY.QUERY_ENGINE == "turso" else await save_chatinfo_to_d1(client, parse_chat(chat))
 80    if not chatinfo:
 81        await message.reply(f"⚠️`{chat_id}`不是有效的ChatID", quote=True)
 82        return
 83
 84    """process chatinfo tags, can be used to do advanced filtering
 85
 86    One Turso database may be read by multiple Telegram accounts, we can use tags to filter by account
 87    For example,
 88    tags:  {my_uid}_SKIP_QUERY -> skip query this chat of `my_uid` account
 89    """
 90    me = await myself(client)
 91    if f"{me.id}_SKIP_QUERY" in strings_list(chatinfo.get("tags", "")):
 92        await message.reply("⚠️Tags已设置跳过查询此对话", quote=True)
 93        return
 94
 95    chat_title = chatinfo["ctitle"]
 96
 97    caption = "📖**查询聊天记录**:"
 98    caption += f"\n🆔会话: {chat_title}"
 99    caption += f"\n🕒日期: {match_time}"
100    caption += f"\n👤用户: {user}"
101    caption += f"\n🔤关键词: {keyword}"
102    status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
103    kwargs["progress"] = status_msg
104    results = await query_history(client, chatinfo, match_time, user, keyword, engine=HISTORY.QUERY_ENGINE)
105    texts = results.get("texts", "")
106
107    count = results.get("count", 0)
108
109    if not texts:
110        await modify_progress(text=caption + "\n⚠️未匹配任何记录", force_update=True, **kwargs)
111        return
112    if len(texts) < 20480 and len(await smart_split(texts)) == 1:
113        await modify_progress(message=status_msg, text=blockquote(texts), force_update=True, **kwargs)
114        return
115
116    texts = results.get("full_texts", "")  # use full texts
117    caption += f"\n#️⃣消息数: {count}"
118    # less than 100,000, add instant view
119    if len(texts) < 1000000 and (
120        telegraph_url := await publish_telegraph(
121            title=f"{chat_title}{user}{match_time} {keyword}",
122            html=convert_html(texts),
123            author=user or chat_title,
124            ttl="1d",
125        )
126    ):
127        caption += f"\n⚡️[即时预览]({telegraph_url})"
128    # send as txt
129    with BytesIO(texts.encode("utf-8")) as f:
130        await client.send_document(info["cid"], f, file_name=f"{chat_title}{user}{match_time} {keyword}.txt", caption=caption)
131
132    await modify_progress(message=status_msg, del_status=True, **kwargs)
133
134
def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str, str]:
    """Parse a `/hist` or `/history` command into its query components.

    Expected layout after the command (every part optional, order fixed):
        [#chat_id] [date] [@user] [keyword ...]

    Args:
        texts: Raw command text, e.g. ``/hist 2025-01 @alice hello``.
        qtype: ``"hist"`` (current chat only, ``#chat_id`` is rejected) or
            ``"history"`` (``#chat_id`` is mandatory).

    Returns:
        chat_id, match_time, user, keyword, error

        ``chat_id`` is the digits after ``#`` with an optional ``-100`` prefix
        removed; ``match_time`` is ``YYYY``, ``YYYY-MM`` or ``YYYY-MM-DD``;
        ``error`` is empty when the query is acceptable.
    """
    chat_id = match_time = user = error = ""
    texts = re.sub(r"^/history", "/hist", texts, count=1)  # unify prefix

    # #chat_id -- a leading "#" is only consumed when a numeric id follows,
    # so a hashtag keyword such as "#tag" keeps its "#".
    if matched := re.match(r"/hist\s+#(?:-100)?(\d+)", texts):
        chat_id = matched.group(1)
        rest = texts[matched.end():]
    else:
        rest = texts.removeprefix("/hist")
    rest = rest.lstrip()

    # 2025-01-01 / 2025-01 / 2025 -- the lookahead prevents a longer number
    # or a malformed date (e.g. "123456", "2025-1") from being split into a
    # bogus date plus a leftover keyword.
    if matched := re.match(r"\d{4}(?:-\d{2}){0,2}(?![\d-])", rest):
        match_time = matched.group(0)
        rest = rest[matched.end():].lstrip()

    # @user -- a leading "@" is only consumed when a user name follows.
    if matched := re.match(r"@(\w+)", rest):
        user = matched.group(1)
        rest = rest[matched.end():].lstrip()
    keyword = rest

    if qtype == "hist":
        if chat_id:
            error = "`/hist` 命令不支持指定ChatID, 仅支持查询当前对话聊天记录"
        elif not any((match_time, user, keyword)):
            error = f"查询格式有误, 请发送 `{PREFIX.HISTORY}` 命令查看帮助"
    elif qtype == "history":
        # An empty query necessarily lacks a chat id, so one check covers both cases.
        if not chat_id:
            error = "`/history` 命令需要指定 ChatID\n`/history -l`: 列出所有ChatID"

    return chat_id, match_time, user, keyword, error
183
184
async def query_history(
    client: Client,
    cinfo: dict[str, str],
    match_time: str,
    user: str,
    keyword: str,
    engine: str = HISTORY.QUERY_ENGINE,
) -> dict:
    """Query chat history from the configured backend (Turso or D1).

    LIKE queries scan the whole table, which is slow and quickly burns the
    read quota, so the search goes through FTS5 instead. FTS5 cannot match
    Chinese and the remote database has no icu tokenizer, so text is
    segmented manually at insert time (with the `cutword` library). The
    segmentation is not always accurate but covers most needs; the rows
    returned are filtered once more (`filter_response`) for an exact match.

    Args:
        client: Pyrogram client, used to resolve `user` to a UID.
        cinfo: Chat info row; reads ``tablename``, ``cid``, ``ctype`` and ``chandle``.
        match_time: ``YYYY``, ``YYYY-MM``, ``YYYY-MM-DD`` or empty.
        user: User name / handle / UID to filter by, or empty.
        keyword: Keyword to search for, or empty.
        engine: ``"turso"`` selects Turso; any other value selects D1.

    Returns:
        ``{"texts": trimmed preview, "full_texts": untrimmed text,
        "count": number of entries in the preview}``.
    """
    conditions = []
    if keyword:
        conditions.append(keyword_query(keyword))

    if match_time:
        begin = "1970-01-01 00:00:00"
        end = nowstr(TZ)
        if len(match_time) == 4:  # 2025
            begin = f"{match_time}-01-01 00:00:00"
            end = f"{match_time}-12-31 23:59:59"
        elif len(match_time) == 7:  # 2025-01
            begin = f"{match_time}-01 00:00:00"
            # day 31 is a safe upper bound for every month: bounds are compared as text
            end = f"{match_time}-31 23:59:59"
        elif len(match_time) == 10:  # 2025-01-01
            begin = f"{match_time} 00:00:00"
            end = f"{match_time} 23:59:59"
        conditions.append(f"T.time >= '{begin}' AND T.time <= '{end}'")

    chandle = cinfo.get("chandle")
    if user:
        # Usernames can be changed, so prefer matching on the UID.
        if chandle:
            real_cid = chandle
        elif cinfo["ctype"] in ["BOT", "PRIVATE"]:
            real_cid = cinfo["cid"]
        else:
            real_cid = f"-100{cinfo['cid']}"
        if uid := await get_uid_by_username(client, real_cid, user, engine):
            conditions.append(f"T.uid = {uid}")
        else:
            # `user` comes from a chat command: double any single quote so it
            # cannot terminate the SQL string literal.
            safe_user = user.replace("'", "''")
            conditions.append(f"T.user = '{safe_user}'")

    sql = f"SELECT T.mid, T.mtype, T.time, T.fullname, T.content FROM '{cinfo['tablename']}' AS T JOIN fts_{cinfo['cid']} AS FTS ON T.mid = FTS.rowid"
    if conditions:
        sql += f" WHERE {' AND '.join(conditions)}"
    # Without a keyword or a date the result is only a preview of the latest rows.
    browse_only = not (keyword or match_time)
    limit = 200 if browse_only else 99999
    sql += f" ORDER BY T.mid DESC LIMIT {limit}"
    logger.info(sql)
    # In browse mode (no filter at all, or user only) the preview is capped to
    # what fits in a single Telegram message.
    limit_to_single_msg = browse_only

    if engine == "turso":
        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
        filterd = filter_response(turso_parse_resp(resp), keyword)
    else:
        resp = await query_d1(sql, db_name=HISTORY.D1_DATABASE, silent=True)
        filterd = filter_response(glom(resp, "result.0.results", default=[]), keyword)

    base_url = f"https://t.me/{chandle}" if chandle else f"https://t.me/c/{cinfo['cid']}"
    full_parts: list[str] = []
    preview_parts: list[str] = []  # long messages are trimmed here
    for row in filterd:
        url = f"{base_url}/{row['mid']}"
        username = row["fullname"] or "消息链接"
        emoji = MTYPE_EMOJI[row["mtype"]] if row["mtype"] != "text" else ""
        header = f"\n👤[{username}]({url}) {row['time']}{emoji}:\n"
        content: str = row["content"]
        full_parts.append(f"{header}{content}\n")

        # Trim long messages to a window of ~45 characters around the keyword.
        if len(content) > 100:
            # `find` returns -1 when the keyword is absent; anchor the window
            # at the start of the message in that case.
            idx = max(content.find(keyword), 0)
            start = max(0, idx - 45)
            stop = min(len(content), idx + len(keyword) + 45)
            prefix = "..." if start > 0 else ""
            suffix = "..." if stop != len(content) else ""
            content = f"{prefix}{content[start:stop]}{suffix}"
        entry = f"{header}{content}\n"

        # Stop as soon as one more entry would overflow a single message.
        if limit_to_single_msg and len(await smart_split("".join(preview_parts) + entry)) != 1:
            break
        preview_parts.append(entry)

    return {
        "texts": "".join(preview_parts).strip(),
        "full_texts": "".join(full_parts).strip(),
        "count": len(preview_parts),
    }
274
275
async def get_uid_by_username(client: Client, chat_id: str | int, username: str, engine: str = HISTORY.QUERY_ENGINE) -> int:
    """Get the Telegram user id for `username` within chat `chat_id`.

    `username` is passed through `to_int` and resolved by `get_user`, so it
    may be a handle (a-z, A-Z, 0-9, _), a numeric UID or a display name.

    Args:
        client: Pyrogram client used for the lookup.
        chat_id: Chat the user must belong to; also part of the cache key.
        username: Identifier to resolve.
        engine: Storage backend forwarded to `get_user`.

    Returns:
        The user id, or 0 when `get_user` could not resolve the user.
    """
    # The same key must be used for reading and writing; the chat id is part
    # of it because the result depends on chat membership.
    cache_key = f"get_uid_by_username-{chat_id}-{username}"
    if cached := cache.get(cache_key):
        return cached
    user = await get_user(client, to_int(username), chat_id, engine)
    if user.id:
        # A 0 ("not found") is never treated as a cache hit, so don't store it.
        cache.set(cache_key, user.id, ttl=0)
    return user.id
287
288
async def get_user(client: Client, uid: int | str, cid: int | str = "", engine: str = HISTORY.QUERY_ENGINE) -> User:
    """Resolve `uid` to a `User` that belongs to chat `cid`.

    Resolution order:
        1. `client.get_users`, then confirm the user through `get_user_from_chat`.
        2. `get_user_from_chat` with the raw `uid`.
        3. The stored `userinfo` table (Turso or D1), checking each candidate
           against its chat.

    Args:
        client: Pyrogram client.
        uid: Numeric UID, handle or display name.
        cid: Chat the user must belong to.
        engine: ``"turso"`` selects Turso; any other value selects D1.

    Returns:
        The matching `User`, or `User(id=0)` when nothing matched.
    """
    try:
        found = await client.get_users(to_int(uid))
        if not isinstance(found, User):
            return User(id=0)
        # check if this user is really in this chat
        # this step is important because:
        # the `uid` could be a fullname like "Tom", but the handle "@Tom" is occupied by another user
        found = await get_user_from_chat(client, found.id, cid)
        if found.id != 0:
            return found
    except Exception as e:  # best-effort lookup: fall through to the other strategies
        logger.warning(e)

    user = await get_user_from_chat(client, uid, cid)
    if user.id != 0:
        # Found directly in the chat: return it instead of falling through to "not found".
        return user

    # this uid is not in this chat -> try the stored user info
    lookup = get_turso_userinfo_by_uid if engine == "turso" else get_d1_userinfo_by_uid
    for user_id, chat_id in await lookup(uid, cid):  # check if this user is still in this chat
        found = await get_user_from_chat(client, user_id, chat_id)
        if found.id != 0:
            return found
    return User(id=0)
311
312
async def get_turso_userinfo_by_uid(uid: int | str, cid: int | str = "") -> list[tuple[int, int]]:
    """Look up user records in the Turso `userinfo` table.

    Args:
        uid: Numeric UID, or a string matched against `handle` and `name`.
        cid: When given, only records of this chat are kept (compared via `slim_cid`).

    Returns:
        [(uid, cid)] -- `cid` is the stored id for PRIVATE/BOT chats and the
        stored id prefixed with ``-100`` otherwise. Records whose chat has no
        chatinfo are skipped.
    """
    uid = to_int(uid)
    if isinstance(uid, int):
        cond = f"uid = {uid}"
    else:
        # Double single quotes so the value cannot break out of the SQL string literal.
        name = str(uid).replace("'", "''")
        cond = f"handle = '{name}' OR name = '{name}'"
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT cid,uid FROM userinfo WHERE {cond};"}}], retry=2, silent=True, **TURSO_KWARGS)
    parsed = turso_parse_resp(resp)
    if cid:
        wanted = slim_cid(cid)
        parsed = [x for x in parsed if slim_cid(x["cid"]) == wanted]
    res = []
    for info in parsed:
        row_cid = int(info["cid"])
        if chat := await get_turso_chatinfo(row_cid):
            real_cid = row_cid if chat["ctype"] in ["PRIVATE", "BOT"] else int(f"-100{row_cid}")
            res.append((int(info["uid"]), real_cid))
    return res
332
333
async def get_d1_userinfo_by_uid(uid: int | str, cid: int | str = "") -> list[tuple[int, int]]:
    """Look up user records in the D1 `userinfo` table.

    Args:
        uid: Numeric UID, or a string matched against `handle` and `name`.
        cid: When given, only records of this chat are kept (compared via `slim_cid`).

    Returns:
        [(uid, cid)] -- `cid` is the stored id for PRIVATE/BOT chats and the
        stored id prefixed with ``-100`` otherwise. Records whose chat has no
        chatinfo are skipped.
    """
    uid = to_int(uid)
    if isinstance(uid, int):
        cond = f"uid = {uid}"
    else:
        # Double single quotes so the value cannot break out of the SQL string literal.
        name = str(uid).replace("'", "''")
        cond = f"handle = '{name}' OR name = '{name}'"

    resp = await query_d1(f"SELECT cid,uid FROM userinfo WHERE {cond};", db_name=HISTORY.D1_DATABASE, silent=True)
    parsed = glom(resp, "result.0.results", default=[])
    if cid:
        wanted = slim_cid(cid)
        parsed = [x for x in parsed if slim_cid(x["cid"]) == wanted]
    res = []
    for info in parsed:
        row_cid = int(info["cid"])
        if chat := await get_d1_chatinfo(row_cid):
            real_cid = row_cid if chat["ctype"] in ["PRIVATE", "BOT"] else int(f"-100{row_cid}")
            res.append((int(info["uid"]), real_cid))
    return res
353    return res