main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4from datetime import datetime
5from zoneinfo import ZoneInfo
6
7from loguru import logger
8from pyrogram.client import Client
9
10from config import MAX_MESSAGE_RETRIEVED, TZ
11from database.turso import turso_exec, turso_parse_resp
12from history.turso import get_turso_chatinfo
13from history.utils import TURSO_KWARGS, get_chat
14from messages.parser import parse_msg
15from utils import to_int
16
17
18async def get_history_info_list(
19 client: Client,
20 chat_id: int | str,
21 offset_id: int = 0,
22 limit: int = 0,
23 begin_time: datetime | None = None,
24 end_time: datetime | None = None,
25 users: str | list[str] | None = None,
26) -> list[dict]:
27 """Get given number of chat history from old to new in parserd json format.
28
29 If user is specified, number of messages from the user will be returned.
30 """
31 if begin_time is None:
32 begin_time = datetime.fromtimestamp(0, tz=ZoneInfo(TZ))
33 if end_time is None:
34 end_time = datetime.now(tz=ZoneInfo(TZ))
35
36 history = await get_history_info_list_via_turso(chat_id, limit=limit, begin_time=begin_time, end_time=end_time, users=users)
37 if not history:
38 history = await get_history_info_list_via_telegram(client, chat_id, offset_id=offset_id, limit=limit, begin_time=begin_time, end_time=end_time, users=users)
39 return history
40
41
42async def get_history_info_list_via_turso(
43 chat_id: int | str,
44 begin_time: datetime,
45 end_time: datetime,
46 users: str | list[str] | None = None,
47 limit: int = 0,
48) -> list[dict]:
49 """Get given number of chat history from old to new in parserd json format.
50
51 If user is specified, number of messages from the user will be returned.
52 """
53 chat_info = await get_turso_chatinfo(chat_id)
54 if not chat_info:
55 return []
56 begin = begin_time.strftime("%Y-%m-%d %H:%M:%S")
57 end = end_time.strftime("%Y-%m-%d %H:%M:%S")
58 sql = f"""SELECT * FROM '{chat_info["tablename"]}' WHERE (time > '{begin}' AND time < '{end}')"""
59 if users:
60 users = [users] if isinstance(users, str) else users
61 handle_cond = " OR ".join(f"handle = '{user}'" for user in users)
62 name_cond = " OR ".join(f"user = '{user}'" for user in users)
63 uid_cond = " OR ".join(f"uid = '{user}'" for user in users)
64 combined_cond = f"{handle_cond} OR {name_cond} OR {uid_cond}"
65 sql += f" AND ({combined_cond})"
66 sql += " ORDER BY mid ASC"
67 if limit:
68 sql += f" LIMIT {limit}"
69 logger.debug(sql)
70 resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
71 rows = turso_parse_resp(resp)
72
73 """Necessary fields for `parse_history_list` function:
74
75 file_name, is_bot, text, datetime, mtype, mid, full_name, message_url, reply_to_message_id, media_group_id
76 """
77 message_url_prefix = f"https://t.me/{chat_info['chandle']}" if chat_info["chandle"] else f"https://t.me/c/{chat_info['cid']}"
78
79 return [
80 {
81 "file_name": row["filename"],
82 "is_bot": row["handle"].endswith("bot"),
83 "text": row["content"],
84 "datetime": datetime.strptime(row["time"], "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo(TZ)),
85 "mtype": row["mtype"],
86 "mid": int(row["mid"]),
87 "full_name": row["fullname"],
88 "message_url": f"{message_url_prefix}/{row['mid']}",
89 "reply_to_message_id": to_int(row["reply"]) or 0,
90 "media_group_id": row["gid"],
91 }
92 for row in rows
93 ]
94
95
96async def get_history_info_list_via_telegram(
97 client: Client,
98 chat_id: int | str,
99 offset_id: int = 0,
100 limit: int = 0,
101 begin_time: datetime | None = None,
102 end_time: datetime | None = None,
103 users: str | list[str] | None = None,
104) -> list[dict]:
105 """Get given number of chat history from old to new in parserd json format.
106
107 If user is specified, number of messages from the user will be returned.
108 """
109 if begin_time is None:
110 begin_time = datetime.fromtimestamp(0, tz=ZoneInfo(TZ))
111 if end_time is None:
112 end_time = datetime.now(tz=ZoneInfo(TZ))
113 history = []
114 retrieved = 0
115 if not users:
116 users = []
117 users = [users] if isinstance(users, str) else users
118 users = [x.replace(" ", "").lower() for x in users]
119 chat = await get_chat(client, chat_id)
120 if chat.id == 0:
121 return []
122 async for msg in client.get_chat_history(chat_id=chat.username or chat.id, max_id=offset_id): # type: ignore
123 # iterate messages from new to old
124 retrieved += 1
125 if retrieved > MAX_MESSAGE_RETRIEVED:
126 break
127 if len(history) >= limit:
128 break
129 if msg.empty:
130 break
131 info = parse_msg(msg, silent=True, use_cache=False)
132 if info["datetime"] < begin_time:
133 break
134 if info["datetime"] > end_time:
135 continue
136 if msg.reply_to_message_id:
137 info["reply_to_message_id"] = msg.reply_to_message_id or 0
138 if not users:
139 history.append(info)
140 continue
141 if any(info["full_name"].replace(" ", "").lower() == user or str(info["uid"]) == user or info["handle"].lower() == user for user in users):
142 history.append(info)
143 history.reverse() # from old to new
144 return history