main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3
  4
  5from datetime import datetime
  6from zoneinfo import ZoneInfo
  7
  8from glom import Coalesce, glom
  9from loguru import logger
 10from pathvalidate import sanitize_filename
 11from pyrogram.enums import MessageEntityType
 12from pyrogram.types import Chat, Message
 13
 14from config import TZ, cache
 15from others.emoji import CTYPE_EMOJI, MTYPE_EMOJI
 16from utils import nowdt, slim_cid
 17
 18
 19def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False, use_cache: bool = True) -> dict:
 20    """Parse a message object and return a dictionary of its attributes.
 21
 22    Abbreviations: c = chat, m = message, u = user
 23    """
 24    if use_cache and (cached := cache.get(f"parse_msg-{message.chat.id}-{message.id}")):
 25        return cached
 26    if not silent and verbose:
 27        logger.trace(f"{message!r}")
 28    mtype = glom(message, "media.value", default="") or "text"
 29    ctype = glom(message, "chat.type.name", default="") or ""
 30    ctitle = glom(message, "chat.title", default="") or ""
 31    chandle = glom(message, "chat.username", default="") or ""
 32    uid = glom(message, "from_user.id", default=1) or 1  # uid must > 0
 33    cid = glom(message, "chat.id", default=0) or 0
 34    tid = get_thread_id(message)
 35    mid = glom(message, "id", default=0) or 0
 36    media_group_id = glom(message, "media_group_id", default=0) or 0
 37    is_bot = glom(message, "from_user.is_bot", default=False)
 38    text = message.content
 39    dt = message.date.astimezone(ZoneInfo(TZ)) if isinstance(message.date, datetime) else nowdt(TZ)
 40    time = f"{dt:%Y-%m-%d %H:%M:%S}"
 41
 42    # parse msg link
 43    if chandle:
 44        message_url = f"https://t.me/{chandle}/{tid}/{mid}" if tid else f"https://t.me/{chandle}/{mid}"
 45    else:
 46        message_url = f"https://t.me/c/{slim_cid(cid)}/{tid}/{mid}" if tid else f"https://t.me/c/{slim_cid(cid)}/{mid}"
 47
 48    # parse user attributes
 49    first_name = glom(message, "from_user.first_name", default="") or ""
 50    last_name = glom(message, "from_user.last_name", default="") or ""
 51    handle = glom(message, "from_user.username", default="") or ""
 52    full_name = f"{first_name} {last_name}".strip()
 53
 54    # parse reply message
 55    reply_uid = glom(message, "reply_to_message.from_user.id", default=1) or 1
 56    reply_mid = glom(message, "reply_to_message.id", default=0) or 0
 57    reply_text = glom(message, "reply_to_message.content", default="") or ""
 58    reply_first_name = glom(message, "reply_to_message.from_user.first_name", default="") or ""
 59    reply_last_name = glom(message, "reply_to_message.from_user.last_name", default="") or ""
 60    reply_handle = glom(message, "reply_to_message.from_user.username", default="") or ""
 61    reply_full_name = f"{reply_first_name} {reply_last_name}".strip()
 62
 63    # parse forward message
 64    forward_origin = message.forward_origin
 65    fwd_cid = glom(forward_origin, "chat.id", default=0) or 0
 66    fwd_ctype = glom(forward_origin, "chat.type.name", default="") or ""
 67    fwd_uid = glom(forward_origin, "sender_user.id", default=1) or 1
 68    fwd_handle = glom(forward_origin, Coalesce("sender_user.username", "chat.username"), default="") or ""
 69    fwd_first_name = glom(forward_origin, "sender_user.first_name", default="") or ""
 70    fwd_last_name = glom(forward_origin, "sender_user.last_name", default="") or ""
 71    fwd_full_name = f"{fwd_first_name} {fwd_last_name}".strip() or glom(forward_origin, Coalesce("sender_user_name", "chat.title"), default="") or ""
 72
 73    # parse media attributes. for photo, we should use `sizes[-1]`. ref: TelegramPlayground/pyrogram @1ea5e797f920776bfeecf985a51dc03ff22906af
 74    if mtype == "photo":
 75        file_id = glom(message, f"{mtype}.sizes")[-1].file_id
 76        file_size = glom(message, f"{mtype}.sizes")[-1].file_size
 77    else:
 78        file_id = glom(message, f"{mtype}.file_id", default=0) or 0
 79        file_size = glom(message, f"{mtype}.file_size", default=0) or 0
 80
 81    file_name = glom(message, f"{mtype}.file_name", default="") or ""
 82    mime_type = glom(message, f"{mtype}.mime_type", default="") or ""
 83    duration = glom(message, f"{mtype}.duration", default=0) or 0
 84    # Parse URL from message entities
 85    entity_urls = []
 86    if message.entities:
 87        entity_urls.extend(entity.url for entity in message.entities if entity.type == MessageEntityType.TEXT_LINK)
 88    if message.caption_entities:
 89        entity_urls.extend(entity.url for entity in message.caption_entities if entity.type == MessageEntityType.TEXT_LINK)
 90
 91    ctype_emoji = CTYPE_EMOJI.get(ctype, "")
 92    mtype_emoji = MTYPE_EMOJI.get(mtype, mtype)
 93    # log the summary to console
 94    summary = ""
 95    if ctitle:  # group or channel
 96        summary += f"{ctype_emoji}{ctitle}[{mid}]"
 97
 98    if full_name:  # private chat
 99        summary += f"🤖{full_name}(@{handle})[{uid}]" if is_bot else f"👨{full_name}(@{handle})[{uid}]"
