Commit bdd5393

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-14 05:26:13
refactor(history): save chat info to `chatinfo` table and use it in `query` command
1 parent 619a7fe
Changed files (8)
src/database/turso.py
@@ -229,13 +229,18 @@ async def turso_exec(
     if not silent:
         logger.trace(f"Turso Exec: {statements}")
 
+    num_statements = len(statements)
     resp = await hx_req(db_url, "POST", json_data={"requests": statements}, headers=headers, check_keys=["results"], proxy=PROXY.TURSO, max_retry=int(retry), silent=silent, timeout=600)
+    num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"])
     if not silent:
         rows = glom(resp, "results.0.response.result.rows", default=[])
         log = f"Found {len(rows)} records in Turso."
         log += f" Rows read: {glom(resp, 'results.0.response.result.rows_read', default=0)}"
         log += f", write: {glom(resp, 'results.0.response.result.rows_written', default=0)}"
         logger.success(log)
+    if num_statements != num_success:
+        error = "\n".join(glom(resp, "results.*.error.message", default=[]))
+        logger.error(f"Turso Exec: {num_statements} statements, {num_success} success.\n{error}")
     return resp
 
 
@@ -254,3 +259,10 @@ def insert_statement(table_name: str, records: dict, update_on_conflict: str = "
     if args:
         stmt |= {"args": args}
     return {"type": "execute", "stmt": stmt}
+
+
+def turso_parse_resp(resp: dict) -> list[dict]:
+    """Parse turso SELECT response."""
+    cols = glom(resp, "results.0.response.result.cols", default=[])
+    rows = glom(resp, "results.0.response.result.rows", default=[])
+    return [{col["name"]: x["value"] for x, col in zip(row, cols, strict=True)} for row in rows]
src/history/query.py
@@ -3,23 +3,23 @@
 import re
 from io import BytesIO
 
-from glom import glom
 from loguru import logger
 from pyrogram.client import Client
 from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM, BLOCKQUOTE_EXPANDABLE_END_DELIM
 from pyrogram.types import Message
 
 from config import PREFIX, cutter
-from database.turso import turso_exec
-from history.turso import get_table_name
-from history.utils import TURSO_KWARGS, check_save_history, get_uid_by_username, is_admin, list_chat_ids, mtype_emoji
+from database.turso import turso_exec, turso_parse_resp
+from history.turso import get_chatinfo, save_chatinfo
+from history.utils import TURSO_KWARGS, check_save_history, get_chat, get_uid_by_username, is_admin, list_chat_ids
 from llm.utils import convert_html
-from messages.parser import parse_msg
+from messages.parser import parse_chat, parse_msg
 from messages.progress import modify_progress
 from messages.sender import send2tg
 from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
+from others.emoji import MTYPE_EMOJI
 from publish import publish_telegraph
-from utils import slim_cid
+from utils import myself, slim_cid, strings_list
 
 HELP = f"""🗣**查询当前对话聊天记录**
 `/hist` 使用说明:
@@ -49,7 +49,7 @@ HELP = f"""🗣**查询当前对话聊天记录**
 
 async def query_chat_history(client: Client, message: Message, **kwargs):
     info = parse_msg(message, silent=True)
-    admin_call = is_admin(info["uid"], info["handle"])
+    admin_call = is_admin(info["uid"])
     if not check_save_history(info["ctype"], info["cid"]) and not admin_call:  # save history is disabled for this chat
         return
     if not startswith_prefix(info["text"], prefix=PREFIX.HISTORY):
@@ -72,8 +72,26 @@ async def query_chat_history(client: Client, message: Message, **kwargs):
         return
     if qtype == "hist":
         chat_id = slim_cid(info["cid"])
-    table_name = await get_table_name(client, chat_id)
-    chat_title = "".join(table_name.split("-")[1:])
+    chatinfo = await get_chatinfo(chat_id)
+    if not chatinfo:  # this chat is never synced
+        chat = await get_chat(client, int(chat_id))
+        chatinfo = await save_chatinfo(parse_chat(chat))
+    if not chatinfo:
+        await message.reply(f"⚠️`{chat_id}`不是有效的ChatID", quote=True)
+        return
+
+    """process chatinfo tags, can be used to do advanced filtering
+
+    One Turso database may be read by multiple Telegram accounts, we can use tags to filter by account
+    For example,
+    tags:  {my_uid}_SKIP_QUERY -> skip query this chat of `my_uid` account
+    """
+    me = await myself(client)
+    if f"{me.id}_SKIP_QUERY" in strings_list(chatinfo.get("tags", "")):
+        await message.reply("⚠️Tags已设置跳过查询此对话", quote=True)
+        return
+
+    chat_title = chatinfo["ctitle"]
 
     caption = "📖**查询聊天记录**:"
     caption += f"\n🆔会话: {chat_title}"
@@ -83,7 +101,7 @@ async def query_chat_history(client: Client, message: Message, **kwargs):
     status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
     kwargs["progress"] = status_msg
 
-    results = await query_turso(client, table_name, match_time, user, keyword)
+    results = await query_turso(client, chatinfo, match_time, user, keyword)
     texts = results.get("texts", "")
     count = results.get("count", 0)
 
@@ -160,13 +178,13 @@ def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str, str]:
     return chat_id, match_time, user, keyword, error
 
 
-async def query_turso(client: Client, table_name: str, match_time: str, user: str, keyword: str) -> dict:
+async def query_turso(client: Client, cinfo: dict[str, str], match_time: str, user: str, keyword: str) -> dict:
     """Query chat history from Turso."""
     # ruff: noqa: S608
-    cid = int(table_name.split("-")[0])
+
     segmented = " ".join(cutter.cutword(keyword))
     texts_to_match = keyword if segmented == keyword else f'"{keyword}" OR "{segmented}"'  # must use double quotes for inner part
-    sql = f"SELECT T.mid, T.mtype, T.time, T.fullname, T.content FROM '{table_name}' AS T JOIN fts_{cid} AS FTS ON T.mid = FTS.rowid WHERE FTS.segmented MATCH '{texts_to_match}' ORDER BY T.mid DESC"
+    sql = f"SELECT T.mid, T.mtype, T.time, T.fullname, T.content FROM '{cinfo['tablename']}' AS T JOIN fts_{cinfo['cid']} AS FTS ON T.mid = FTS.rowid WHERE FTS.segmented MATCH '{texts_to_match}' ORDER BY T.mid DESC"
     if match_time:
         if len(match_time) == 4:  # 2025
             begin = f"{match_time}-01-01 00:00:00"
@@ -186,16 +204,12 @@ async def query_turso(client: Client, table_name: str, match_time: str, user: st
             sql += f" AND T.user = '{user}'"
     logger.info(sql)
     resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
-
-    # parse turso response
-    cols = glom(resp, "results.0.response.result.cols", default=[])
-    rows = glom(resp, "results.0.response.result.rows", default=[])
     texts = ""
     count = 0
-    for row in rows:
-        row_info = {col["name"]: x["value"] for x, col in zip(row, cols, strict=True)}
-        url = f"https://t.me/c/{cid}/{row_info['mid']}"
-        username = row_info["fullname"] or "匿名"
-        texts += f"👤[{username}]({url}) {row_info['time']}{mtype_emoji(row_info['mtype'])}:\n{row_info['content']}\n"
+    for row in turso_parse_resp(resp):
+        url = f"https://t.me/{cinfo['chandle']}/{row['mid']}" if cinfo["chandle"] else f"https://t.me/c/{cinfo['cid']}/{row['mid']}"
+        username = row["fullname"] or "匿名"
+        emoji = MTYPE_EMOJI[row["mtype"]] if row["mtype"] != "text" else ""
+        texts += f"👤[{username}]({url}) {row['time']}{emoji}:\n{row['content']}\n"
         count += 1
     return {"texts": texts.strip(), "count": count}
src/history/sync.py
@@ -15,7 +15,7 @@ async def sync_chat_history(client: Client, message: Message) -> None:
     if HISTORY.ENGINE.upper() == "D1":  # Deprecated
         await sync_history_to_d1(client, message)
     if HISTORY.ENGINE.upper() == "TURSO":
-        await sync_history_to_turso(client, message)
+        await sync_history_to_turso(message)
 
 
 async def backup_chat_history(
src/history/turso.py
@@ -11,24 +11,34 @@ from pyrogram.client import Client
 from pyrogram.types import Message
 
 from config import DOWNLOAD_DIR, HISTORY, TZ, cache, cutter
-from database.turso import insert_statement, turso_create_table, turso_exec, turso_list_tables
-from history.utils import TURSO_KWARGS, chat_info, check_save_history, fine_grained_check
-from messages.parser import parse_msg
-from utils import i_am_bot, nowdt
+from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
+from history.utils import TURSO_KWARGS, check_save_history, fine_grained_check, get_chat
+from messages.parser import parse_chat, parse_msg
+from utils import i_am_bot, nowdt, slim_cid
 
 # ruff: noqa: S608
 
-DB_COLUMNS = "mid INTEGER PRIMARY KEY, mtype TEXT, time TEXT NOT NULL, fullname TEXT, content TEXT, filename TEXT, urls TEXT, reply INTEGER, mime TEXT, user TEXT, uid INTEGER, segmented TEXT"
+CHAT_COLUMNS = "cid INTEGER PRIMARY KEY, ctype TEXT, ctitle TEXT, chandle TEXT, tablename TEXT, tags TEXT"
+USER_COLUMNS = "uid INTEGER PRIMARY KEY, full_name TEXT, handle TEXT, tags TEXT"
+MSG_COLUMNS = "mid INTEGER PRIMARY KEY, mtype TEXT, time TEXT NOT NULL, fullname TEXT, content TEXT, filename TEXT, urls TEXT, reply INTEGER, mime TEXT, user TEXT, uid INTEGER, segmented TEXT"
 INDEX_NAMES = ["time", "user", "uid"]
 
 
-async def sync_history_to_turso(client: Client, message: Message) -> None:
+async def sync_history_to_turso(message: Message) -> None:
+    """Sync received messages to Turso database.
+
+    1. save the user info to table `userinfo`
+    2. save the chat info to table `chatinfo`
+    3. save the message to table `{cid}-{ctitle}`
+    """
     if not HISTORY.TURSO_ENABLE:
         return
-    info = parse_msg(message, silent=True)
+    info = parse_msg(message, silent=True, use_cache=False)
     if not check_save_history(info["ctype"], info["cid"]) or not fine_grained_check(info) or message.service:
         return
-    table_name = await get_table_name(client, info["cid"])
+
+    await save_userinfo(info)
+    chatinfo = await save_chatinfo(info)
     records = {
         "mid": info["mid"],
         "mtype": info["mtype"],
@@ -43,7 +53,7 @@ async def sync_history_to_turso(client: Client, message: Message) -> None:
         "uid": info["uid"],
         "segmented": " ".join(cutter.cutword(message.content)),
     }
-    await turso_exec([insert_statement(table_name, records, update_on_conflict="mid")], silent=True, retry=2, **TURSO_KWARGS)
+    await turso_exec([insert_statement(chatinfo["tablename"], records, update_on_conflict="mid")], silent=True, retry=2, **TURSO_KWARGS)
 
 
 async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours: float = HISTORY.BACKUP_CHATS_HOURS) -> None:
@@ -51,9 +61,13 @@ async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours
         return
     if await i_am_bot(client):
         return
-    chat = await chat_info(client, int(chat_id))
-    table_name = await get_table_name(client, chat_id)
-
+    chatinfo = await get_chatinfo(chat_id)
+    if not chatinfo:  # this chat is never synced
+        chat = await get_chat(client, int(chat_id))
+        chatinfo = await save_chatinfo(parse_chat(chat))
+    if not chatinfo:  # chat is deleted
+        return
+    table_name = chatinfo["tablename"]
     # find message ids in this time range
     now = nowdt(TZ)
     begin_dt = now - timedelta(hours=hours)
@@ -66,10 +80,11 @@ async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours
     logger.info(f"Found {len(saved_mids)} messages in Turso. Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}")
     concurrency = 1000
     statements = []
-    async for message in client.get_chat_history(chat.id):  # type: ignore
+    real_cid = int(chatinfo["cid"]) if chatinfo["ctype"] in ["BOT", "PRIVATE"] else int(f"-100{chatinfo['cid']}")
+    async for message in client.get_chat_history(real_cid):  # type: ignore
         if not isinstance(message, Message) or message.empty or message.service:
             continue
-        info = parse_msg(message, silent=True)
+        info = parse_msg(message, silent=True, use_cache=False)
         if info["mid"] in saved_mids:
             continue
         if not fine_grained_check(info):
@@ -136,7 +151,12 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
         "video_message": "video",
         "video_file": "video",
     }
-    table_name = await get_table_name(client, data["id"])
+    chat_id = data["id"]
+    chatinfo = await get_chatinfo(chat_id)
+    if not chatinfo:  # this chat is never synced
+        chat = await get_chat(client, int(chat_id))
+        chatinfo = await save_chatinfo(parse_chat(chat))
+    table_name = chatinfo["tablename"]
     # find all message_ids
     resp = await turso_exec([{"type": "execute", "stmt": {"sql": f'SELECT mid FROM "{table_name}";'}}], silent=True, **TURSO_KWARGS)
     saved_ids = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
@@ -198,32 +218,116 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
             logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}")
 
 
