Commit bb6dd18

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-02 05:38:25
feat(history): add support for saving message history to D1 database
1 parent 26574d1
src/danmu/sync.py
@@ -11,7 +11,7 @@ from loguru import logger
 
 from config import DANMU, TZ, cache
 from danmu.utils import merge_json, simplify_json
-from database import create_cf_d1_table, get_cf_r2, list_cf_r2, query_cf_d1, set_cf_r2
+from database import create_d1_table, get_cf_r2, list_cf_r2, query_d1, set_cf_r2
 from networking import hx_req
 from utils import nowdt
 
@@ -149,7 +149,7 @@ async def sync_danmu_to_d1() -> None:
             table_name = dt.year
             if f"{dt:%Y-%m-%d %H:%M:%S}{danmu['authorId']}" in saved_items:
                 continue
-            await create_cf_d1_table(table_name, db_columns, DANMU.D1_DATABASE)
+            await create_d1_table(table_name, db_columns, DANMU.D1_DATABASE)
             columns = ["time", "uid", "user", "text"]
             params = [f"{dt:%Y-%m-%d %H:%M:%S}", danmu["authorId"], danmu["authorName"], danmu["message"]]
             if (amt := danmu.get("scAmount")) and (matched := sc_pattern.fullmatch(amt)):
@@ -158,7 +158,7 @@ async def sync_danmu_to_d1() -> None:
                 columns.extend(["sc_amt", "sc_ccy"])
                 params.extend([float(sc_amt), sc_ccy])
             sql = f'INSERT INTO "{table_name}" ({", ".join(columns)}) VALUES ({", ".join(["?" for _ in range(len(columns))])});'
-            tasks.append(query_cf_d1(sql, db_name=DANMU.D1_DATABASE, params=params))
+            tasks.append(query_d1(sql, db_name=DANMU.D1_DATABASE, params=params))
         chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
         processed = 0
         for chunk in chunks:
@@ -171,7 +171,7 @@ async def sync_danmu_to_d1() -> None:
         return processed
 
     sql = f'SELECT time, uid FROM "{now.year}";'
-    resp = await query_cf_d1(sql, db_name=DANMU.D1_DATABASE)
+    resp = await query_d1(sql, db_name=DANMU.D1_DATABASE)
     saved_items = [f"{x['time']}{x['uid']}" for x in flatten(glom(resp, "result.*.results", default=[]))]
     page = 1
     payload = {"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": f"{now:%Y-%m-%d}", "message": "", "authorName": ""}