100    summary += f" {mtype_emoji}{mtype}{file_name}".strip()
101    if text:
102        summary += f" 📝{text}"
103    if not silent:
104        logger.info(f"{summary!r}")
105
106    info = {  # ensure the type of each field
107        "mtype": str(mtype),
108        "ctype": str(ctype),
109        "ctitle": str(ctitle),
110        "chandle": str(chandle),
111        "uid": int(uid),
112        "cid": int(cid),
113        "tid": int(tid),
114        "mid": int(mid),
115        "media_group_id": int(media_group_id),
116        "is_bot": bool(is_bot),
117        "text": str(text),
118        "quote_text": glom(message, "quote.text", default="") or "",
119        "html": getattr(text, "html", ""),
120        "full_name": str(full_name),
121        "handle": str(handle),
122        "datetime": dt,
123        "time": str(time),
124        "file_name": sanitize_filename(file_name, replacement_text="_"),
125        "file_id": str(file_id),
126        "mime_type": str(mime_type),
127        "file_size": int(file_size),
128        "duration": int(duration),
129        "summary": str(summary),
130        "message_url": str(message_url),
131        "entity_urls": entity_urls,
132        "reply_mid": int(reply_mid),
133        "reply_text": str(reply_text),
134        "reply_uid": int(reply_uid),
135        "reply_handle": str(reply_handle),
136        "reply_full_name": str(reply_full_name),
137        "fwd_cid": int(fwd_cid),
138        "fwd_ctype": str(fwd_ctype),
139        "fwd_uid": int(fwd_uid),
140        "fwd_handle": str(fwd_handle),
141        "fwd_full_name": str(fwd_full_name),
142    }
143    if use_cache:
144        cache.set(f"parse_msg-{message.chat.id}-{message.id}", info, ttl=120)  # cache the same msg for 2 minutes
145    return info
146
147
def parse_chat(chat: Chat, *, use_cache: bool = True) -> dict:
    """Summarize a Chat object as a small dictionary.

    Abbreviations: c = chat

    Args:
        chat: The chat to parse.
        use_cache: When True, reuse a cached result for this chat ID if one exists and
            cache the fresh result for 120 seconds.

    Returns:
        A dict with ``cid`` (int), ``ctype``, ``ctitle`` and ``chandle`` (all str).
        ``ctitle`` falls back to the "first_name last_name" pair when the chat has no title.
    """
    # the key is only built when caching is requested
    cache_key = f"parse_chat-{chat.id}" if use_cache else None
    if cache_key is not None:
        hit = cache.get(cache_key)
        if hit:
            return hit

    def attr(path: str, fallback):
        """Read ``path`` from the chat, mapping missing or falsy values to ``fallback``."""
        return glom(chat, path, default=fallback) or fallback

    chat_type = attr("type.name", "")
    title = attr("title", "")
    username = attr("username", "")
    chat_id = attr("id", 0)

    # first/last name form the display name used when the chat itself has no title
    given = attr("first_name", "")
    family = attr("last_name", "")
    display_name = f"{given} {family}".strip()

    parsed = {  # ensure the type of each field
        "cid": int(chat_id),
        "ctype": str(chat_type),
        "ctitle": str(title) or str(display_name),
        "chandle": str(username),
    }
    if cache_key is not None:
        cache.set(cache_key, parsed, ttl=120)  # cache the same chat for 2 minutes
    return parsed
174
175
def get_thread_id(message: Message) -> int:
    """Return the topic thread ID that a message belongs to.

    Uses ``message.message_thread_id`` when it is set. Otherwise the result is ``1``
    for a chat flagged as a forum, and ``0`` for everything else.
    """
    thread_id = glom(message, "message_thread_id", default=0)
    if thread_id:
        return thread_id
    # no explicit thread: in a forum chat this message is sent to `General` topic thread
    in_forum = glom(message, "chat.is_forum", default=False)
    return 1 if in_forum else 0
184    return tid