-async def get_table_name(client: Client, chat_id: str | int) -> str:
-    """Get table name by chat id."""
-    if cache.get(f"tablename-{chat_id}"):
-        return cache.get(f"tablename-{chat_id}")
-    # get a default table name
-    chat = await chat_info(client, int(chat_id))
-    first_name = chat.first_name or ""
-    last_name = chat.last_name or ""
-    full_name = first_name + last_name
-    chat_title = full_name or chat.title or ""
-    slim_cid = str(chat_id).removeprefix("-100")
-    default_name = f"{slim_cid}-{chat_title}".replace(" ", "")
-
-    # find the table name based on chat id
-    table_names = await turso_list_tables(**TURSO_KWARGS, silent=True)
-    table_name = next((x for x in table_names if x.startswith(slim_cid + "-")), default_name)
-    cache.set(f"tablename-{chat_id}", table_name, ttl=0)
-
-    # create table and index
+async def get_chatinfo(cid: str | int) -> dict:
+    """Get chat info from table `chatinfo`.
+
+    Returns:
+        cid, ctype, ctitle, chandle
+    """
+    # create table
+    await turso_create_table("chatinfo", CHAT_COLUMNS, silent=True, **TURSO_KWARGS)
+    resp = await turso_exec(
+        [
+            {
+                "type": "execute",
+                "stmt": {"sql": f"SELECT * FROM chatinfo WHERE cid={slim_cid(cid)};"},
+            }
+        ],
+        silent=True,
+        retry=2,
+        **TURSO_KWARGS,
+    )
+    return glom(turso_parse_resp(resp), "0", default={})
+
+
+async def save_chatinfo(minfo: dict) -> dict[str, str]:
+    """Save chat info to table `chatinfo`.
+
+    Args:
+        minfo (dict): parsed message info.
+
+    Returns:
+        cid, ctype, ctitle, chandle, tablename, tags
+    """
+    cid = slim_cid(minfo["cid"])
+    if str(cid) == "0":
+        return {}
+    # Get chat info from turso and save it to cache
+    if not (cached := cache.get(f"chatinfo-{cid}")):
+        cached = await get_chatinfo(cid)
+        cache.set(f"chatinfo-{cid}", cached, ttl=0)
+    ctitle = minfo["ctitle"] or minfo["full_name"]
+    records = {
+        "cid": cid,
+        "ctype": minfo["ctype"],
+        "ctitle": ctitle,
+        "chandle": minfo["chandle"],
+        "tablename": cached.get("tablename", "") or f"{cid}-{ctitle}",
+        "tags": cached.get("tags", ""),
+    }
+    # create table for this chat
     await turso_create_table(
-        table_name,
-        DB_COLUMNS,
+        records["tablename"],
+        MSG_COLUMNS,
         idx_cols=INDEX_NAMES,
-        idx_prefix=f"idx_{slim_cid}_",
+        idx_prefix=f"idx_{cid}_",
         fts_on_col="mid",
-        fts_name=slim_cid,
+        fts_name=cid,
         **TURSO_KWARGS,
     )
