main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import json
  4import os
  5from datetime import datetime, timedelta
  6from pathlib import Path
  7from typing import Literal
  8from zoneinfo import ZoneInfo
  9
 10from glom import Coalesce, flatten, glom
 11from loguru import logger
 12from pyrogram.client import Client
 13from pyrogram.types import Message
 14
 15from config import DOWNLOAD_DIR, HISTORY, TZ, cache, cutter
 16from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
 17from history.utils import CHAT_COLUMNS, MSG_COLUMNS, MSG_INDEXES, TURSO_KWARGS, USER_COLUMNS, USER_INDEXES, can_delete_history, check_save_history, fine_grained_check, get_chat
 18from messages.parser import parse_chat, parse_msg
 19from utils import i_am_bot, nowdt, slim_cid, to_int, true
 20
 21
 22async def sync_history_to_turso(client: Client, message: Message) -> None:
 23    """Sync received messages to Turso database.
 24
 25    1. save the user info to table `userinfo`
 26    2. save the chat info to table `chatinfo`
 27    3. save the message to table `{cid}-{ctitle}`
 28    """
 29    if not HISTORY.TURSO_ENABLE:
 30        return
 31    if isinstance(message, list):  # this is deleted messages
 32        await delete_messages(message)
 33        return
 34    info = parse_msg(message, silent=True, use_cache=False)
 35    if not check_save_history(info["ctype"], info["cid"]) or not fine_grained_check(info) or message.service:
 36        return
 37
 38    await save_userinfo_to_turso(client, info)
 39    chatinfo = await save_chatinfo_to_turso(client, info)
 40    records = {
 41        "mid": info["mid"],
 42        "mtype": info["mtype"],
 43        "time": info["time"],
 44        "fullname": info["full_name"],
 45        "content": message.content,  # text or edited text
 46        "filename": info["file_name"],
 47        "urls": "\n\n".join(info["entity_urls"]),
 48        "reply": message.reply_to_message_id,
 49        "mime": info["mime_type"],
 50        "user": info["full_name"].replace(" ", ""),
 51        "handle": info["handle"],
 52        "uid": info["uid"],
 53        "gid": info["media_group_id"],
 54        "segmented": " ".join(cutter.cutword(message.content)),
 55    }
 56    await turso_exec([insert_statement(chatinfo["tablename"], records, update_on_conflict="mid")], silent=True, retry=2, **TURSO_KWARGS)
 57
 58
 59async def delete_messages(messages: Message | list[Message]) -> None:
 60    """Delete messages from Turso database."""
 61    if not isinstance(messages, list):
 62        messages = [messages]
 63    for message in messages:
 64        cid = glom(message, "chat.id", default=0) or 0
 65        mid = glom(message, "id", default=0) or 0
 66        ctype = glom(message, "chat.type.name", default="") or ""
 67        if not check_save_history(ctype, cid) or message.service:
 68            return
 69        chatinfo = await get_turso_chatinfo(cid)
 70        if not chatinfo:
 71            continue
 72        tablename = chatinfo["tablename"]
 73        resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT * FROM '{tablename}' WHERE mid={mid};"}}], silent=True, retry=2, **TURSO_KWARGS)
 74        parsed = turso_parse_resp(resp)
 75        if not parsed:
 76            continue
 77        uid = parsed[0]["uid"]
 78        if can_delete_history(cid, uid):
 79            logger.warning(f"Delete message Chat={cid}, ID={mid}: {parsed[0]}")
 80            sql = f"DELETE FROM '{tablename}' WHERE mid={mid};"
 81            await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
 82
 83
 84async def backup_chat_history_to_turso(
 85    client: Client,
 86    chat_id: str | int,
 87    hours: float = HISTORY.BACKUP_CHATS_HOURS,
 88    *,
 89    start_from: Literal["latest", "oldest"] = "latest",
 90    max_sync: float = float("inf"),
 91) -> None:
 92    """Backup chat history to Turso database.
 93
 94    If start_from is "oldest", find the minimum message id of this chat, then use this mid as `offset_id` to retrieve messages.
 95    """
 96    if not HISTORY.TURSO_ENABLE:
 97        return
 98    if await i_am_bot(client):
 99        return
