main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4
5from datetime import datetime
6from zoneinfo import ZoneInfo
7
8from glom import Coalesce, glom
9from loguru import logger
10from pathvalidate import sanitize_filename
11from pyrogram.enums import MessageEntityType
12from pyrogram.types import Chat, Message
13
14from config import TZ, cache
15from others.emoji import CTYPE_EMOJI, MTYPE_EMOJI
16from utils import nowdt
17
18
def parse_msg(message: Message, *, silent: bool = False, verbose: bool = False, use_cache: bool = True) -> dict:
    """Parse a message object and return a dictionary of its attributes.

    Abbreviations: c = chat, m = message, u = user

    Args:
        message: The message to parse.
        silent: If True, nothing is logged (neither the summary nor the raw message).
        verbose: If True (and ``silent`` is False), also log the raw message repr at TRACE level.
        use_cache: If True, return the cached result for the same chat/message ID when
            one exists, and cache the freshly parsed result for 2 minutes.

    Returns:
        A flat dict of normalized fields (message, chat, sender, reply, forward and media
        attributes). Missing values fall back to ``""`` for strings, ``0`` for numbers and
        ``1`` for user IDs (uid must be > 0).
    """
    # Build the cache key defensively: a message without a chat must not crash here,
    # it is simply parsed without caching.
    cache_key = ""
    if use_cache:
        key_cid = glom(message, "chat.id", default=None)
        key_mid = glom(message, "id", default=None)
        if key_cid is not None and key_mid is not None:
            cache_key = f"parse_msg-{key_cid}-{key_mid}"
    if cache_key and (cached := cache.get(cache_key)):
        return cached

    if not silent and verbose:
        logger.trace(f"{message!r}")

    mtype = glom(message, "media.value", default="") or "text"
    ctype = glom(message, "chat.type.name", default="") or ""
    ctitle = glom(message, "chat.title", default="") or ""
    chandle = glom(message, "chat.username", default="") or ""
    uid = glom(message, "from_user.id", default=1) or 1  # uid must > 0
    cid = glom(message, "chat.id", default=0) or 0
    tid = get_thread_id(message)
    mid = glom(message, "id", default=0) or 0
    media_group_id = glom(message, "media_group_id", default=0) or 0
    is_bot = glom(message, "from_user.is_bot", default=False)
    text = message.content or ""  # never let a missing content become the string "None"
    dt = message.date.astimezone(ZoneInfo(TZ)) if isinstance(message.date, datetime) else nowdt(TZ)
    time = f"{dt:%Y-%m-%d %H:%M:%S}"
    message_url = glom(message, "link", default="") or ""

    # parse user attributes
    first_name = glom(message, "from_user.first_name", default="") or ""
    last_name = glom(message, "from_user.last_name", default="") or ""
    handle = glom(message, "from_user.username", default="") or ""
    full_name = f"{first_name} {last_name}".strip()

    # parse reply message
    reply_uid = glom(message, "reply_to_message.from_user.id", default=1) or 1
    reply_mid = glom(message, "reply_to_message.id", default=0) or 0
    reply_text = glom(message, "reply_to_message.content", default="") or ""
    reply_first_name = glom(message, "reply_to_message.from_user.first_name", default="") or ""
    reply_last_name = glom(message, "reply_to_message.from_user.last_name", default="") or ""
    reply_handle = glom(message, "reply_to_message.from_user.username", default="") or ""
    reply_full_name = f"{reply_first_name} {reply_last_name}".strip()

    # parse forward message
    forward_origin = message.forward_origin
    fwd_cid = glom(forward_origin, "chat.id", default=0) or 0
    fwd_ctype = glom(forward_origin, "chat.type.name", default="") or ""
    fwd_uid = glom(forward_origin, "sender_user.id", default=1) or 1
    fwd_handle = glom(forward_origin, Coalesce("sender_user.username", "chat.username"), default="") or ""
    fwd_first_name = glom(forward_origin, "sender_user.first_name", default="") or ""
    fwd_last_name = glom(forward_origin, "sender_user.last_name", default="") or ""
    fwd_full_name = (
        f"{fwd_first_name} {fwd_last_name}".strip()
        or glom(forward_origin, Coalesce("sender_user_name", "chat.title"), default="")
        or ""
    )

    # parse media attributes. for photo, we should use `sizes[-1]`.
    # ref: TelegramPlayground/pyrogram @1ea5e797f920776bfeecf985a51dc03ff22906af
    if mtype == "photo":
        sizes = glom(message, "photo.sizes", default=None) or []
        # Fall back to the photo object itself when no sizes are available,
        # instead of raising on a missing/empty list.
        source = sizes[-1] if sizes else glom(message, "photo", default=None)
        file_id = getattr(source, "file_id", 0) or 0
        file_size = getattr(source, "file_size", 0) or 0
    else:
        file_id = glom(message, f"{mtype}.file_id", default=0) or 0
        file_size = glom(message, f"{mtype}.file_size", default=0) or 0

    file_name = glom(message, f"{mtype}.file_name", default="") or ""
    mime_type = glom(message, f"{mtype}.mime_type", default="") or ""
    duration = glom(message, f"{mtype}.duration", default=0) or 0

    # Parse URL from message entities (text entities first, then caption entities)
    entity_urls = [
        entity.url
        for entity in (*(message.entities or ()), *(message.caption_entities or ()))
        if entity.type == MessageEntityType.TEXT_LINK
    ]

    ctype_emoji = CTYPE_EMOJI.get(ctype, "")
    mtype_emoji = MTYPE_EMOJI.get(mtype, mtype)

    # log the summary to console
    summary = ""
    if ctitle:  # group or channel
        summary += f"{ctype_emoji}{ctitle}[{mid}]"
    if full_name:  # sender is known (user or bot)
        summary += f"🤖{full_name}(@{handle})[{uid}]" if is_bot else f"👨{full_name}(@{handle})[{uid}]"
    summary += f" {mtype_emoji}{mtype}{file_name}".strip()
    if text:
        summary += f" 📝{text}"
    if not silent:
        logger.info(f"{summary!r}")

    info = {  # ensure the type of each field
        "mtype": str(mtype),
        "ctype": str(ctype),
        "ctitle": str(ctitle),
        "chandle": str(chandle),
        "uid": int(uid),
        "cid": int(cid),
        "tid": int(tid),
        "mid": int(mid),
        "media_group_id": int(media_group_id),
        "is_bot": bool(is_bot),
        "text": str(text),
        "quote_text": glom(message, "quote.text", default="") or "",
        "html": getattr(text, "html", ""),
        "full_name": str(full_name),
        "handle": str(handle),
        "datetime": dt,
        "time": str(time),
        "file_name": sanitize_filename(file_name, replacement_text="_"),
        "file_id": str(file_id),
        "mime_type": str(mime_type),
        "file_size": int(file_size),
        "duration": int(duration),
        "summary": str(summary),
        "message_url": str(message_url),
        "entity_urls": entity_urls,
        "reply_mid": int(reply_mid),
        "reply_text": str(reply_text),
        "reply_uid": int(reply_uid),
        "reply_handle": str(reply_handle),
        "reply_full_name": str(reply_full_name),
        "fwd_cid": int(fwd_cid),
        "fwd_ctype": str(fwd_ctype),
        "fwd_uid": int(fwd_uid),
        "fwd_handle": str(fwd_handle),
        "fwd_full_name": str(fwd_full_name),
    }
    if cache_key:
        cache.set(cache_key, info, ttl=120)  # cache the same msg for 2 minutes
    return info
