main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import json
4import os
5from datetime import datetime, timedelta
6from pathlib import Path
7from typing import Literal
8from zoneinfo import ZoneInfo
9
10from glom import Coalesce, flatten, glom
11from loguru import logger
12from pyrogram.client import Client
13from pyrogram.types import Message
14
15from config import DOWNLOAD_DIR, HISTORY, TZ, cache, cutter
16from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
17from history.utils import CHAT_COLUMNS, MSG_COLUMNS, MSG_INDEXES, TURSO_KWARGS, USER_COLUMNS, USER_INDEXES, can_delete_history, check_save_history, fine_grained_check, get_chat
18from messages.parser import parse_chat, parse_msg
19from utils import i_am_bot, nowdt, slim_cid, to_int, true
20
21
async def sync_history_to_turso(client: Client, message: Message) -> None:
    """Sync a received message to the Turso database.

    1. save the user info to table `userinfo`
    2. save the chat info to table `chatinfo`
    3. upsert the message (conflict key `mid`) into the chat's table `{cid}-{ctitle}`

    A list argument is treated as a batch of deleted messages and handed to
    `delete_messages` instead. Nothing is written when Turso is disabled, when the
    chat/message is filtered out by `check_save_history` / `fine_grained_check`,
    or when the message is a service message.

    Args:
        client: client that received the update (used to resolve chat titles).
        message: the received/edited message, or a list of deleted messages.
    """
    if not HISTORY.TURSO_ENABLE:
        return
    if isinstance(message, list):  # this is deleted messages
        await delete_messages(message)
        return
    info = parse_msg(message, silent=True, use_cache=False)
    if not check_save_history(info["ctype"], info["cid"]) or not fine_grained_check(info) or message.service:
        return

    await save_userinfo_to_turso(client, info)
    chatinfo = await save_chatinfo_to_turso(client, info)
    if not chatinfo:
        # save_chatinfo_to_turso returns {} when the chat id is 0: there is no table
        # to write into, and chatinfo["tablename"] would raise KeyError.
        return
    content = message.content  # text or edited text
    records = {
        "mid": info["mid"],
        "mtype": info["mtype"],
        "time": info["time"],
        "fullname": info["full_name"],
        "content": content,
        "filename": info["file_name"],
        "urls": "\n\n".join(info["entity_urls"]),
        "reply": message.reply_to_message_id,
        "mime": info["mime_type"],
        "user": info["full_name"].replace(" ", ""),
        "handle": info["handle"],
        "uid": info["uid"],
        "gid": info["media_group_id"],
        # guard against a missing text so the word cutter always receives a string
        "segmented": " ".join(cutter.cutword(content or "")),
    }
    await turso_exec([insert_statement(chatinfo["tablename"], records, update_on_conflict="mid")], silent=True, retry=2, **TURSO_KWARGS)
57
58
async def delete_messages(messages: Message | list[Message]) -> None:
    """Delete messages from Turso database.

    For each message, the stored row is looked up by `mid` in its chat's table and
    deleted only if `can_delete_history(cid, uid)` allows it for the stored sender.
    A message is skipped (the rest of the batch is still processed) when its chat is
    excluded by `check_save_history`, it is a service message, the chat has no
    `chatinfo` row, or the message is not stored.

    Args:
        messages: one message or a list of messages (reads `id`, `chat.id`,
            `chat.type.name` and `service`).
    """
    if not isinstance(messages, list):
        messages = [messages]
    for message in messages:
        cid = glom(message, "chat.id", default=0) or 0
        mid = int(glom(message, "id", default=0) or 0)
        ctype = glom(message, "chat.type.name", default="") or ""
        if not check_save_history(ctype, cid) or message.service:
            # skip only this message; previously `return` dropped the rest of the batch
            continue
        chatinfo = await get_turso_chatinfo(cid)
        if not chatinfo:
            continue
        # The table name embeds the chat title, so quote it as an SQL identifier and
        # double any embedded quotes; otherwise a title with a quote breaks the statement.
        quoted = str(chatinfo["tablename"]).replace('"', '""')
        sql = f'SELECT * FROM "{quoted}" WHERE mid={mid};'
        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
        parsed = turso_parse_resp(resp)
        if not parsed:
            continue
        uid = parsed[0]["uid"]
        if can_delete_history(cid, uid):
            logger.warning(f"Delete message Chat={cid}, ID={mid}: {parsed[0]}")
            sql = f'DELETE FROM "{quoted}" WHERE mid={mid};'
            await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