100    chat = await get_chat(client, to_int(chat_id))
101    if chat.id == 0:  # chat is not accessible
102        return
103    chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat))
104    if true(os.getenv(f"HISTORY_IGNORE_{chatinfo['cid']}")):
105        return
106    table_name = chatinfo["tablename"]
107    now = nowdt(TZ)
108    begin_dt = now - timedelta(hours=hours)
109    begin_time = begin_dt.strftime("%Y-%m-%d %H:%M:%S")
110    if start_from == "oldest":
111        sql = f'SELECT mid FROM "{table_name}" ORDER BY mid ASC LIMIT 1'
112        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
113        offset_id = int(glom(resp, "results.0.response.result.rows.0.0.value", default=1))
114        saved_mids = {offset_id}
115    else:
116        # find message ids in this time range
117        end_time = now.strftime("%Y-%m-%d %H:%M:%S")
118        sql = f'SELECT mid FROM "{table_name}" WHERE time >= "{begin_time}" AND time <= "{end_time}";'
119        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
120        saved_mids = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
121        saved_mids = {int(x) for x in saved_mids}
122        offset_id = 0  # retrieve from latest message
123    logger.info(f"Found {len(saved_mids)} messages of {table_name} in Turso. Time >= {begin_time}, Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}")
124    concurrency = 1000
125    num_sync = 0
126    statements = []
127    real_cid = chatinfo["chandle"] or (int(chatinfo["cid"]) if chatinfo["ctype"] in ["BOT", "PRIVATE"] else int(f"-100{chatinfo['cid']}"))
128    async for message in client.get_chat_history(real_cid, max_id=offset_id):  # type: ignore
129        if not isinstance(message, Message) or message.empty or message.service or message.id in saved_mids:
130            continue
131        info = parse_msg(message, silent=True, use_cache=False)
132        if info["time"] < begin_time:
133            break
134        if num_sync >= max_sync:
135            break
136        if not fine_grained_check(info):
137            continue
138        num_sync += 1
139        records = {
140            "mid": info["mid"],
141            "mtype": info["mtype"],
142            "time": info["time"],
143            "fullname": info["full_name"],
144            "content": message.content,
145            "filename": info["file_name"],
146            "urls": "\n\n".join(info["entity_urls"]),
147            "reply": message.reply_to_message_id,
148            "mime": info["mime_type"],
149            "user": info["full_name"].replace(" ", ""),
150            "handle": info["handle"],
151            "uid": info["uid"],
152            "gid": info["media_group_id"],
153            "segmented": " ".join(cutter.cutword(info["text"])),
154        }
155        logger.trace(f"Syncing {table_name}: {info['mid']} - {info['time']}")
156        statements.append(insert_statement(table_name, records, update_on_conflict="mid"))
157        if len(statements) == concurrency:
158            resp = await turso_exec(statements, silent=True, retry=2, **TURSO_KWARGS)
159            num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"]) - 1
160            if sync_ids := glom(resp, "results.**.last_insert_rowid", default=[0]):
161                logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}. {info['time']}")
162            statements = []
163
164    if statements:
165        resp = await turso_exec(statements, silent=True, retry=2, **TURSO_KWARGS)
166        num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"]) - 1
167        if sync_ids := glom(resp, "results.**.last_insert_rowid", default=[0]):
168            logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}. {info['time']}")  # type: ignore
169
170
async def upload_exported_history_to_turso(client: Client, path: str | Path | None = None) -> None:
    """Import an exported chat history JSON file into the chat's Turso table.

    The file must contain the top-level keys `id`, `name`, `type` and
    `messages`. Entries already present in the table (matched on `mid`), entries
    whose `type` is not "message", entries with `date_unixtime == "0"` and
    entries rejected by `fine_grained_check` are skipped. The rest are upserted
    in batches.

    Args:
        client: Client used to resolve the chat when it has no `chatinfo` row yet.
        path: Location of the export; defaults to `DOWNLOAD_DIR/result.json`.
            Nothing is done when the file does not exist.
    """
    if not HISTORY.TURSO_ENABLE:
        return
    path = Path(DOWNLOAD_DIR) / "result.json" if path is None else Path(path)
    if not path.is_file():
        return

    def parse_text(texts: list | str) -> str:
        """Flatten the `text` field (a plain string or a list of str/dict parts)."""
        if isinstance(texts, str):
            return texts
        return "".join(x if isinstance(x, str) else x.get("text", "") for x in texts)

    def parse_urls(entities: list) -> list[str]:
        """Collect the targets of `link` / `text_link` entities (`href` first, then `text`)."""
        return [glom(x, Coalesce("href", "text")) for x in entities if x["type"] in {"link", "text_link"}]

    async def flush(batch: list, suffix: str = "") -> None:
        """Send one batch of upsert statements and log the outcome."""
        resp = await turso_exec(batch, silent=True, retry=2, **TURSO_KWARGS)
        # NOTE(review): the "- 1" presumably discounts one non-insert "ok" entry in the response — confirm.
        num_success = sum(1 for x in glom(resp, "results.*.type", default=[]) if x == "ok") - 1
        if sync_ids := glom(resp, "results.**.last_insert_rowid", default=[0]):
            logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}{suffix}")

    # Read explicitly as UTF-8 so parsing does not depend on the platform's default encoding.
    with path.open("r", encoding="utf-8") as f:  # noqa: ASYNC230
        data = json.load(f)
    logger.info(f"Found {len(data['messages'])} messages in json file")

    # The exported history has no media_group_id, so one is assigned first:
    # if two consecutive messages share `from_id` and `date_unixtime` and the
    # later one has a `photo` or `thumbnail` key, both are treated as one media
    # group whose id is the earlier message's group id (or its own id).
    prev: dict = {}
    for msg in data["messages"]:
        same_origin = all(msg.get(key) == prev.get(key) for key in ("from_id", "date_unixtime"))
        has_media = any(key in msg for key in ("photo", "thumbnail"))
        # `prev` is empty for the first message: never reach back to the last list element.
        if prev and same_origin and has_media:
            group_id = glom(prev, Coalesce("media_group_id", "id"))
            prev["media_group_id"] = group_id
            msg["media_group_id"] = group_id
        prev = msg

    # Map export media types onto the names stored in the `mtype` column.
    mtypes = {
        "audio_file": "audio",
        "voice_message": "voice",
        "video_message": "video",
        "video_file": "video",
    }
    chat_id = data["id"]
    chatinfo = await get_turso_chatinfo(chat_id)
    if not chatinfo:  # this chat is never synced
        chat = await get_chat(client, int(chat_id))
        chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat))
    table_name = chatinfo["tablename"]
    # Quote the table name as an identifier and escape embedded double quotes.
    quoted_table = '"' + str(table_name).replace('"', '""') + '"'
    # find all message_ids
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT mid FROM {quoted_table};"}}], silent=True, **TURSO_KWARGS)
    saved_ids = {int(x) for x in flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))}

    concurrency = 5000  # number of statements sent per request
    time_fmt = "%Y-%m-%d %H:%M:%S"
    statements = []
    for info in data["messages"]:
        if info["id"] in saved_ids:
            continue
        if info["type"] != "message":
            continue
        if info["date_unixtime"] == "0":
            continue
        if "media_type" not in info:  # guess mtype
            if "photo" in info:
                info["media_type"] = "photo"
            if "video/" in info.get("mime_type", ""):
                info["media_type"] = "video_file"
        mtype = info.get("media_type", "text")
        content = parse_text(info.get("text", []))
        url_list = parse_urls(info.get("text_entities", []))
        # fine-grained check requires key: ["cid", "mtype", "text", "entity_urls"].
        # `entity_urls` is passed as a list of URLs, matching what parse_msg provides
        # on the live-sync path (where it is joined only when stored).
        if not fine_grained_check({"cid": data["id"], "mtype": mtype, "text": content, "entity_urls": url_list}):
            continue
        dt = datetime.fromtimestamp(int(info["date_unixtime"]), tz=ZoneInfo(TZ))
        sender_id = info["from_id"].removeprefix("user").removeprefix("channel")
        uid = int(sender_id)
        user = info["from"] or sender_id
        if user == data["name"] and data["type"] in ["public_channel", "private_channel"]:  # user is not shown
            user = ""
            uid = 1

        records = {
            "mid": info["id"],
            "mtype": mtypes.get(mtype, mtype),
            "time": dt.strftime(time_fmt),
            "fullname": user,
            "content": content,
            "filename": info.get("file_name", ""),
            "urls": "\n\n".join(url_list),
            "reply": info.get("reply_to_message_id"),
            "mime": info.get("mime_type", ""),
            "user": user.replace(" ", ""),
            "handle": "",  # TODO: parse handle
            "uid": uid,
            "gid": info.get("media_group_id", 0),
            "segmented": " ".join(cutter.cutword(content)),
        }
        statements.append(insert_statement(table_name, records, update_on_conflict="mid"))
        if len(statements) >= concurrency:
            await flush(statements, f". {dt.strftime(time_fmt)}")
            statements = []
    if statements:
        await flush(statements)