140
141
def parse_chat(chat: Chat, *, use_cache: bool = True) -> dict:
    """Extract the basic attributes of a Chat into a plain dictionary.

    Abbreviations: c = chat, m = message, u = user

    Args:
        chat: The chat to parse.
        use_cache: If True, return the cached result for this chat ID when one
            exists, and cache the freshly parsed result for 2 minutes.

    Returns:
        A dict with ``cid`` (int), ``ctype``, ``ctitle`` and ``chandle`` (str).
        ``ctitle`` falls back to the "first last" name when the chat has no title.
    """
    if use_cache:
        hit = cache.get(f"parse_chat-{chat.id}")
        if hit:
            return hit

    def _attr(path: str, fallback):
        # Missing attributes and falsy values (e.g. None) both collapse to the fallback.
        return glom(chat, path, default=fallback) or fallback

    title = str(_attr("title", ""))
    # Chats without a title are labelled with the "first last" name instead.
    person_name = " ".join((_attr("first_name", ""), _attr("last_name", ""))).strip()

    parsed = {  # ensure the type of each field
        "cid": int(_attr("id", 0)),
        "ctype": str(_attr("type.name", "")),
        "ctitle": title or str(person_name),
        "chandle": str(_attr("username", "")),
    }
    if use_cache:
        cache.set(f"parse_chat-{chat.id}", parsed, ttl=120)  # cache the same chat for 2 minutes
    return parsed
168
169
def get_thread_id(message: Message) -> int:
    """Return the thread (topic) ID of a message.

    Resolution order:
      1. ``message.message_thread_id`` when it is set (truthy).
      2. ``1`` when the chat is a forum, i.e. the message was sent to the
         `General` topic thread.
      3. ``0`` otherwise.
    """
    thread_id = glom(message, "message_thread_id", default=0)
    if thread_id:
        return thread_id
    if glom(message, "chat.is_forum", default=False):
        return 1  # this message is sent to `General` topic thread
    return 0