-    return table_name
+    if cached != records:
+        logger.info(f"Save chat info: {records}")
+        cache.set(f"chatinfo-{cid}", records, ttl=0)
+        await turso_exec([insert_statement("chatinfo", records, update_on_conflict="cid")], retry=2, **TURSO_KWARGS)
+    return records
+
+
+async def get_userinfo(uid: str | int) -> dict:
+    """Get user info from table `userinfo`.
+
+    Returns:
+        uid, full_name, handle
+    """
+    # create table
+    await turso_create_table("userinfo", USER_COLUMNS, silent=True, **TURSO_KWARGS)
+    resp = await turso_exec(
+        [
+            {
+                "type": "execute",
+                "stmt": {"sql": f"SELECT * FROM userinfo WHERE uid={uid};"},
+            }
+        ],
+        silent=True,
+        retry=2,
+        **TURSO_KWARGS,
+    )
+    return glom(turso_parse_resp(resp), "0", default={})
+
+
+async def save_userinfo(minfo: dict) -> dict[str, str]:
+    """Save user info to table `userinfo`.
+
+    Args:
+        minfo (dict): parsed message info.
+
+    Returns:
+        uid, full_name, handle, tags
+    """
+    if str(minfo["uid"]) == "1":  # default user (user is unknown)
+        return {}
+    # Get user info from turso and save it to cache
+    if not (cached := cache.get(f"userinfo-{minfo['uid']}")):
+        cached = await get_userinfo(minfo["uid"])
+        cache.set(f"userinfo-{minfo['uid']}", cached, ttl=0)
+
+    records = {
+        "uid": str(minfo["uid"]),  # all values are str from turso.
+        "full_name": minfo["full_name"],
+        "handle": minfo["handle"],
+        "tags": cached.get("tags", ""),
+    }
+    if cached != records:
+        logger.info(f"Save user info: {records}")
+        cache.set(f"userinfo-{minfo['uid']}", records, ttl=0)
+        await turso_exec([insert_statement("userinfo", records, update_on_conflict="uid")], retry=2, **TURSO_KWARGS)
+    return records
src/history/utils.py
@@ -10,9 +10,10 @@ from pyrogram.errors import PeerIdInvalid
 from pyrogram.types import Chat, Message, User
 
 from config import DB, HISTORY, TID, cache
