main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3
  4
  5from datetime import datetime
  6from zoneinfo import ZoneInfo
  7
  8from glom import Coalesce, glom
  9from loguru import logger
 10from pathvalidate import sanitize_filename
 11from pyrogram.enums import MessageEntityType
 12from pyrogram.types import Chat, Message
 13
 14from config import TZ, cache
 15from others.emoji import CTYPE_EMOJI, MTYPE_EMOJI
 16from utils import nowdt
 17
 18
 19def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False, use_cache: bool = True) -> dict:
 20    """Parse a message object and return a dictionary of its attributes.
 21
 22    Abbreviations: c = chat, m = message, u = user
 23    """
 24    if use_cache and (cached := cache.get(f"parse_msg-{message.chat.id}-{message.id}")):
 25        return cached
 26    if not silent and verbose:
 27        logger.trace(f"{message!r}")
 28    mtype = glom(message, "media.value", default="") or "text"
 29    ctype = glom(message, "chat.type.name", default="") or ""
 30    ctitle = glom(message, "chat.title", default="") or ""
 31    chandle = glom(message, "chat.username", default="") or ""
 32    uid = glom(message, "from_user.id", default=1) or 1  # uid must > 0
 33    cid = glom(message, "chat.id", default=0) or 0
 34    tid = get_thread_id(message)
 35    mid = glom(message, "id", default=0) or 0
 36    media_group_id = glom(message, "media_group_id", default=0) or 0
 37    is_bot = glom(message, "from_user.is_bot", default=False)
 38    text = message.content
 39    dt = message.date.astimezone(ZoneInfo(TZ)) if isinstance(message.date, datetime) else nowdt(TZ)
 40    time = f"{dt:%Y-%m-%d %H:%M:%S}"
 41    message_url = glom(message, "link", default="") or ""
 42    # parse user attributes
 43    first_name = glom(message, "from_user.first_name", default="") or ""
 44    last_name = glom(message, "from_user.last_name", default="") or ""
 45    handle = glom(message, "from_user.username", default="") or ""
 46    full_name = f"{first_name} {last_name}".strip()
 47
 48    # parse reply message
 49    reply_uid = glom(message, "reply_to_message.from_user.id", default=1) or 1
 50    reply_mid = glom(message, "reply_to_message.id", default=0) or 0
 51    reply_text = glom(message, "reply_to_message.content", default="") or ""
 52    reply_first_name = glom(message, "reply_to_message.from_user.first_name", default="") or ""
 53    reply_last_name = glom(message, "reply_to_message.from_user.last_name", default="") or ""
 54    reply_handle = glom(message, "reply_to_message.from_user.username", default="") or ""
 55    reply_full_name = f"{reply_first_name} {reply_last_name}".strip()
 56
 57    # parse forward message
 58    forward_origin = message.forward_origin
 59    fwd_cid = glom(forward_origin, "chat.id", default=0) or 0
 60    fwd_ctype = glom(forward_origin, "chat.type.name", default="") or ""
 61    fwd_uid = glom(forward_origin, "sender_user.id", default=1) or 1
 62    fwd_handle = glom(forward_origin, Coalesce("sender_user.username", "chat.username"), default="") or ""
 63    fwd_first_name = glom(forward_origin, "sender_user.first_name", default="") or ""
 64    fwd_last_name = glom(forward_origin, "sender_user.last_name", default="") or ""
 65    fwd_full_name = f"{fwd_first_name} {fwd_last_name}".strip() or glom(forward_origin, Coalesce("sender_user_name", "chat.title"), default="") or ""
 66
 67    # parse media attributes. for photo, we should use `sizes[-1]`. ref: TelegramPlayground/pyrogram @1ea5e797f920776bfeecf985a51dc03ff22906af
 68    if mtype == "photo":
 69        file_id = glom(message, f"{mtype}.sizes")[-1].file_id
 70        file_size = glom(message, f"{mtype}.sizes")[-1].file_size
 71    else:
 72        file_id = glom(message, f"{mtype}.file_id", default=0) or 0
 73        file_size = glom(message, f"{mtype}.file_size", default=0) or 0
 74
 75    file_name = glom(message, f"{mtype}.file_name", default="") or ""
 76    mime_type = glom(message, f"{mtype}.mime_type", default="") or ""
 77    duration = glom(message, f"{mtype}.duration", default=0) or 0
 78    # Parse URL from message entities
 79    entity_urls = []
 80    if message.entities:
 81        entity_urls.extend(entity.url for entity in message.entities if entity.type == MessageEntityType.TEXT_LINK)
 82    if message.caption_entities:
 83        entity_urls.extend(entity.url for entity in message.caption_entities if entity.type == MessageEntityType.TEXT_LINK)
 84
 85    ctype_emoji = CTYPE_EMOJI.get(ctype, "")
 86    mtype_emoji = MTYPE_EMOJI.get(mtype, mtype)
 87    # log the summary to console
 88    summary = ""
 89    if ctitle:  # group or channel
 90        summary += f"{ctype_emoji}{ctitle}[{mid}]"
 91
 92    if full_name:  # private chat
 93        summary += f"🤖{full_name}(@{handle})[{uid}]" if is_bot else f"👨{full_name}(@{handle})[{uid}]"
 94    summary += f" {mtype_emoji}{mtype}{file_name}".strip()
 95    if text:
 96        summary += f" 📝{text}"
 97    if not silent:
 98        logger.info(f"{summary!r}")
 99