82
83
async def backup_chat_history_to_turso(
    client: Client,
    chat_id: str | int,
    hours: float = HISTORY.BACKUP_CHATS_HOURS,
    *,
    start_from: Literal["latest", "oldest"] = "latest",
    max_sync: float = float("inf"),
) -> None:
    """Backup chat history to Turso database.

    Messages from ``client.get_chat_history`` are upserted (conflict key `mid`) into the
    chat's table in batches of 1000 statements.

    - ``start_from="latest"``: start from the latest message and skip message ids already
      stored with a time in ``[now - hours, now]``.
    - ``start_from="oldest"``: find the minimum message id of this chat in Turso, then use
      this mid as ``max_id`` to retrieve messages.

    Iteration stops at the first message older than ``now - hours`` (this assumes the
    history is yielded newest-first — TODO confirm for the client in use) or once
    ``max_sync`` messages have been queued.

    Nothing happens when Turso is disabled, the client is a bot, the chat is not
    accessible, or the env var ``HISTORY_IGNORE_{cid}`` is truthy.

    Args:
        client: client used to read the history (a user account, not a bot).
        chat_id: chat id or handle.
        hours: how far back from now to back up.
        start_from: where to start reading, see above.
        max_sync: maximum number of messages to upsert.
    """
    if not HISTORY.TURSO_ENABLE:
        return
    if await i_am_bot(client):
        return
    chat = await get_chat(client, to_int(chat_id))
    if chat.id == 0:  # chat is not accessible
        return
    chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat))
    if true(os.getenv(f"HISTORY_IGNORE_{chatinfo['cid']}")):
        return
    table_name = chatinfo["tablename"]
    now = nowdt(TZ)
    begin_dt = now - timedelta(hours=hours)
    begin_time = begin_dt.strftime("%Y-%m-%d %H:%M:%S")
    if start_from == "oldest":
        sql = f'SELECT mid FROM "{table_name}" ORDER BY mid ASC LIMIT 1'
        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
        offset_id = int(glom(resp, "results.0.response.result.rows.0.0.value", default=1))
        saved_mids = {offset_id}
    else:
        # find message ids in this time range
        end_time = now.strftime("%Y-%m-%d %H:%M:%S")
        sql = f'SELECT mid FROM "{table_name}" WHERE time >= "{begin_time}" AND time <= "{end_time}";'
        resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
        saved_mids = {int(x) for x in flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))}
        offset_id = 0  # retrieve from latest message
    logger.info(f"Found {len(saved_mids)} messages of {table_name} in Turso. Time >= {begin_time}, Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}")

    async def flush(batch: list, last_time: str) -> None:
        """Execute one batch of insert statements and log the synced rowid range."""
        batch_resp = await turso_exec(batch, silent=True, retry=2, **TURSO_KWARGS)
        # one "ok" result does not correspond to an insert (presumably the trailing
        # close request of the pipeline), hence the -1
        num_success = sum(1 for x in glom(batch_resp, "results.*.type", default=[]) if x == "ok") - 1
        if sync_ids := glom(batch_resp, "results.**.last_insert_rowid", default=[0]):
            logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}. {last_time}")

    concurrency = 1000
    num_sync = 0
    statements = []
    last_time = ""  # time of the most recently *queued* message, used in the logs
    real_cid = chatinfo["chandle"] or (int(chatinfo["cid"]) if chatinfo["ctype"] in ["BOT", "PRIVATE"] else int(f"-100{chatinfo['cid']}"))
    async for message in client.get_chat_history(real_cid, max_id=offset_id):  # type: ignore
        if not isinstance(message, Message) or message.empty or message.service or message.id in saved_mids:
            continue
        info = parse_msg(message, silent=True, use_cache=False)
        if info["time"] < begin_time:
            break
        if num_sync >= max_sync:
            break
        if not fine_grained_check(info):
            continue
        num_sync += 1
        records = {
            "mid": info["mid"],
            "mtype": info["mtype"],
            "time": info["time"],
            "fullname": info["full_name"],
            "content": message.content,
            "filename": info["file_name"],
            "urls": "\n\n".join(info["entity_urls"]),
            "reply": message.reply_to_message_id,
            "mime": info["mime_type"],
            "user": info["full_name"].replace(" ", ""),
            "handle": info["handle"],
            "uid": info["uid"],
            "gid": info["media_group_id"],
            "segmented": " ".join(cutter.cutword(info["text"])),
        }
        logger.trace(f"Syncing {table_name}: {info['mid']} - {info['time']}")
        statements.append(insert_statement(table_name, records, update_on_conflict="mid"))
        last_time = info["time"]
        if len(statements) == concurrency:
            await flush(statements, last_time)
            statements = []

    if statements:
        await flush(statements, last_time)
