main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4
5from datetime import datetime
6from zoneinfo import ZoneInfo
7
8from glom import Coalesce, glom
9from loguru import logger
10from pathvalidate import sanitize_filename
11from pyrogram.enums import MessageEntityType
12from pyrogram.types import Chat, Message
13
14from config import TZ, cache
15from others.emoji import CTYPE_EMOJI, MTYPE_EMOJI
16from utils import nowdt, slim_cid
17
18
def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False, use_cache: bool = True) -> dict:
    """Parse a message object and return a dictionary of its attributes.

    Abbreviations: c = chat, m = message, u = user

    Args:
        message: The message to parse.
        silent: If True, skip all logging.
        verbose: If True (and ``silent`` is False), log the message repr at TRACE level.
        use_cache: If True, return the cached result for this chat/message pair
            when one exists, and cache a fresh result for 2 minutes.

    Returns:
        A dict of typed fields: identifiers (``mtype``, ``ctype``, ``ctitle``,
        ``chandle``, ``uid``, ``cid``, ``tid``, ``mid``, ``media_group_id``),
        sender info (``is_bot``, ``full_name``, ``handle``), text (``text``,
        ``quote_text``, ``html``), timestamps (``datetime``, ``time``), media
        attributes (``file_name``, ``file_id``, ``mime_type``, ``file_size``,
        ``duration``), ``summary``, ``message_url``, ``entity_urls`` and the
        ``reply_*`` / ``fwd_*`` fields. Missing attributes fall back to
        empty/zero defaults (user IDs default to 1).
    """
    cid = glom(message, "chat.id", default=0) or 0
    mid = glom(message, "id", default=0) or 0
    # Build the key from the glom-guarded ids so a message without a chat does
    # not raise; such messages are simply not cached (they would all collide on cid 0).
    cache_key = f"parse_msg-{cid}-{mid}" if use_cache and cid else ""
    if cache_key and (cached := cache.get(cache_key)):
        return cached
    if not silent and verbose:
        logger.trace(f"{message!r}")

    mtype = glom(message, "media.value", default="") or "text"
    ctype = glom(message, "chat.type.name", default="") or ""
    ctitle = glom(message, "chat.title", default="") or ""
    chandle = glom(message, "chat.username", default="") or ""
    uid = glom(message, "from_user.id", default=1) or 1  # uid must > 0
    tid = get_thread_id(message)
    media_group_id = glom(message, "media_group_id", default=0) or 0
    is_bot = glom(message, "from_user.is_bot", default=False)
    # Read through glom (like the reply text below) so a missing/None content
    # becomes "" instead of raising or being stringified as "None".
    text = glom(message, "content", default="") or ""
    dt = message.date.astimezone(ZoneInfo(TZ)) if isinstance(message.date, datetime) else nowdt(TZ)
    time_str = f"{dt:%Y-%m-%d %H:%M:%S}"

    # parse msg link: public chats are addressed by handle, others by slim chat id
    link_base = f"https://t.me/{chandle}" if chandle else f"https://t.me/c/{slim_cid(cid)}"
    message_url = f"{link_base}/{tid}/{mid}" if tid else f"{link_base}/{mid}"

    # parse user attributes
    first_name = glom(message, "from_user.first_name", default="") or ""
    last_name = glom(message, "from_user.last_name", default="") or ""
    handle = glom(message, "from_user.username", default="") or ""
    full_name = f"{first_name} {last_name}".strip()

    # parse reply message
    reply_uid = glom(message, "reply_to_message.from_user.id", default=1) or 1
    reply_mid = glom(message, "reply_to_message.id", default=0) or 0
    reply_text = glom(message, "reply_to_message.content", default="") or ""
    reply_first_name = glom(message, "reply_to_message.from_user.first_name", default="") or ""
    reply_last_name = glom(message, "reply_to_message.from_user.last_name", default="") or ""
    reply_handle = glom(message, "reply_to_message.from_user.username", default="") or ""
    reply_full_name = f"{reply_first_name} {reply_last_name}".strip()

    # parse forward message
    forward_origin = message.forward_origin
    fwd_cid = glom(forward_origin, "chat.id", default=0) or 0
    fwd_ctype = glom(forward_origin, "chat.type.name", default="") or ""
    fwd_uid = glom(forward_origin, "sender_user.id", default=1) or 1
    fwd_handle = glom(forward_origin, Coalesce("sender_user.username", "chat.username"), default="") or ""
    fwd_first_name = glom(forward_origin, "sender_user.first_name", default="") or ""
    fwd_last_name = glom(forward_origin, "sender_user.last_name", default="") or ""
    fwd_full_name = f"{fwd_first_name} {fwd_last_name}".strip() or glom(forward_origin, Coalesce("sender_user_name", "chat.title"), default="") or ""

    # parse media attributes. for photo, we should use `sizes[-1]`. ref: TelegramPlayground/pyrogram @1ea5e797f920776bfeecf985a51dc03ff22906af
    if mtype == "photo":
        sizes = glom(message, "photo.sizes", default=None) or []
        # Fall back to the photo object itself when no sizes are available.
        media = sizes[-1] if sizes else glom(message, "photo", default=None)
        file_id = getattr(media, "file_id", 0) or 0
        file_size = getattr(media, "file_size", 0) or 0
    else:
        file_id = glom(message, f"{mtype}.file_id", default=0) or 0
        file_size = glom(message, f"{mtype}.file_size", default=0) or 0

    file_name = glom(message, f"{mtype}.file_name", default="") or ""
    mime_type = glom(message, f"{mtype}.mime_type", default="") or ""
    duration = glom(message, f"{mtype}.duration", default=0) or 0

    # Parse URL from message entities (text entities first, then caption entities)
    entity_urls = []
    for entities in (message.entities, message.caption_entities):
        if entities:
            entity_urls.extend(entity.url for entity in entities if entity.type == MessageEntityType.TEXT_LINK)

    ctype_emoji = CTYPE_EMOJI.get(ctype, "")
    mtype_emoji = MTYPE_EMOJI.get(mtype, mtype)
    # log the summary to console
    summary = ""
    if ctitle:  # group or channel
        summary += f"{ctype_emoji}{ctitle}[{mid}]"

    if full_name:  # private chat
        summary += f"🤖{full_name}(@{handle})[{uid}]" if is_bot else f"👨{full_name}(@{handle})[{uid}]"
    # `.strip()` also removes the leading space, so this part is appended without a separator.
    # NOTE(review): confirm this line is meant to run for every message and not only when `full_name` is set.
    summary += f" {mtype_emoji}{mtype}{file_name}".strip()
    if text:
        summary += f" 📝{text}"
    if not silent:
        logger.info(f"{summary!r}")

    info = {  # ensure the type of each field
        "mtype": str(mtype),
        "ctype": str(ctype),
        "ctitle": str(ctitle),
        "chandle": str(chandle),
        "uid": int(uid),
        "cid": int(cid),
        "tid": int(tid),
        "mid": int(mid),
        "media_group_id": int(media_group_id),
        "is_bot": bool(is_bot),
        "text": str(text),
        "quote_text": glom(message, "quote.text", default="") or "",
        "html": getattr(text, "html", ""),
        "full_name": str(full_name),
        "handle": str(handle),
        "datetime": dt,
        "time": str(time_str),
        "file_name": sanitize_filename(file_name, replacement_text="_"),
        "file_id": str(file_id),
        "mime_type": str(mime_type),
        "file_size": int(file_size),
        "duration": int(duration),
        "summary": str(summary),
        "message_url": str(message_url),
        "entity_urls": entity_urls,
        "reply_mid": int(reply_mid),
        "reply_text": str(reply_text),
        "reply_uid": int(reply_uid),
        "reply_handle": str(reply_handle),
        "reply_full_name": str(reply_full_name),
        "fwd_cid": int(fwd_cid),
        "fwd_ctype": str(fwd_ctype),
        "fwd_uid": int(fwd_uid),
        "fwd_handle": str(fwd_handle),
        "fwd_full_name": str(fwd_full_name),
    }
    if cache_key:
        cache.set(cache_key, info, ttl=120)  # cache the same msg for 2 minutes
    return info