src/history/sync.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import json
+from datetime import datetime
+from pathlib import Path
+from zoneinfo import ZoneInfo
+
+from glom import glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.errors import PeerIdInvalid
+from pyrogram.types import Chat, Message
+
+from config import DOWNLOAD_DIR, HISTORY, TZ, cache
+from database import create_d1_table, query_d1
+from messages.parser import parse_msg
+from utils import i_am_bot
+
+# ruff: noqa: S608
+
+DB_COLUMNS = "mid INTEGER PRIMARY KEY, day TEXT, time TEXT, user TEXT, content TEXT, mtype TEXT, uid INTEGER, handle TEXT, filename TEXT, mime TEXT"
+
+
+async def save_history_to_d1(client: Client, message: Message) -> None:
+    """Upsert one message into the D1 history table of its chat.
+
+    The table is named ``<chat id without the -100 prefix>-<chat title>`` with
+    spaces removed, and is created on demand. Rows are keyed by message id, so
+    storing the same message id again overwrites the existing row.
+
+    Args:
+        client: Pyrogram client, used to look up the peer of private/bot chats.
+        message: The message to store.
+    """
+    if not HISTORY.D1_ENABLE:
+        return
+    info = parse_msg(message, silent=True)
+    cid = str(info["cid"]).removeprefix("-100")
+    chat_title = info["ctitle"] or info["full_name"]
+    if info["ctype"] in ["BOT", "PRIVATE"]:  # for private chats, we use peer side name as chat_title
+        chat = await chat_info(client, info["cid"])
+        chat_title = (chat.first_name or "") + (chat.last_name or "")
+    # The title is user-controlled and the name is placed inside a double-quoted SQL identifier,
+    # so a `"` must not survive into the table name.
+    table_name = f"{cid}-{chat_title}".replace(" ", "").replace('"', "")
+    # Only the per-chat table is created: nothing is ever written to a table named after the raw chat id.
+    await create_d1_table(table_name, DB_COLUMNS, HISTORY.D1_DATABASE, silent=True)
+    records = {
+        "mid": info["mid"],
+        "day": info["time"].split(" ")[0],
+        "time": info["time"].split(" ")[-1],
+        "user": info["full_name"],
+        "content": info["text"],
+        "mtype": info["mtype"],
+        "uid": info["uid"],
+        "handle": info["handle"],
+        "filename": info["file_name"],
+        "mime": info["mime_type"],
+    }
+    keys = ", ".join(records)
+    values = ", ".join("?" for _ in records)
+    updates = ", ".join(f"{k} = EXCLUDED.{k}" for k in records if k != "mid")
+    sql = f'INSERT INTO "{table_name}" ({keys}) VALUES ({values}) ON CONFLICT (mid) DO UPDATE SET {updates};'
+    await query_d1(sql, db_name=HISTORY.D1_DATABASE, params=list(records.values()), silent=True)
+
+
+async def sync_chat_history_to_d1(client: Client, chat_id: str | int) -> None:
+    """Backfill a chat's message history into its D1 table.
+
+    The highest message id already stored in the table is passed as
+    ``offset_id`` to ``client.get_chat_history`` and every returned message is
+    upserted, in batches of concurrent requests. Nothing happens when history
+    saving is disabled or when ``i_am_bot`` reports that the client is a bot.
+
+    Args:
+        client: Pyrogram client used to read the chat history.
+        chat_id: Id of the chat to sync; converted with ``int()``.
+    """
+    if not HISTORY.D1_ENABLE:
+        return
+    if await i_am_bot(client):
+        return
+    chat = await chat_info(client, int(chat_id))
+    cid = str(chat.id).removeprefix("-100")
+    chat_name = chat.title or chat.full_name
+    # The name is placed inside a double-quoted SQL identifier, so a `"` must not survive.
+    table_name = f"{cid}-{chat_name}".replace(" ", "").replace('"', "")
+    await create_d1_table(table_name, DB_COLUMNS, HISTORY.D1_DATABASE, silent=True)
+    # find last message id
+    sql = f'SELECT mid FROM "{table_name}" ORDER BY mid DESC LIMIT 1;'
+    resp = await query_d1(sql, db_name=HISTORY.D1_DATABASE, silent=True)
+    last_mid = glom(resp, "result.0.results.0.mid", default=1)
+    concurrency = 200
+
+    # The statement is identical for every row, so build it once.
+    columns = ["mid", "day", "time", "user", "content", "mtype", "uid", "handle", "filename", "mime"]
+    keys = ", ".join(columns)
+    values = ", ".join("?" for _ in columns)
+    updates = ", ".join(f"{k} = EXCLUDED.{k}" for k in columns if k != "mid")
+    upsert_sql = f'INSERT INTO "{table_name}" ({keys}) VALUES ({values}) ON CONFLICT (mid) DO UPDATE SET {updates};'
+
+    async def flush(batch: list) -> None:
+        """Run the queued upserts concurrently and log a summary."""
+        res = await asyncio.gather(*batch, return_exceptions=True)
+        # With return_exceptions=True a failed request comes back as an exception object;
+        # only dict responses can take part in the summary.
+        ok = [r for r in res if isinstance(r, dict)]
+        num_success = sum(bool(r.get("success")) for r in ok)
+        sync_ids = [rid for r in ok if (rid := glom(r, "result.0.meta.last_row_id", default=None)) is not None]
+        if sync_ids:
+            logger.success(f"Synced {num_success} messages to D1, {min(sync_ids)} -> {max(sync_ids)}")
+
+    tasks = []
+    mids = set()  # to avoid duplicate
+    async for message in client.get_chat_history(chat.id, offset_id=last_mid, reverse=True):  # type: ignore
+        if not isinstance(message, Message) or message.empty:
+            continue
+        info = parse_msg(message, silent=True)
+        if info["mid"] in mids:
+            continue
+        mids.add(info["mid"])
+        records = {
+            "mid": info["mid"],
+            "day": info["time"].split(" ")[0],
+            "time": info["time"].split(" ")[-1],
+            "user": info["full_name"],
+            "content": info["text"],
+            "mtype": info["mtype"],
+            "uid": info["uid"],
+            "handle": info["handle"],
+            "filename": info["file_name"],
+            "mime": info["mime_type"],
+        }
+        logger.trace(f"Syncing message {chat_name}: {info['mid']}")
+        params = [records[c] for c in columns]
+        tasks.append(query_d1(upsert_sql, db_name=HISTORY.D1_DATABASE, params=params, silent=True))
+        if len(tasks) >= concurrency:
+            await flush(tasks)
+            tasks = []
+            mids = set()
+    if tasks:
+        await flush(tasks)
+
+
+async def sync_export_history_to_d1(client: Client, path: str | Path | None = None) -> None:
+    """Import an exported chat history JSON file into the chat's D1 table.
+
+    The file is expected to hold ``id``, ``name``, ``type`` and a ``messages``
+    list (presumably a Telegram Desktop export; verify against the producer).
+    Messages whose id is already stored are skipped; the rest are upserted in
+    batches of concurrent requests.
+
+    Args:
+        client: Pyrogram client used to resolve sender usernames.
+        path: Path of the export; defaults to ``DOWNLOAD_DIR/result.json``.
+            A missing file makes the function return silently.
+    """
+    if not HISTORY.D1_ENABLE:
+        return
+    path = Path(DOWNLOAD_DIR) / "result.json" if path is None else Path(path)
+    if not path.is_file():
+        return
+
+    def parse_text(texts: list | str) -> str:
+        """Join a text field given as a string or as a list of strings / dicts with a ``text`` key."""
+        return "".join(x if isinstance(x, str) else x.get("text", "") for x in texts)
+
+    async def flush(batch: list) -> None:
+        """Run the queued upserts concurrently and log a summary."""
+        res = await asyncio.gather(*batch, return_exceptions=True)
+        # With return_exceptions=True a failed request comes back as an exception object;
+        # only dict responses can take part in the summary.
+        ok = [r for r in res if isinstance(r, dict)]
+        num_success = sum(bool(r.get("success")) for r in ok)
+        sync_ids = [rid for r in ok if (rid := glom(r, "result.0.meta.last_row_id", default=None)) is not None]
+        if sync_ids:
+            logger.success(f"Synced {num_success} messages to D1, {min(sync_ids)} -> {max(sync_ids)}")
+
+    # JSON interchange files are UTF-8; do not depend on the platform's default encoding.
+    with path.open("r", encoding="utf-8") as f:  # noqa: ASYNC230
+        data = json.load(f)
+    mtypes = {"text": "text", "animation": "animation", "audio_file": "audio", "sticker": "sticker", "voice_message": "voice", "video_file": "video"}
+
+    cid = str(data["id"]).removeprefix("-100")
+    chat_name = data["name"]
+    # The name is placed inside a double-quoted SQL identifier, so a `"` must not survive.
+    table_name = f"{cid}-{chat_name}".replace(" ", "").replace('"', "")
+    await create_d1_table(table_name, DB_COLUMNS, HISTORY.D1_DATABASE, silent=True)
+    # find all message_ids
+    sql = f'SELECT mid FROM "{table_name}";'
+    resp = await query_d1(sql, db_name=HISTORY.D1_DATABASE, silent=True)
+    saved_ids = set(glom(resp, "result.0.results.*.mid", default=[]))  # set: O(1) membership per message
+    concurrency = 200
+
+    # The statement is identical for every row, so build it once.
+    columns = ["mid", "day", "time", "user", "content", "mtype", "uid", "handle", "filename", "mime"]
+    keys = ", ".join(columns)
+    values = ", ".join("?" for _ in columns)
+    updates = ", ".join(f"{k} = EXCLUDED.{k}" for k in columns if k != "mid")
+    upsert_sql = f'INSERT INTO "{table_name}" ({keys}) VALUES ({values}) ON CONFLICT (mid) DO UPDATE SET {updates};'
+
+    tasks = []
+    for info in data["messages"]:  # type: ignore
+        if info["id"] in saved_ids:
+            continue
+        if info.get("type") != "message":
+            continue
+        timestamp = info.get("date_unixtime", "0")
+        if timestamp == "0":
+            continue
+        dt = datetime.fromtimestamp(int(timestamp), tz=ZoneInfo(TZ))
+        sender = str(info.get("from_id") or "")
+        try:
+            uid = int(sender.removeprefix("user").removeprefix("channel"))
+        except ValueError:
+            uid = 1  # sender id missing or in an unexpected form: reuse the placeholder used for hidden users
+        user = info.get("from")
+        if user == data["name"] and data["type"] in ["public_channel", "private_channel"]:  # user is not shown
+            user = ""
+            uid = 1
+        chat = await chat_info(client, uid)
+
+        media_type = info.get("media_type", "text")
+        records = {
+            "mid": info["id"],
+            "day": dt.strftime("%Y-%m-%d"),
+            "time": dt.strftime("%H:%M:%S"),
+            "user": user,
+            "content": parse_text(info.get("text", "")),
+            "mtype": mtypes.get(media_type, media_type),  # unmapped media types are stored under their raw name
+            "uid": uid,
+            "handle": chat.username or "",
+            "filename": info.get("file_name", ""),
+            "mime": info.get("mime_type", ""),
+        }
+        logger.trace(f"Syncing message {chat_name}: {info['id']}")
+        params = [records[c] for c in columns]
+        tasks.append(query_d1(upsert_sql, db_name=HISTORY.D1_DATABASE, params=params, silent=True))
+        if len(tasks) >= concurrency:
+            await flush(tasks)
+            tasks = []
+
+    if tasks:
+        await flush(tasks)
+
+
+async def chat_info(client: Client, uid: int) -> Chat:
+    """Return the chat for ``uid``, caching the result for one hour.
+
+    ``uid == 1`` is a placeholder id and yields ``Chat(id=1)`` without a
+    lookup. When a positive id is rejected with ``PeerIdInvalid`` the lookup
+    is retried once with the ``-100`` prefix. Any other failure also yields
+    ``Chat(id=1)``.
+    """
+    key = f"chat-info-{uid}"
+    # Single lookup: a second ``cache.get`` could miss if the entry expires in between.
+    if cached := cache.get(key):
+        return cached
+    if uid == 1:
+        return Chat(id=1)
+    try:
+        chat = await client.get_chat(int(uid))
+    except PeerIdInvalid:
+        if int(uid) > 0:
+            chat = await chat_info(client, int(f"-100{uid}"))
+        else:
+            # Prefixing an already negative id would build "-100-..." and make int() raise.
+            chat = Chat(id=1)
+    except Exception:
+        chat = Chat(id=1)
+    cache.set(key, chat, ttl=3600)  # cache for 1 hour
+    return chat
src/config.py
@@ -142,6 +142,7 @@ class PROXY:  # format: socks5://127.0.0.1:7890
     IMG = os.getenv("IMG_PROXY", "")  # https://caravaggio.ramielcreations.com/docs/install
     XHS = os.getenv("XHS_PROXY", None)  # Banned VPS IP, need residential proxy
     GPT = os.getenv("GPT_PROXY", None)