169
170
async def upload_exported_history_to_turso(client: Client, path: str | Path | None = None) -> None:
    """Upload an exported chat history JSON file (``result.json``) to the Turso database.

    Messages whose ``id`` is already stored in the chat's table are skipped; the rest are
    upserted (conflict key `mid`) in batches of 5000 statements.

    Args:
        client: client used to resolve the chat when it has no `chatinfo` row yet.
        path: path to the exported JSON file. Defaults to ``DOWNLOAD_DIR/result.json``.
            Nothing happens if it is not an existing file.
    """
    if not HISTORY.TURSO_ENABLE:
        return
    if path is None:
        path = Path(DOWNLOAD_DIR) / "result.json"
    path = Path(path)
    if not path.is_file():
        return

    def parse_text(texts: list | str) -> str:
        """Flatten the export's ``text`` field (a string, or a list of strings / entity dicts)."""
        if isinstance(texts, str):
            return texts
        return "".join(x if isinstance(x, str) else x.get("text", "") for x in texts)

    def parse_urls(entities: list) -> str:
        """Join the URLs of ``link`` / ``text_link`` entities with blank lines."""
        urls = [glom(x, Coalesce("href", "text")) for x in entities if x["type"] in {"link", "text_link"}]
        return "\n\n".join(urls)

    async def flush(batch: list, suffix: str = "") -> None:
        """Execute one batch of insert statements and log the synced rowid range."""
        resp = await turso_exec(batch, silent=True, retry=2, **TURSO_KWARGS)
        # one "ok" result does not correspond to an insert (presumably the trailing
        # close request of the pipeline), hence the -1
        num_success = sum(1 for x in glom(resp, "results.*.type", default=[]) if x == "ok") - 1
        if sync_ids := glom(resp, "results.**.last_insert_rowid", default=[0]):
            logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}{suffix}")

    # Read explicitly as UTF-8 instead of relying on the platform's default encoding.
    with path.open("r", encoding="utf-8") as f:  # noqa: ASYNC230
        data = json.load(f)
    messages = data["messages"]
    logger.info(f"Found {len(messages)} messages in json file")

    # The exported history does not have media_group_id, so derive one: if two consecutive
    # messages have the same `from_id` and `date_unixtime` and the later one has a photo or
    # thumbnail, both get the earlier message's media_group_id (or, if unset, its id).
    for prev, msg in zip(messages, messages[1:]):
        if all(msg.get(key) == prev.get(key) for key in ("from_id", "date_unixtime")) and any(key in msg for key in ("photo", "thumbnail")):
            group_id = glom(prev, Coalesce("media_group_id", "id"))
            prev["media_group_id"] = group_id
            msg["media_group_id"] = group_id

    mtypes = {
        "audio_file": "audio",
        "voice_message": "voice",
        "video_message": "video",
        "video_file": "video",
    }
    chat_id = data["id"]
    chatinfo = await get_turso_chatinfo(chat_id)
    if not chatinfo:  # this chat is never synced
        chat = await get_chat(client, int(chat_id))
        chatinfo = await save_chatinfo_to_turso(client, parse_chat(chat))
    if not chatinfo:
        # save_chatinfo_to_turso returns {} when the chat id is 0 (as for an inaccessible
        # chat); there is no table to write into.
        logger.warning(f"Chat {chat_id} is not accessible, skip uploading exported history")
        return
    table_name = chatinfo["tablename"]
    # find all message_ids already stored for this chat
    resp = await turso_exec([{"type": "execute", "stmt": {"sql": f'SELECT mid FROM "{table_name}";'}}], silent=True, **TURSO_KWARGS)
    saved_ids = {int(x) for x in flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))}
    tz = ZoneInfo(TZ)
    concurrency = 5000
    statements = []
    for info in messages:
        if info["id"] in saved_ids:
            continue
        if info["type"] != "message" or info["date_unixtime"] == "0":
            continue
        if "media_type" not in info:  # guess mtype
            if "photo" in info:
                info["media_type"] = "photo"
            if "video/" in info.get("mime_type", ""):
                info["media_type"] = "video_file"
        mtype = info.get("media_type", "text")
        content = parse_text(info.get("text", []))
        urls = parse_urls(info.get("text_entities", []))
        # fine-grained check requires key: ["cid", "mtype", "text", "entity_urls"]
        if not fine_grained_check({"cid": data["id"], "mtype": mtype, "text": content, "entity_urls": urls}):
            continue
        dt = datetime.fromtimestamp(int(info["date_unixtime"]), tz=tz)
        time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
        sender_id = info["from_id"].removeprefix("user").removeprefix("channel")
        uid = int(sender_id)
        user = info["from"] or sender_id
        if user == data["name"] and data["type"] in ["public_channel", "private_channel"]:  # user is not shown
            user = ""
            uid = 1

        records = {
            "mid": info["id"],
            "mtype": mtypes.get(mtype, mtype),
            "time": time_str,
            "fullname": user,
            "content": content,
            "filename": info.get("file_name", ""),
            "urls": urls,
            "reply": info.get("reply_to_message_id"),
            "mime": info.get("mime_type", ""),
            "user": user.replace(" ", ""),
            "handle": "",  # TODO: parse handle
            "uid": uid,
            "gid": info.get("media_group_id", 0),
            "segmented": " ".join(cutter.cutword(content)),
        }
        statements.append(insert_statement(table_name, records, update_on_conflict="mid"))
        if len(statements) == concurrency:
            await flush(statements, f". {time_str}")
            statements = []
    if statements:
        await flush(statements)