100    info = {  # ensure the type of each field
101        "mtype": str(mtype),
102        "ctype": str(ctype),
103        "ctitle": str(ctitle),
104        "chandle": str(chandle),
105        "uid": int(uid),
106        "cid": int(cid),
107        "tid": int(tid),
108        "mid": int(mid),
109        "media_group_id": int(media_group_id),
110        "is_bot": bool(is_bot),
111        "text": str(text),
112        "quote_text": glom(message, "quote.text", default="") or "",
113        "html": getattr(text, "html", ""),
114        "full_name": str(full_name),
115        "handle": str(handle),
116        "datetime": dt,
117        "time": str(time),
118        "file_name": sanitize_filename(file_name, replacement_text="_"),
119        "file_id": str(file_id),
120        "mime_type": str(mime_type),
121        "file_size": int(file_size),
122        "duration": int(duration),
123        "summary": str(summary),
124        "message_url": str(message_url),
125        "entity_urls": entity_urls,
126        "reply_mid": int(reply_mid),
127        "reply_text": str(reply_text),
128        "reply_uid": int(reply_uid),
129        "reply_handle": str(reply_handle),
130        "reply_full_name": str(reply_full_name),
131        "fwd_cid": int(fwd_cid),
132        "fwd_ctype": str(fwd_ctype),
133        "fwd_uid": int(fwd_uid),
134        "fwd_handle": str(fwd_handle),
135        "fwd_full_name": str(fwd_full_name),
136    }
137    if use_cache:
138        cache.set(f"parse_msg-{message.chat.id}-{message.id}", info, ttl=120)  # cache the same msg for 2 minutes
139    return info
140
141
def parse_chat(chat: Chat, *, use_cache: bool = True) -> dict:
    """Extract the identifying attributes of a Chat into a plain dictionary.

    Abbreviations: c = chat, m = message, u = user

    The result holds ``cid``, ``ctype``, ``ctitle`` and ``chandle``. When the chat has
    no title, ``ctitle`` falls back to the "first_name last_name" of the chat.
    With ``use_cache`` enabled, a cached result for the same chat ID is reused and a
    fresh result is cached for 120 seconds.
    """
    # The key is only built when caching is requested, so `chat.id` is not touched otherwise.
    cache_key = f"parse_chat-{chat.id}" if use_cache else None
    if cache_key is not None:
        hit = cache.get(cache_key)
        if hit:
            return hit

    def _read(path: str, fallback):
        # missing and falsy values both collapse to the fallback
        return glom(chat, path, default=fallback) or fallback

    kind = _read("type.name", "")
    title = str(_read("title", ""))
    username = _read("username", "")
    chat_id = _read("id", 0)

    # parse user attributes
    given = _read("first_name", "")
    family = _read("last_name", "")
    person_name = f"{given} {family}".strip()

    if not title:
        title = str(person_name)

    info = {  # ensure the type of each field
        "cid": int(chat_id),
        "ctype": str(kind),
        "ctitle": title,
        "chandle": str(username),
    }
    if cache_key is not None:
        cache.set(cache_key, info, ttl=120)  # cache the same chat for 2 minutes
    return info
168
169
def get_thread_id(message: Message) -> int:
    """Return the thread (topic) ID that a message belongs to.

    The message's own ``message_thread_id`` is used when it is set. Otherwise a
    message in a forum chat is attributed to the `General` topic (ID 1), and any
    other message yields 0.
    """
    thread_id = glom(message, "message_thread_id", default=0) or 0
    if thread_id:
        return thread_id
    in_forum = glom(message, "chat.is_forum", default=False)
    # a forum message without an explicit thread was sent to the `General` topic
    return 1 if in_forum else 0