146
147
def parse_chat(chat: Chat, *, use_cache: bool = True) -> dict:
    """Parse a Chat object and return a dictionary of its attributes.

    Abbreviations: c = chat

    Args:
        chat: The chat to parse.
        use_cache: If True, return the cached result for this chat id when one
            exists, and cache a fresh result for 2 minutes.

    Returns:
        A dict with ``cid`` (int), ``ctype``, ``ctitle`` and ``chandle`` (str).
        ``ctitle`` falls back to the chat's full name when it has no title.
    """
    cache_key = ""
    if use_cache:
        cache_key = f"parse_chat-{chat.id}"
        hit = cache.get(cache_key)
        if hit:
            return hit

    def read(path: str, fallback=""):
        # Missing or falsy attributes collapse to the fallback value.
        return glom(chat, path, default=fallback) or fallback

    kind = read("type.name")
    title = read("title")
    username = read("username")
    chat_id = read("id", 0)

    # parse user attributes (used as the title for chats without one)
    given_name = read("first_name")
    family_name = read("last_name")
    display_name = f"{given_name} {family_name}".strip()

    info = {  # ensure the type of each field
        "cid": int(chat_id),
        "ctype": str(kind),
        "ctitle": str(title) or str(display_name),
        "chandle": str(username),
    }
    if use_cache:
        cache.set(cache_key, info, ttl=120)  # cache the same chat for 2 minutes
    return info
174
175
def get_thread_id(message: Message) -> int:
    """Return the thread (topic) ID that a message belongs to.

    Uses ``message.message_thread_id`` when it is set. Otherwise returns 1 if
    the chat is a forum (the message was sent to the `General` topic thread)
    and 0 if it is not.
    """
    thread_id = glom(message, "message_thread_id", default=0) or 0
    if thread_id:
        return thread_id
    in_forum = glom(message, "chat.is_forum", default=False)
    return 1 if in_forum else 0