276 logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}")
277
278
async def get_turso_chatinfo(cid: str | int) -> dict:
    """Look up one chat in table `chatinfo`.

    The table is created first (via `turso_create_table`) so the query never fails
    on a fresh database. A row matches when its `cid` equals ``slim_cid(cid)`` or
    its `chandle` equals ``cid`` as given.

    Args:
        cid: chat id or chat handle.

    Returns:
        The first matching row as a dict (e.g. cid, ctype, ctitle, chandle), or ``{}``.
    """
    await turso_create_table("chatinfo", CHAT_COLUMNS, silent=True, **TURSO_KWARGS)
    condition = f"cid='{slim_cid(cid)}' OR chandle='{cid}'"
    statement = {"type": "execute", "stmt": {"sql": f"SELECT * FROM chatinfo WHERE {condition};"}}
    response = await turso_exec([statement], silent=True, retry=2, **TURSO_KWARGS)
    rows = turso_parse_resp(response)
    return glom(rows, "0", default={})
298 return glom(turso_parse_resp(resp), "0", default={})
299
300
async def save_chatinfo_to_turso(client: Client, minfo: dict) -> dict[str, str]:
    """Upsert one chat into table `chatinfo` and make sure its message table exists.

    The message table name is reused from the cached/stored row when present,
    otherwise it becomes ``{cid}-{ctitle}``. The table-creation call runs every time;
    the log line and the `chatinfo` write only happen when the record differs from
    the cached/stored one.

    Args:
        minfo (dict): parsed message info (reads cid, ctype, ctitle, chandle, and
            full_name when ctitle is empty).

    Returns:
        cid, ctype, ctitle, chandle, tablename, tags — or ``{}`` when the slimmed cid is 0.
    """
    cid = slim_cid(minfo["cid"])
    if str(cid) == "0":
        return {}

    cache_key = f"turso-chat-{cid}"
    cached = cache.get(cache_key)
    if not cached:
        # not in the in-memory cache yet: load from turso and remember it
        cached = await get_turso_chatinfo(cid)
        cache.set(cache_key, cached, ttl=0)

    title = minfo["ctitle"] or minfo["full_name"]
    if minfo["ctype"] in ("BOT", "PRIVATE"):
        # in private chats the opponent's name serves as the chat title
        peer = await get_chat(client, minfo["cid"])
        if peer.id != 0:
            title = parse_chat(peer)["ctitle"]

    records = {
        "cid": int(cid),
        "ctype": minfo["ctype"],
        "ctitle": title,
        "chandle": minfo["chandle"],
        "tablename": cached.get("tablename", "") or f"{cid}-{title}",
        "tags": cached.get("tags", ""),
    }
    await turso_create_table(
        records["tablename"],
        MSG_COLUMNS,
        idx_cols=MSG_INDEXES,
        idx_prefix=f"idx_{cid}_",
        fts_on_col="mid",
        fts_name=cid,
        silent=True,
        **TURSO_KWARGS,
    )
    if cached == records:
        return records

    logger.info(f"Save chat info: {records}")
    cache.set(cache_key, records, ttl=0)
    await turso_exec([insert_statement("chatinfo", records, update_on_conflict="cid")], retry=2, **TURSO_KWARGS)
    return records
