main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3
  4from datetime import datetime
  5from zoneinfo import ZoneInfo
  6
  7from loguru import logger
  8from pyrogram.client import Client
  9
 10from config import MAX_MESSAGE_RETRIEVED, TZ
 11from database.turso import turso_exec, turso_parse_resp
 12from history.turso import get_turso_chatinfo
 13from history.utils import TURSO_KWARGS, get_chat
 14from messages.parser import parse_msg
 15from utils import to_int
 16
 17
 18async def get_history_info_list(
 19    client: Client,
 20    chat_id: int | str,
 21    offset_id: int = 0,
 22    limit: int = 0,
 23    begin_time: datetime | None = None,
 24    end_time: datetime | None = None,
 25    users: str | list[str] | None = None,
 26) -> list[dict]:
 27    """Get given number of chat history from old to new in parserd json format.
 28
 29    If user is specified, number of messages from the user will be returned.
 30    """
 31    if begin_time is None:
 32        begin_time = datetime.fromtimestamp(0, tz=ZoneInfo(TZ))
 33    if end_time is None:
 34        end_time = datetime.now(tz=ZoneInfo(TZ))
 35
 36    history = await get_history_info_list_via_turso(chat_id, limit=limit, begin_time=begin_time, end_time=end_time, users=users)
 37    if not history:
 38        history = await get_history_info_list_via_telegram(client, chat_id, offset_id=offset_id, limit=limit, begin_time=begin_time, end_time=end_time, users=users)
 39    return history
 40
 41
 42async def get_history_info_list_via_turso(
 43    chat_id: int | str,
 44    begin_time: datetime,
 45    end_time: datetime,
 46    users: str | list[str] | None = None,
 47    limit: int = 0,
 48) -> list[dict]:
 49    """Get given number of chat history from old to new in parserd json format.
 50
 51    If user is specified, number of messages from the user will be returned.
 52    """
 53    chat_info = await get_turso_chatinfo(chat_id)
 54    if not chat_info:
 55        return []
 56    begin = begin_time.strftime("%Y-%m-%d %H:%M:%S")
 57    end = end_time.strftime("%Y-%m-%d %H:%M:%S")
 58    sql = f"""SELECT * FROM '{chat_info["tablename"]}' WHERE (time > '{begin}' AND time < '{end}')"""
 59    if users:
 60        users = [users] if isinstance(users, str) else users
 61        handle_cond = " OR ".join(f"handle = '{user}'" for user in users)
 62        name_cond = " OR ".join(f"user = '{user}'" for user in users)
 63        uid_cond = " OR ".join(f"uid = '{user}'" for user in users)
 64        combined_cond = f"{handle_cond} OR {name_cond} OR {uid_cond}"
 65        sql += f" AND ({combined_cond})"
 66    sql += " ORDER BY mid ASC"
 67    if limit:
 68        sql += f" LIMIT {limit}"
 69    logger.debug(sql)
 70    resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, **TURSO_KWARGS)
 71    rows = turso_parse_resp(resp)
 72
 73    """Necessary fields for `parse_history_list` function:
 74
 75    file_name, is_bot, text, datetime, mtype, mid, full_name, message_url, reply_to_message_id, media_group_id
 76    """
 77    message_url_prefix = f"https://t.me/{chat_info['chandle']}" if chat_info["chandle"] else f"https://t.me/c/{chat_info['cid']}"
 78
 79    return [
 80        {
 81            "file_name": row["filename"],
 82            "is_bot": row["handle"].endswith("bot"),
 83            "text": row["content"],
 84            "datetime": datetime.strptime(row["time"], "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo(TZ)),
 85            "mtype": row["mtype"],
 86            "mid": int(row["mid"]),
 87            "full_name": row["fullname"],
 88            "message_url": f"{message_url_prefix}/{row['mid']}",
 89            "reply_to_message_id": to_int(row["reply"]) or 0,
 90            "media_group_id": row["gid"],
 91        }
 92        for row in rows
 93    ]
 94
 95
 96async def get_history_info_list_via_telegram(
 97    client: Client,
 98    chat_id: int | str,
 99    offset_id: int = 0,
100    limit: int = 0,
101    begin_time: datetime | None = None,
102    end_time: datetime | None = None,
103    users: str | list[str] | None = None,
104) -> list[dict]:
105    """Get given number of chat history from old to new in parserd json format.
106
107    If user is specified, number of messages from the user will be returned.
108    """
109    if begin_time is None:
110        begin_time = datetime.fromtimestamp(0, tz=ZoneInfo(TZ))
111    if end_time is None:
112        end_time = datetime.now(tz=ZoneInfo(TZ))
113    history = []
114    retrieved = 0
115    if not users:
116        users = []
117    users = [users] if isinstance(users, str) else users
118    users = [x.replace(" ", "").lower() for x in users]
119    chat = await get_chat(client, chat_id)
120    if chat.id == 0:
121        return []
122    async for msg in client.get_chat_history(chat_id=chat.username or chat.id, max_id=offset_id):  # type: ignore
123        # iterate messages from new to old
124        retrieved += 1
125        if retrieved > MAX_MESSAGE_RETRIEVED:
126            break
127        if len(history) >= limit:
128            break
129        if msg.empty:
130            break
131        info = parse_msg(msg, silent=True, use_cache=False)
132        if info["datetime"] < begin_time:
133            break
134        if info["datetime"] > end_time:
135            continue
136        if msg.reply_to_message_id:
137            info["reply_to_message_id"] = msg.reply_to_message_id or 0
138        if not users:
139            history.append(info)
140            continue
141        if any(info["full_name"].replace(" ", "").lower() == user or str(info["uid"]) == user or info["handle"].lower() == user for user in users):
142            history.append(info)
143    history.reverse()  # from old to new
144    return history