-from database.turso import turso_list_tables
+from database.turso import turso_exec, turso_parse_resp
 from messages.sender import send2tg
-from utils import find_url, slim_cid, to_int, true
+from others.emoji import CTYPE_EMOJI
+from utils import find_url, myself, slim_cid, strings_list, to_int, true
 
 TURSO_KWARGS: dict = {
     "db_name": HISTORY.TURSO_DATABASE,
@@ -81,40 +82,43 @@ def fine_grained_check(info: dict) -> bool:
     return True
 
 
-async def get_chat(client: Client, chat_id: int) -> Chat:
+async def get_chat(client: Client, chat_id: int | str) -> Chat:
     if cache.get(f"chat-info-{slim_cid(chat_id)}"):
         return cache.get(f"chat-info-{slim_cid(chat_id)}")
-    if chat_id == 1:
-        return Chat(id=1)
+    chat = Chat(id=0)  # default
+    if str(chat_id) == "0":
+        return chat
     try:
-        logger.debug(f"Getting chat info for {chat_id}")
         chat = await client.get_chat(int(chat_id))
     except PeerIdInvalid:
-        return await get_chat(client, int(f"-100{chat_id}"))
+        with contextlib.suppress(Exception):
+            chat = await client.get_chat(int(f"-100{slim_cid(chat_id)}"))
     except Exception:
-        chat = Chat(id=1)
+        logger.warning(f"Failed to get chat info for {chat_id}")
     cache.set(f"chat-info-{slim_cid(chat_id)}", chat, ttl=3600)  # cache for 1 hour
     return chat
 
 
 async def list_chat_ids(client: Client, message: Message):
-    table_names = await turso_list_tables(**TURSO_KWARGS, silent=True)
+    """List chat ids from turso table `chatinfo`.
+
+    One Turso database may be read by multiple Telegram accounts, we can use tags to filter by account
+    For example,
+    tags:  {my_uid}_SKIP_LIST -> skip list this chat of `my_uid` account
+    """
+    resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM chatinfo;"}}], silent=True, retry=2, **TURSO_KWARGS)
+    chats = turso_parse_resp(resp)
+    me = await myself(client)
     msg = ""
-    for table_name in table_names:
-        if table_name.startswith("fts_"):
+    for x in sorted(chats, key=lambda x: x["ctype"]):
+        if f"{me.id}_SKIP_LIST" in strings_list(x.get("tags", "")):
             continue
-        cid, ctitle = table_name.split("-", maxsplit=1)
-        msg += f"`/history #{cid}`: {ctitle}\n"
+        msg += f"`/history #{x['cid']}` {CTYPE_EMOJI[x['ctype']]}: {x['ctitle']}\n"
     await send2tg(client, message, texts=msg)
 
 
-def is_admin(uid: int, handle: str) -> bool:
-    for admin in [x.strip() for x in TID.ADMIN.split(",") if x.strip()]:
-        if admin.startswith("@") and admin[1:].lower() == handle.lower():
-            return True
-        if slim_cid(admin) == slim_cid(uid):
-            return True
-    return False
+def is_admin(uid: int) -> bool:
+    return any(slim_cid(admin) == slim_cid(uid) for admin in strings_list(TID.HISTORY_ADMIN))
 
 
 async def get_uid_by_username(client: Client, username: str) -> int:
@@ -134,18 +138,3 @@ async def get_uid_by_username(client: Client, username: str) -> int:
                 return user.id
     cache.set(f"get_uid_by_username-{username}", 0, ttl=0)
     return 0
-
-
-def mtype_emoji(mtype: str) -> str:
-    emojis = {
-        "audio": "🎧",
-        "document": "📔",
-        "photo": "🏞",
-        "sticker": "🎨",
-        "video": "🎥",
-        "video_note": "🎥",
-        "animation": "✨",
-        "voice": "🎤",
-        "web_page": "🌐",
-    }
-    return emojis.get(mtype, "")
src/messages/parser.py
@@ -8,9 +8,10 @@ from zoneinfo import ZoneInfo
 from glom import Coalesce, glom
 from loguru import logger
 from pyrogram.enums import MessageEntityType
-from pyrogram.types import Message
+from pyrogram.types import Chat, Message
 
 from config import TZ, cache
+from others.emoji import CTYPE_EMOJI, MTYPE_EMOJI
 from utils import nowdt
 
 
@@ -27,6 +28,7 @@ def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False,
     mtype = glom(message, "media.value", default="") or "text"
     ctype = glom(message, "chat.type.name", default="") or ""
     ctitle = glom(message, "chat.title", default="") or ""
+    chandle = glom(message, "chat.username", default="") or ""
     uid = glom(message, "from_user.id", default=1) or 1  # uid must > 0
     cid = glom(message, "chat.id", default=0) or 0
     mid = glom(message, "id", default=0) or 0
@@ -79,23 +81,8 @@ def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False,
     if message.caption_entities:
         entity_urls.extend(entity.url for entity in message.caption_entities if entity.type == MessageEntityType.TEXT_LINK)
 
-    ctype_emoji = {
-        "BOT": "🤖",
-        "GROUP": "👥",
-        "SUPERGROUP": "👥",
-        "CHANNEL": "📡",
-        "PRIVATE": "👨",
-    }.get(ctype, "")
-    mtype_emoji = {
-        "text": "🔤",
-        "audio": "🎧",
-        "document": "📔",
-        "photo": "🏞",
-        "sticker": "🎨",
-        "video": "🎬",
-        "animation": "✨",
-        "voice": "🎤",
-    }.get(mtype, mtype)
+    ctype_emoji = CTYPE_EMOJI.get(ctype, "")
+    mtype_emoji = MTYPE_EMOJI.get(mtype, mtype)
     # log the summary to console
     summary = ""
     if ctitle:  # group or channel
@@ -113,6 +100,7 @@ def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False,
         "mtype": str(mtype),
         "ctype": str(ctype),
         "ctitle": str(ctitle),
+        "chandle": str(chandle),
         "uid": int(uid),
         "cid": int(cid),
         "mid": int(mid),
@@ -130,7 +118,7 @@ def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False,
         "file_size": int(file_size),
         "duration": int(duration),
         "summary": str(summary),
-        "message_url": str(message.link),
+        "message_url": message.link or "",
         "entity_urls": entity_urls,
         "reply_mid": int(reply_mid),
         "reply_text": str(reply_text),
@@ -146,3 +134,32 @@ def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False,
     if use_cache:
         cache.set(f"parse_msg-{message.chat.id}-{message.id}", info, ttl=120)  # cache the same msg for 2 minutes
     return info
+
+
+def parse_chat(chat: Chat, *, use_cache: bool = True) -> dict:
+    """Parse a Chat object and return a dictionary of its attributes.
+
+    Abbreviations: c = chat, m = message, u = user
+    """
+    # ruff: noqa: B009
+    if use_cache and (cached := cache.get(f"parse_chat-{chat.id}")):
+        return cached
+    ctype = glom(chat, "type.name", default="") or ""
+    ctitle = glom(chat, "title", default="") or ""
+    chandle = glom(chat, "username", default="") or ""
+    cid = glom(chat, "id", default=0) or 0
+
+    # parse user attributes
+    first_name = glom(chat, "first_name", default="") or ""
+    last_name = glom(chat, "last_name", default="") or ""
+    full_name = f"{first_name} {last_name}".strip()
+
+    info = {  # ensure the type of each field
+        "cid": int(cid),
+        "ctype": str(ctype),
+        "ctitle": str(ctitle) or str(full_name),
+        "chandle": str(chandle),
+    }
+    if use_cache:
+        cache.set(f"parse_chat-{chat.id}", info, ttl=120)  # cache the same msg for 2 minutes
+    return info
src/others/emoji.py
@@ -316,6 +316,26 @@ CURRENCY = {
     "ZWL": "🇿🇼",
 }
 
+CTYPE_EMOJI = {
+    "BOT": "🤖",
+    "GROUP": "👥",
+    "SUPERGROUP": "👥",
+    "CHANNEL": "📢",
+    "PRIVATE": "👤",
+}
+MTYPE_EMOJI = {
+    "text": "🔤",
+    "audio": "🎧",
+    "document": "📔",
+    "photo": "🏞",
+    "sticker": "🎨",
+    "video": "🎬",
+    "video_note": "🎥",
+    "animation": "✨",
+    "voice": "🎤",
+    "web_page": "🌐",
+}
+
 
 def emojify(text: str, platform: str = "all") -> str:
     """Replace the text emojis with actual emojis.
src/config.py
@@ -178,6 +178,7 @@ class COOKIE:  # See: https://github.com/easychen/CookieCloud
 
 class TID:  # see more TID usecase in `src/permission.py`
     ADMIN = os.getenv("TID_ADMIN", "")  # comma separated userid or @username
+    HISTORY_ADMIN = os.getenv("TID_HISTORY_ADMIN", "")  # comma separated userid (@username is NOT supported!)
     # back up ytdlp audio if the user does not request it
     CHANNEL_YTDLP_BACKUP = os.getenv("TID_CHANNEL_YTDLP_BACKUP", "me")
     DAILY_SUMMARY = os.getenv("TID_DAILY_SUMMARY", "{}")  # {"source-chat-id": "target-chat-id"}, e.g. '{"-1001234567890": "-1009876543210"}'