348 return records
349
350
async def get_turso_userinfo(uid: int, cid: int) -> dict:
    """Fetch the `userinfo` row of one user in one chat.

    Table `userinfo` (with its indexes) is created first if needed.

    Args:
        uid: user id.
        cid: chat id.

    Returns:
        The first row matching both ``uid`` and ``cid`` as a dict, or ``{}``.
    """
    await turso_create_table("userinfo", USER_COLUMNS, idx_cols=USER_INDEXES, idx_prefix="idx_userinfo_", silent=True, **TURSO_KWARGS)
    query = f"SELECT * FROM userinfo WHERE uid={uid} AND cid={cid};"
    statement = {"type": "execute", "stmt": {"sql": query}}
    response = await turso_exec([statement], silent=True, retry=2, **TURSO_KWARGS)
    rows = turso_parse_resp(response)
    return glom(rows, "0", default={})
370 return glom(turso_parse_resp(resp), "0", default={})
371
372
async def save_userinfo_to_turso(client: Client, minfo: dict) -> dict[str, str]:
    """Upsert the sender of a message into table `userinfo`.

    Rows use a derived `id` as the conflict key: ``uid`` when the user id equals the
    chat id, otherwise ``abs(uid - cid)``. The log line and the database write only
    happen when the record differs from the cached/stored one.

    Args:
        minfo (dict): parsed message info (reads uid, cid, ctype, ctitle, full_name, handle).

    Returns:
        ctitle, full_name, handle, tags, name, uid, cid, id — or ``{}`` for the
        placeholder user ``uid == 1``.
    """
    uid = int(minfo["uid"])
    cid = int(slim_cid(minfo["cid"]))
    if uid == 1:  # placeholder for an unknown sender
        return {}

    cache_key = f"turso-user-{uid}-{cid}"
    cached = cache.get(cache_key)
    if not cached:
        # not in the in-memory cache yet: load from turso and remember it
        cached = await get_turso_userinfo(uid, cid)
        cache.set(cache_key, cached, ttl=0)

    title = minfo["ctitle"] or minfo["full_name"]
    if minfo["ctype"] in ("BOT", "PRIVATE"):
        # in private chats the opponent's name serves as the chat title
        peer = await get_chat(client, minfo["cid"])
        if peer.id != 0:
            title = parse_chat(peer)["ctitle"]

    # NOTE(review): abs(uid - cid) is not unique across (uid, cid) pairs, e.g.
    # (5, 10) and (15, 10) both give 5 — confirm such collisions are acceptable.
    row_id = uid if uid == cid else abs(uid - cid)
    full_name = minfo["full_name"]
    records = {
        "ctitle": title,
        "full_name": full_name,
        "handle": minfo["handle"],
        "tags": cached.get("tags", ""),
        "name": full_name.replace(" ", ""),
        "uid": uid,
        "cid": cid,
        "id": row_id,
    }
    if cached == records:
        return records

    logger.info(f"Save user info: {records}")
    cache.set(cache_key, records, ttl=0)
    await turso_exec([insert_statement("userinfo", records, update_on_conflict="id")], retry=2, **TURSO_KWARGS)
    return records
413 return records