277
278
async def get_turso_chatinfo(cid: str | int) -> dict:
    """Get chat info from table `chatinfo`.

    The `chatinfo` table is created first if needed. The row is matched either
    on `cid` (compared against `slim_cid(cid)`) or on `chandle` (compared
    against `cid` as given).

    Args:
        cid: Chat id or chat handle.

    Returns:
        The first matching row as a dict (all columns of `chatinfo`), or an
        empty dict when there is no match.
    """
    # create table
    await turso_create_table("chatinfo", CHAT_COLUMNS, silent=True, **TURSO_KWARGS)
    # `cid` may be a free-form handle string: double any single quote so the
    # value cannot terminate the SQL string literal early.
    slim_literal = str(slim_cid(cid)).replace("'", "''")
    handle_literal = str(cid).replace("'", "''")
    resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": f"SELECT * FROM chatinfo WHERE cid='{slim_literal}' OR chandle='{handle_literal}';"},
            }
        ],
        silent=True,
        retry=2,
        **TURSO_KWARGS,
    )
    return glom(turso_parse_resp(resp), "0", default={})
299
300
async def save_chatinfo_to_turso(client: Client, minfo: dict) -> dict[str, str]:
    """Upsert a chat's row in table `chatinfo` and make sure its message table exists.

    The previously known row is taken from the in-process cache, or read from
    Turso and cached on a miss. The `chatinfo` row (and the cache entry) is only
    rewritten when the freshly built row differs from the known one.

    Args:
        minfo (dict): parsed message (or chat) info; the keys `cid`, `ctype`,
            `ctitle`, `full_name` and `chandle` are read.

    Returns:
        Dict with keys cid, ctype, ctitle, chandle, tablename, tags.
        An empty dict when the slimmed chat id is "0".
    """
    cid = slim_cid(minfo["cid"])
    if str(cid) == "0":
        return {}

    # Look in the cache first; fall back to Turso and remember the answer.
    cache_key = f"turso-chat-{cid}"
    known = cache.get(cache_key)
    if not known:
        known = await get_turso_chatinfo(cid)
        cache.set(cache_key, known, ttl=0)

    title = minfo["ctitle"] or minfo["full_name"]
    # In private/bot chats the title is the counterpart's name, taken from the chat itself.
    if minfo["ctype"] in ("BOT", "PRIVATE"):
        peer = await get_chat(client, minfo["cid"])
        if peer.id != 0:
            title = parse_chat(peer)["ctitle"]

    row = {
        "cid": int(cid),
        "ctype": minfo["ctype"],
        "ctitle": title,
        "chandle": minfo["chandle"],
        # An already assigned table name is kept even if the title changed since.
        "tablename": known.get("tablename", "") or f"{cid}-{title}",
        "tags": known.get("tags", ""),
    }

    # Make sure the per-chat message table exists.
    await turso_create_table(
        row["tablename"],
        MSG_COLUMNS,
        idx_cols=MSG_INDEXES,
        idx_prefix=f"idx_{cid}_",
        fts_on_col="mid",
        fts_name=cid,
        silent=True,
        **TURSO_KWARGS,
    )

    if known != row:
        logger.info(f"Save chat info: {row}")
        cache.set(cache_key, row, ttl=0)
        upsert = insert_statement("chatinfo", row, update_on_conflict="cid")
        await turso_exec([upsert], retry=2, **TURSO_KWARGS)
    return row