+    D1 = os.getenv("D1_PROXY", None)
     WECHAT = os.getenv("WECHAT_PROXY", None)
     DOUYIN = os.getenv("DOUYIN_PROXY", None)
     TIKTOK = os.getenv("TIKTOK_PROXY", None)
@@ -193,6 +194,17 @@ class DB:
     PASTBIN_MAX_BYTES = int(os.getenv("PASTBIN_MAX_BYTES", "10485760"))  # 10 MB
 
 
+class HISTORY:
+    # Settings for saving chat messages to a D1 database, all read from environment variables.
+    # Saving is enabled unless HISTORY_D1_ENABLE is set to something other than 1/y/yes/t/true/on.
+    D1_ENABLE = os.getenv("HISTORY_D1_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+    # Name of the D1 database that holds the history tables.
+    D1_DATABASE = os.getenv("HISTORY_D1_DATABASE", "bennybot-history")
+    INCLUDE_CHATS = os.getenv("HISTORY_INCLUDE_CHATS", "")  # "all" or comma separated chat ids to include  (without `-100` prefix)
+    IGNORE_CHATS = os.getenv("HISTORY_IGNORE_CHATS", "")  # comma separated chat ids to ignore  (without `-100` prefix)
+    INCLUDE_PRIVATES = os.getenv("HISTORY_INCLUDE_PRIVATES", "")  # "all" or comma separated private chat ids to include (without `-100` prefix)
+    INCLUDE_BOTS = os.getenv("HISTORY_INCLUDE_BOTS", "")  # "all" or comma separated bot chat ids to include  (without `-100` prefix)
+    INCLUDE_GROUPS = os.getenv("HISTORY_INCLUDE_GROUPS", "")  # "all" or comma separated group chat ids to include  (without `-100` prefix)
+    INCLUDE_CHANNELS = os.getenv("HISTORY_INCLUDE_CHANNELS", "")  # "all" or comma separated channel ids to include  (without `-100` prefix)
+
+
 class ASR:
     # use different engines based on duration
     # support ali, tencent, gemini engines
src/database.py
@@ -21,11 +21,11 @@ import brotli
 import puremagic
 from aioboto3 import Session
 from botocore.exceptions import ClientError
-from glom import glom
+from glom import flatten, glom
 from httpx import AsyncClient, AsyncHTTPTransport
 from loguru import logger
 
-from config import DB, DOWNLOAD_DIR, cache
+from config import DB, DOWNLOAD_DIR, PROXY, cache
 from networking import hx_req
 from utils import bare_url, nowdt, stringfy
 
@@ -324,7 +324,7 @@ async def del_cf_r2(key: str):
 
 
 @cache.memoize(ttl=0)
-async def create_cf_d1_database(name: str = "bennybot", primary_location_hint: str = "") -> str:
+async def create_d1_database(name: str = "bennybot", primary_location_hint: str = "", *, silent: bool = False) -> str:
     """Create D1 database and return DatabaseID."""
     if not all([DB.CF_D1_ENABLED, DB.CF_ACCOUNT_ID, DB.CF_API_TOKEN]):
         return ""
@@ -332,41 +332,60 @@ async def create_cf_d1_database(name: str = "bennybot", primary_location_hint: s
     headers = {"authorization": f"Bearer {DB.CF_API_TOKEN}", "content-type": "application/json"}
     payload = {"name": name}
     # check if database exists
-    resp = await hx_req(api, method="GET", params=payload, headers=headers, check_kv={"success": True}, max_retry=0, silent=True)
+    resp = await hx_req(api, method="GET", params=payload, headers=headers, check_kv={"success": True}, proxy=PROXY.D1, max_retry=0, silent=silent)
     if database_id := glom(resp, "result.0.uuid", default=""):
         return database_id
     if primary_location_hint:
         payload |= {"primary_location_hint": primary_location_hint}
-    resp = await hx_req(api, method="POST", post_json=payload, headers=headers, check_kv={"success": True}, max_retry=0, silent=True)
+    resp = await hx_req(api, method="POST", post_json=payload, headers=headers, check_kv={"success": True}, proxy=PROXY.D1, max_retry=0, silent=silent)
     return glom(resp, "result.uuid", default="")
 
 
 @cache.memoize(ttl=0)
-async def create_cf_d1_table(table_name: str | float, columns: str, db_name: str = "bennybot") -> None:
+async def create_d1_table(table_name: str | float, columns: str, db_name: str = "bennybot", *, silent: bool = False) -> None:
-    """Create D1 database and return DatabaseID."""
+    """Create table ``table_name`` in D1 database ``db_name`` unless it already exists.
+
+    Does nothing when D1 is not configured or the database cannot be resolved.
+    ``columns`` is the column definition list placed inside ``CREATE TABLE (...)``.
+    ``silent`` suppresses the success log and is passed on to the helpers called here.
+    """
     if not all([DB.CF_D1_ENABLED, DB.CF_ACCOUNT_ID, DB.CF_API_TOKEN]):
         return
-    database_id = await create_cf_d1_database(db_name)
+    database_id = await create_d1_database(db_name, silent=silent)
     if not database_id:
         return
+    tables = await list_d1_tables(db_name, silent=silent)
+    # table_name may be numeric (e.g. a year) while the listed names are strings; compare as text.
+    if str(table_name) in tables:
+        return
+
     sql = f'CREATE TABLE IF NOT EXISTS "{table_name}" ({columns});'
-    await query_cf_d1(sql, database_id)
-    logger.success(f"Create Table {table_name} in D1 database {db_name}")
+    await query_d1(sql, database_id, silent=silent)
+    if not silent:
+        logger.success(f"Create Table {table_name} in D1 database {db_name}")
+
+
+@cache.memoize(ttl=600)
+async def list_d1_tables(db_name: str = "bennybot", *, silent: bool = False) -> list[str]:
+    """Return the names of the tables found in D1 database ``db_name``.
+
+    An empty list is returned when D1 is not configured or the database
+    cannot be resolved. ``silent`` is passed on to the helpers called here.
+    """
+    configured = DB.CF_D1_ENABLED and DB.CF_ACCOUNT_ID and DB.CF_API_TOKEN
+    if not configured:
+        return []
+    database_id = await create_d1_database(db_name, silent=silent)
+    if not database_id:
+        return []
+    response = await query_d1("SELECT name FROM sqlite_master WHERE type='table';", database_id, silent=silent)
+    # One list of names per result set; flatten them into a single list.
+    names_per_result = glom(response, "result.*.results.*.name", default=[])
+    return flatten(names_per_result)
 
 
-async def query_cf_d1(sql: str, db_id: str | None = None, db_name: str = "bennybot", params: list[str] | None = None) -> dict:
+async def query_d1(sql: str, db_id: str | None = None, db_name: str = "bennybot", params: list | None = None, *, silent: bool = False) -> dict:
-    """Query D1."""
+    """Run one SQL statement against a D1 database and return the API response.
+
+    Args:
+        sql: Statement to execute; use ``?`` placeholders together with ``params``.
+        db_id: Database id. When ``None`` it is resolved from ``db_name``.
+        db_name: Database name, used only when ``db_id`` is ``None``.
+        params: Values bound to the placeholders (strings or numbers).
+        silent: Skip the trace log and pass ``silent`` on to the request helper.
+
+    Returns:
+        The response of the request helper, or ``{}`` when D1 is not
+        configured or no database id is available.
+    """
     if not all([DB.CF_D1_ENABLED, DB.CF_ACCOUNT_ID, DB.CF_API_TOKEN]):
         return {}
     if db_id is None:
-        db_id = await create_cf_d1_database(db_name)
+        db_id = await create_d1_database(db_name, silent=silent)
+    if not db_id:  # lookup/creation returned "": do not call an endpoint with an empty database id
+        return {}
     api = f"https://api.cloudflare.com/client/v4/accounts/{DB.CF_ACCOUNT_ID}/d1/database/{db_id}/query"
     headers = {"authorization": f"Bearer {DB.CF_API_TOKEN}", "content-type": "application/json"}
     payload = {"sql": sql}
     if params is not None:
         payload |= {"params": params}
-    logger.trace(f"Query CF-D1: {payload}")
-    return await hx_req(api, method="POST", post_json=payload, headers=headers, check_kv={"success": True}, max_retry=0, silent=True)
+    if not silent:
+        logger.trace(f"Query CF-D1: {payload}")
+    return await hx_req(api, "POST", post_json=payload, headers=headers, check_kv={"success": True}, proxy=PROXY.D1, max_retry=0, silent=silent)
 
 
 async def list_alist() -> list[dict]:
@@ -535,9 +554,9 @@ if __name__ == "__main__":
     asyncio.run(list_cf_r2("RSS/"))
     # asyncio.run(get_cf_r2("Danmu/2025-05-26"))
     # columns = "id INTEGER PRIMARY KEY, time TEXT, uid INTEGER, user TEXT, text TEXT, sc_amt REAL NULL, sc_ccy TEXT NULL"
-    # asyncio.run(create_cf_d1_table("2025", columns))
+    # asyncio.run(create_d1_table("2025", columns))
 
     # sql = 'INSERT INTO "2025" (id, time, uid, user, text, sc_amt, sc_ccy) VALUES (?, ?, ?, ?, ?, ?, ?);'
     # params = [123, "2025-01-01", 456, "username", "hello", 15.5, "USD"]
-    # resp = asyncio.run(query_cf_d1(sql, params=params))
+    # resp = asyncio.run(query_d1(sql, params=params))
     # print(resp)
src/handler.py
@@ -11,6 +11,7 @@ from bridge.ocr import send_to_ocr_bridge
 from config import ENABLE, PREFIX, PROXY
 from danmu.entrypoint import query_danmu
 from database import del_db
+from history.sync import save_history_to_d1
 from llm.gpt import gpt_response
 from llm.summary import ai_summary
 from messages.parser import parse_msg
@@ -47,6 +48,7 @@ async def handle_utilities(
     asr: bool = True,
     audio: bool = True,
     danmu: bool = True,
+    save_d1: bool = True,
     google: bool = True,
     ocr: bool = True,
     price: bool = True,
@@ -72,6 +74,7 @@ async def handle_utilities(
         asr (bool, optional): Enable ASR. Defaults to True.
         audio (bool, optional): Enable Video -> Audio. Defaults to True.
         danmu (bool, optional): Enable Query Danmu database. Defaults to True.
+        save_d1 (bool, optional): Enable Save message to D1. Defaults to True.
         google (bool, optional): Enable Google Search. Defaults to True.
         ytb (bool, optional): Enable YouTube Search. Defaults to True.
         subtitle (bool, optional): Enable YouTube subtitle. Defaults to True.
@@ -83,6 +86,12 @@ async def handle_utilities(
         show_progress (bool, optional): Show a progress message on Telegram. Defaults to True.
         detail_progress (bool, optional): Show detailed progress (Only if show_proress is set to True). Defaults to False.
     """
+    if kwargs.get("only_d1"):
+        await save_history_to_d1(client, message)
+        return
+    if kwargs.get("disabled"):
+        return
+
     kwargs |= {"target_chat": target_chat, "reply_msg_id": reply_msg_id, "show_progress": show_progress, "detail_progress": detail_progress}
     info = parse_msg(message)
     kwargs |= params_from_msg_text(info["text"])  # merge the parameters from the message text
@@ -110,6 +119,8 @@ async def handle_utilities(
         await query_danmu(client, message, **kwargs)  # /danmu
     if raw_img:
         await convert_raw_img_file(client, message, **kwargs)
+    if save_d1:
+        await save_history_to_d1(client, message)
 
 
 async def handle_social_media(
@@ -150,6 +161,8 @@ async def handle_social_media(
         show_progress (bool, optional): Show a progress message on Telegram. Defaults to True.
         detail_progress (bool, optional): Show detailed progress (Only if show_proress is set to True). Defaults to False.
     """
+    if kwargs.get("only_d1") or kwargs.get("disabled"):
+        return None
     kwargs |= {"target_chat": target_chat, "reply_msg_id": reply_msg_id, "show_progress": show_progress, "detail_progress": detail_progress}
     if not ENABLE.SEND_AS_REPLY:
         kwargs["reply_msg_id"] = -1
src/main.py
@@ -54,26 +54,18 @@ async def main():
     @app.on_message(filters.group)
     async def groups(client: Client, message: Message):
         permission = await check_permission(client, message)
-        if permission["disabled"]:
-            return
-        parse_msg(message)
         await handle_utilities(client, message, **permission)
         await handle_social_media(client, message, **permission)
 
     @app.on_message(filters.channel)
     async def channels(client: Client, message: Message):
         permission = await check_permission(client, message)
-        if permission["disabled"]:
-            return
-        parse_msg(message)
         await handle_utilities(client, message, **permission)
         await handle_social_media(client, message, **permission)
 
     @app.on_message(filters.bot)
     async def bots(client: Client, message: Message):
         permission = await check_permission(client, message)
-        if permission["disabled"]:
-            return
         parse_msg(message, verbose=True)
         await forward_social_media_results(client, message)
         await forward_ocr_results(client, message)
@@ -90,8 +82,6 @@ async def main():
             await bots(client, message)  # handle bot messages
             return
         permission = await check_permission(client, message)
-        if permission["disabled"]:
-            return
         parse_msg(message, verbose=True)
         await handle_utilities(client, message, **permission)
         await handle_social_media(client, message, **permission)
src/permission.py
@@ -7,7 +7,7 @@ from loguru import logger
 from pyrogram.client import Client
 from pyrogram.types import Message
 
-from config import ENABLE, TID, cache
+from config import ENABLE, HISTORY, TID, cache
 from utils import i_am_bot, to_int, true
 
 
@@ -23,6 +23,14 @@ async def check_permission(client: Client, message: Message) -> dict:
     # check permission per service
     permission |= check_service(cid=message.chat.id, ctype=ctype)
 
+    """Saving messages to D1 is handled differently from the checks above.
+    Sometimes we only need to save the message to D1 while all other tools stay disabled.
+    """
+    # check if we should save this message to D1
+    save_d1 = check_save_d1(ctype=ctype, cid=message.chat.id)
+    if permission["disabled"] and save_d1:  # only save msg to D1, disable others
+        permission["only_d1"] = True
+    permission["save_d1"] = save_d1
     return permission
 
 
@@ -186,5 +194,35 @@ def check_service(cid: int | str, ctype: str) -> dict:
     return permission
 
 
+def check_save_d1(ctype: str, cid: int | str) -> bool:
+    """Decide whether messages from chat ``cid`` should be saved to D1.
+
+    The first matching rule wins:
+
+    1. env ``HISTORY_IGNORE_<cid>`` is true -> ``False``
+    2. env ``HISTORY_INCLUDE_<cid>`` is true -> ``True``
+    3. ``cid`` listed in ``HISTORY.IGNORE_CHATS`` -> ``False``
+    4. ``cid`` listed in ``HISTORY.INCLUDE_CHATS`` -> ``True``
+    5. the per-type setting for ``ctype`` is ``"all"`` or lists ``cid`` -> ``True``
+
+    Anything else, including an unknown ``ctype``, gives ``False``.
+    """
+    # ruff: noqa: SIM103
+    cid = slim_cid(cid)
+
+    def listed(raw: str) -> bool:
+        """Return True if ``cid`` is one of the comma separated ids in ``raw``."""
+        # Normalise every configured id the same way as cid, so "1, 2" or "-1002" still match.
+        return cid in {slim_cid(item) for item in str(raw).split(",")}
+
+    if true(os.getenv(f"HISTORY_IGNORE_{cid}")):
+        return False
+    if true(os.getenv(f"HISTORY_INCLUDE_{cid}")):
+        return True
+    if listed(HISTORY.IGNORE_CHATS):
+        return False
+    if listed(HISTORY.INCLUDE_CHATS):
+        return True
+    per_type = {
+        "PRIVATE": HISTORY.INCLUDE_PRIVATES,
+        "BOT": HISTORY.INCLUDE_BOTS,
+        "GROUP": HISTORY.INCLUDE_GROUPS,
+        "SUPERGROUP": HISTORY.INCLUDE_GROUPS,
+        "CHANNEL": HISTORY.INCLUDE_CHANNELS,
+    }
+    if ctype not in per_type:
+        return False
+    setting = str(per_type[ctype])
+    return setting.strip().lower() == "all" or listed(setting)
+
+
 def slim_cid(cid: int | str) -> str:
     return str(cid).strip().removeprefix("-100")
src/utils.py
@@ -281,11 +281,14 @@ async def myself(client: Client) -> User:
 
 async def i_am_bot(client: Client) -> bool:
-    """Check if this clinet is a bot or not."""
+    """Check if this client is a bot or not.
+
+    The answer is cached under the key ``"i_am_bot"``. If looking up the
+    account fails, the error is logged and ``False`` is returned without caching.
+    """
+    # Compare with None so that a cached False (a user account) is served from the cache too;
+    # a truthiness test would repeat the lookup on every call for non-bot clients.
+    # Assumes cache.get returns None on a miss.
+    if (cached := cache.get("i_am_bot")) is not None:
+        return cached
     try:
         me = await myself(client)
     except Exception as e:
         logger.error(e)
         return False
+    cache.set("i_am_bot", me.is_bot, ttl=0)
     return me.is_bot