349
350
async def get_turso_userinfo(uid: int, cid: int) -> dict:
    """Fetch the `userinfo` row of one user in one chat.

    The `userinfo` table (with its indexes) is created first if needed.

    Args:
        uid: User id to match.
        cid: Chat id to match.

    Returns:
        The first matching row as a dict, or an empty dict when there is none.
    """
    # Ensure the table exists before querying it.
    await turso_create_table("userinfo", USER_COLUMNS, idx_cols=USER_INDEXES, idx_prefix="idx_userinfo_", silent=True, **TURSO_KWARGS)
    query = f"SELECT * FROM userinfo WHERE uid={uid} AND cid={cid};"
    request = {"type": "execute", "stmt": {"sql": query}}
    response = await turso_exec([request], silent=True, retry=2, **TURSO_KWARGS)
    rows = turso_parse_resp(response)
    return glom(rows, "0", default={})
371
372
async def save_userinfo_to_turso(client: Client, minfo: dict) -> dict[str, str]:
    """Upsert the sender's per-chat row in table `userinfo`.

    The previously known row is taken from the in-process cache, or read from
    Turso and cached on a miss. The table row (and the cache entry) is only
    rewritten when the freshly built row differs from the known one.

    Args:
        minfo (dict): parsed message info; the keys `uid`, `cid`, `ctype`,
            `ctitle`, `full_name` and `handle` are read.

    Returns:
        Dict with keys ctitle, full_name, handle, tags, name, uid, cid, id.
        An empty dict when `uid` is 1 (the placeholder for an unknown user).
    """
    uid = int(minfo["uid"])
    cid = int(slim_cid(minfo["cid"]))
    if uid == 1:  # default user (user is unknown)
        return {}

    # Look in the cache first; fall back to Turso and remember the answer.
    cache_key = f"turso-user-{uid}-{cid}"
    known = cache.get(cache_key)
    if not known:
        known = await get_turso_userinfo(uid, cid)
        cache.set(cache_key, known, ttl=0)

    title = minfo["ctitle"] or minfo["full_name"]
    # In private/bot chats the title is the counterpart's name, taken from the chat itself.
    if minfo["ctype"] in ("BOT", "PRIVATE"):
        peer = await get_chat(client, minfo["cid"])
        if peer.id != 0:
            title = parse_chat(peer)["ctitle"]

    # Row id derived from the (uid, cid) pair.
    # NOTE(review): abs(uid - cid) is not unique across pairs — confirm collisions are acceptable.
    if uid == cid:
        row_id = uid
    else:
        row_id = abs(uid - cid)

    row = {
        "ctitle": title,
        "full_name": minfo["full_name"],
        "handle": minfo["handle"],
        "tags": known.get("tags", ""),
        "name": minfo["full_name"].replace(" ", ""),  # full name with spaces removed
        "uid": int(uid),
        "cid": int(cid),
        "id": int(row_id),
    }
    if known != row:
        logger.info(f"Save user info: {row}")
        cache.set(cache_key, row, ttl=0)
        upsert = insert_statement("userinfo", row, update_on_conflict="id")
        await turso_exec([upsert], retry=2, **TURSO_KWARGS)
    return row
413    return records