main
1#!/venv/bin/python
2# -*- coding: utf-8 -*-
3import asyncio
4import hashlib
5import os
6import re
7import smtplib
8import ssl
9from email.headerregistry import Address
10from email.message import EmailMessage
11from io import BytesIO
12from urllib.parse import quote
13
14from glom import glom
15from loguru import logger
16from pyrogram.client import Client
17from pyrogram.errors import FloodWait
18from pyrogram.types import InputMediaDocument
19
20from ai.utils import remove_consecutive_newlines
21from config import CAPTION_LENGTH, DB, cutter
22from database.d1 import insert_d1, query_d1
23from database.r2 import set_cf_r2
24from database.turso import insert_statement, turso_exec
25from history.utils import TURSO_KWARGS
26from messages.utils import count_without_entities, remove_img_tag, smart_split
27from networking import hx_req
28from utils import convert_html, convert_md
29
30
31# ruff: noqa: PLW2901,RUF001,RUF002,RUF003
async def dnkt_email(client: Client):
    """Process every exported email that is newer than the last one already stored.

    The mailbox export is downloaded as JSON. Entries whose ``ReceivedTime`` is
    later than the newest stored one are handled oldest-first:

    1. saved to D1 and Turso;
    2. rendered to caption / HTML / Markdown;
    3. posted to Telegram;
    4. forwarded by SMTP.

    Setting the environment variable ``DNKT_EMAIL_DISABLED=1`` turns the job off.
    """
    if os.getenv("DNKT_EMAIL_DISABLED", "0") == "1":
        return

    emails: list[dict] = await hx_req("https://dl.zydou.me/d/emails.json", timeout=60, silent=True)  # ty:ignore[invalid-assignment]
    if not emails:
        return

    cutoff = await get_last_time()
    # ReceivedTime is compared and sorted as a plain string. Chronological order
    # therefore relies on a fixed-width, sortable timestamp format.
    # NOTE(review): confirm the exporter's format.
    pending = sorted(
        (mail for mail in emails if mail["ReceivedTime"] > cutoff),
        key=lambda mail: mail["ReceivedTime"],
    )

    for mail in pending:
        logger.info(mail["Subject"])
        # Stored first: the newest stored ReceivedTime becomes the next run's cutoff.
        await sync_to_d1(mail)
        await sync_to_turso(mail)
        caption, html, markdown = await parse_entry(mail)
        await send_email_notice(client, mail, caption, html, markdown)
        await forward_email(mail, html, markdown)
51
52
def _send_via_riseup(
    eml: EmailMessage,
    login: str,
    password: str,
    from_addr: str,
    to_addr: str,
    timeout: float = 60,
) -> None:
    """Send *eml* through Riseup's SMTP server.

    This call blocks, so callers should run it off the event loop. Riseup
    requires implicit TLS on port 465. Any ``smtplib`` exception propagates
    to the caller.
    """
    context = ssl.create_default_context()
    with smtplib.SMTP_SSL("mail.riseup.net", 465, context=context, timeout=timeout) as server:
        # For wire-level debugging, call server.set_debuglevel(1) here.
        server.login(login, password)
        server.send_message(eml, from_addr=from_addr, to_addrs=[to_addr])


async def forward_email(
    entry: dict,
    html: str,
    markdown: str,
    target_email: str = "douzy94@gmail.com",
    riseup_email: str | None = None,
    riseup_pass: str | None = None,
    alias_email: str | None = None,
):
    """Rebuild an exported email entry as a MIME message and forward it via Riseup SMTP.

    Args:
        entry: Exported email record. Reads ``Subject`` and ``Sender``/``To``
            (used only as display names), plus an optional ``ICS`` invite
            that is attached as ``invite.ics``.
        html: HTML body, sent as the ``text/html`` alternative.
        markdown: Plain-text body, sent as the ``text/plain`` part.
        target_email: Address the copy is delivered to.
        riseup_email: SMTP login. ``None`` means ``$RISEUP_EMAIL``, read at call time.
        riseup_pass: SMTP password. ``None`` means ``$RISEUP_PASSWORD``, read at call time.
        alias_email: From/envelope address. ``None`` means ``$RISEUP_ALIAS``;
            falls back to ``riseup_email`` when that is unset.

    All errors are logged rather than raised, so one failed forward does not
    abort the caller's batch.
    """
    # Resolve credentials per call. A signature default of os.getenv(...) is
    # evaluated once at import time and would miss variables set afterwards.
    riseup_email = os.getenv("RISEUP_EMAIL", "") if riseup_email is None else riseup_email
    riseup_pass = os.getenv("RISEUP_PASSWORD", "") if riseup_pass is None else riseup_pass
    if alias_email is None:
        alias_email = os.getenv("RISEUP_ALIAS")
    if not riseup_email or not riseup_pass:
        logger.error("Riseup 邮箱或密码为空")
        return
    # The From address must be the Riseup account (or one of its aliases),
    # otherwise the server refuses to relay ("Relay Access Denied").
    sender = alias_email or riseup_email

    try:
        eml = EmailMessage()
        eml["Subject"] = entry["Subject"]
        eml["From"] = format_addresses(entry.get("Sender"), sender)
        eml["To"] = format_addresses(entry.get("To"), target_email)
        logger.info(f"主题: {entry['Subject']}")
        logger.info(f"发件人: {entry.get('Sender')}")
        logger.info(f"收件人: {entry.get('To')}")

        eml.set_content(markdown or "")
        eml.add_alternative(html, subtype="html")

        # Attach the meeting invite, if any, as a text/calendar part.
        if ics := entry.get("ICS"):
            eml.add_attachment(
                ics.encode("utf-8"),
                maintype="text",
                subtype="calendar",
                filename="invite.ics",
                params={"method": "PUBLISH"},  # PUBLISH is lighter than REQUEST and less likely to be filtered
            )
            logger.debug("已检测到会议邀请,成功注入了 ICS 日历组件")

        # smtplib is blocking. Run it in a worker thread so other coroutines on
        # this event loop keep running during the SMTP exchange.
        await asyncio.to_thread(_send_via_riseup, eml, riseup_email, riseup_pass, sender, target_email)
        logger.success(f"成功将邮件 [{entry['Subject']}] 发送至: {target_email}")
    except smtplib.SMTPAuthenticationError:
        logger.error("SMTP 认证失败:请检查你的 Riseup 用户名和密码。")
    except smtplib.SMTPException as e:
        logger.error(f"SMTP 通信或发信被拒: {e}")
    except Exception as e:
        # Unexpected failure: keep the traceback so the cause can be diagnosed.
        logger.exception(f"邮件重组或发送过程中发生未知错误: {e}")
107
108
async def send_email_notice(
    client: Client,
    entry: dict,
    caption: str,
    html: str,
    markdown: str,
    *,
    chat_id: int = -1003768991336,
    message_thread_id: int = 1204,
):
    """Post an email to a Telegram forum topic as a media group.

    The group contains:

    - the Markdown body as ``<subject>.txt``;
    - the HTML body as ``<subject>.html``, which carries the caption;
    - the ``.ics`` invite as ``<subject>.ics``, when ``entry["ICS"]`` is set.

    The subject is truncated to 64 characters for the file names. If
    caption + Markdown counts below ``CAPTION_LENGTH`` (via
    ``count_without_entities``), the Markdown is inlined into the caption.
    The caption is then cut to the first chunk returned by ``smart_split``.

    On ``FloodWait`` it sleeps for the requested time and retries.
    """
    fname = entry["Subject"][:64]
    ics = entry.get("ICS") or ""
    while True:
        try:
            # Derive the caption from the untouched arguments on every attempt.
            # Reusing an already-extended caption after FloodWait would append
            # the body a second time.
            combined = f"{caption}\n\n{markdown}"
            text = combined if await count_without_entities(combined) < CAPTION_LENGTH else caption
            text = (await smart_split(text, CAPTION_LENGTH))[0]
            # Fresh in-memory files for every attempt.
            with (
                BytesIO(markdown.encode("utf-8")) as f_md,
                BytesIO(html.encode("utf-8")) as f_html,
                BytesIO(ics.encode("utf-8")) as f_ics,
            ):
                media = [
                    InputMediaDocument(f_md, file_name=f"{fname}.txt"),
                    InputMediaDocument(f_html, caption=text, file_name=f"{fname}.html"),
                ]
                if ics:
                    media.append(InputMediaDocument(f_ics, file_name=f"{fname}.ics"))
                await client.send_media_group(chat_id=chat_id, message_thread_id=message_thread_id, media=media)
            await asyncio.sleep(3)  # presumably throttles consecutive posts
            return
        except FloodWait as e:
            logger.warning(e)
            await asyncio.sleep(e.value)  # type: ignore
128
129
async def get_last_time() -> str:
    """Return the newest ``ReceivedTime`` already stored in the ``dnkt`` ``邮件`` table.

    D1 is asked first. Turso is consulted only when D1 yields nothing (no row,
    or a response shape that glom cannot walk, in which case the default
    applies). If both come back empty, the sentinel ``"9999-12-31 23:59:59"``
    is returned. It sorts after any ordinary timestamp string, so a
    "newer than" string filter selects nothing instead of replaying the whole
    mailbox.
    """
    latest_sql = "SELECT ReceivedTime FROM 邮件 ORDER BY ReceivedTime DESC LIMIT 1"

    d1_resp = await query_d1(sql=latest_sql, db_name="dnkt", silent=True)
    latest = glom(d1_resp, "result.0.results.0.ReceivedTime", default="")
    if latest:
        return latest

    turso_resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": latest_sql}}],
        retry=2,
        silent=True,
        **(TURSO_KWARGS | {"db_name": "dnkt"}),
    )
    latest = glom(turso_resp, "results.0.response.result.rows.0.0.value", default="")
    return latest or "9999-12-31 23:59:59"
148
149
async def parse_entry(entry: dict) -> tuple[str, str, str]:
    """Render an exported email entry as ``(caption, html, markdown)``.

    - caption: Markdown summary whose bold title links to the HTML copy on R2.
      Entries that contain a ``Start`` key are treated as meeting invitations
      (recurrence flag, start/end, location, organizer, required/optional
      attendees, conflict warning). All others are treated as regular mail
      (received time, sender, recipients, CC).
    - html: the body as a standalone HTML page. It gets a UTF-8 meta tag,
      newlines become ``<br>``, and ``cid:<id>`` references are replaced with
      the values from ``entry["Images"]``.
    - markdown: the body as Markdown/plain text, post-processed by
      ``remove_img_tag`` and ``remove_consecutive_newlines``.

    Side effect: uploads the HTML page to R2 under
    ``TTL/365d/<ReceivedTime>-<Subject>.html``.
    """
    subject = entry["Subject"]
    r2_key = f"TTL/365d/{entry['ReceivedTime']}-{subject}.html"
    url = f"{DB.CF_R2_PUBLIC_URL.rstrip('/')}/{quote(r2_key)}"

    if "Start" not in entry:  # regular email
        caption = f"📧**[{subject}]({url})**\n🕒{entry['ReceivedTime']}"
        # .get() so an export that omits an empty address field does not raise
        # KeyError (the meeting branch below already behaves this way).
        if entry.get("Sender"):
            caption += f"\n发件人: {combine_names(entry['Sender'])}"
        if entry.get("To"):
            caption += f"\n收件人: {combine_names(entry['To'])}"
        if entry.get("CC"):
            caption += f"\n抄送: {combine_names(entry['CC'])}"
    else:  # meeting invitation
        caption = f"📅**[{subject}]({url})**"
        if entry.get("IsRecurring"):
            caption += "🔄"
        if entry.get("Start"):
            caption += f"\n▶️{entry['Start']}"
        if entry.get("End"):
            caption += f"\n⏸️{entry['End']}"
        if entry.get("Location"):
            caption += f"\n📍{combine_names(entry['Location'])}"
        if entry.get("Sender"):
            caption += f"\n组织人: {combine_names(entry['Sender'])}"
        if entry.get("To"):
            caption += f"\n必需: {combine_names(entry['To'])}"
        if entry.get("CC"):
            caption += f"\n可选: {combine_names(entry['CC'])}"
        if entry.get("IsConflict"):
            caption += "\n⚠️此会议时间有冲突"

    body = (entry.get("Body") or "").replace("\r\n", "\n")
    # Drop the classification banner. The "<br>" variant goes first so no
    # empty line is left behind.
    body = body.replace("CONFIDENTIAL<br>", "").replace("CONFIDENTIAL", "")
    is_html = body.startswith("<")

    html = body if is_html else convert_html(body)
    if "<head>" not in html:
        html = f"<head></head>{html}"
    html = html.replace("\n", "<br>")
    html = html.replace("<head>", '<head><meta charset="UTF-8">')
    # Inline embedded images. The values are presumably data URIs; confirm
    # against the exporter.
    for cid, data in (entry.get("Images") or {}).items():
        html = html.replace(f"cid:{cid}", data)

    markdown = convert_md(html=body) if is_html else body
    markdown, _ = remove_img_tag(markdown)
    markdown = remove_consecutive_newlines(markdown)
    # NOTE(review): this deletes escaped underscores entirely ("\_" -> ""), for
    # example Outlook "____" separator lines, not just the escaping backslash.
    # Confirm this is intended.
    markdown = markdown.replace("\\_", "")

    await set_cf_r2(r2_key, html, mime_type="text/html")
    return caption, html, markdown
196
197
async def sync_to_d1(entry: dict) -> None:
    """Write one email entry into the ``邮件`` table of the ``dnkt`` D1 database.

    - Contact-like fields (Sender/To/CC/BCC/Location) are reduced to display
      names with ``combine_names``.
    - ``search_column`` holds the ``cutter.cutword`` segmentation of
      subject + body, for full-text search.
    - ``MD5`` (a hash of subject + received time) is passed as
      ``update_on_conflict``, presumably making this an upsert keyed on MD5.
    """
    # One-off table bootstrap (database.d1.create_d1_table, table "邮件", db "dnkt"):
    #   columns: ReceivedTime TEXT, Subject TEXT, Sender TEXT, Receiver TEXT, CC TEXT, BCC TEXT,
    #            StartTime TEXT, EndTime TEXT, Location TEXT, Attachments TEXT, RRULE TEXT,
    #            Body TEXT, search_column TEXT, MD5 INTEGER PRIMARY KEY
    #   indexes: Subject, ReceivedTime, Sender, Receiver, CC
    #   FTS:     fts_on_col="MD5", fts_index_col="search_column"
    get = entry.get
    body = get("Body", "")
    row = {
        "Subject": get("Subject", ""),
        "Sender": combine_names(get("Sender", "")),
        "ReceivedTime": get("ReceivedTime", ""),
        "Receiver": combine_names(get("To", "")),
        "CC": combine_names(get("CC", "")),
        "BCC": combine_names(get("BCC", "")),
        "StartTime": get("Start", ""),
        "EndTime": get("End", ""),
        "Location": combine_names(get("Location", "")),
        "Attachments": get("Attachments", ""),
        "RRULE": get("RRULE", ""),
        "Body": body,
        "search_column": " ".join(cutter.cutword(f"{entry['Subject']}\n{body}")),
        "MD5": md5(f"{entry['Subject']}{entry['ReceivedTime']}"),
    }
    statement = insert_d1("邮件", row, update_on_conflict="MD5")
    await query_d1(**statement, db_name="dnkt", silent=True)
230
231
async def sync_to_turso(entry: dict) -> None:
    """Write one email entry into the ``邮件`` table of the ``dnkt`` Turso database.

    - Contact-like fields (Sender/To/CC/BCC/Location) are reduced to display
      names with ``combine_names``.
    - ``search_column`` holds the ``cutter.cutword`` segmentation of
      subject + body.
    - ``MD5`` (a hash of subject + received time) is passed as
      ``update_on_conflict``, presumably making this an upsert keyed on MD5.
    - The statement is sent with ``retry=2``.
    """
    # One-off table bootstrap (database.turso.turso_create_table, table "邮件", db "dnkt"):
    #   columns: ReceivedTime TEXT, Subject TEXT, Sender TEXT, Receiver TEXT, CC TEXT, BCC TEXT,
    #            StartTime TEXT, EndTime TEXT, Location TEXT, Attachments TEXT, RRULE TEXT,
    #            Body TEXT, search_column TEXT, MD5 INTEGER PRIMARY KEY
    #   indexes: Subject, ReceivedTime, Sender, Receiver, CC
    #   FTS:     fts_on_col="MD5", fts_index_col="search_column"
    get = entry.get
    body = get("Body", "")
    row = {
        "Subject": get("Subject", ""),
        "Sender": combine_names(get("Sender", "")),
        "ReceivedTime": get("ReceivedTime", ""),
        "Receiver": combine_names(get("To", "")),
        "CC": combine_names(get("CC", "")),
        "BCC": combine_names(get("BCC", "")),
        "StartTime": get("Start", ""),
        "EndTime": get("End", ""),
        "Location": combine_names(get("Location", "")),
        "Attachments": get("Attachments", ""),
        "RRULE": get("RRULE", ""),
        "Body": body,
        "search_column": " ".join(cutter.cutword(f"{entry['Subject']}\n{body}")),
        "MD5": md5(f"{entry['Subject']}{entry['ReceivedTime']}"),
    }
    connection = TURSO_KWARGS | {"db_name": "dnkt"}
    statement = insert_statement("邮件", row, update_on_conflict="MD5")
    await turso_exec([statement], retry=2, silent=True, **connection)
264
265
def md5(s: str) -> int:
    """Return the leading 48 bits of the MD5 digest of *s* (UTF-8 encoded) as an int.

    The result is non-negative and below ``2**48``. It is used as a compact
    row key, not for anything security-related.
    """
    digest = hashlib.md5(s.encode("utf-8")).digest()  # noqa: S324
    # The first 6 bytes read big-endian equal the first 12 hex digits.
    return int.from_bytes(digest[:6], "big")
270
271
def combine_names(texts: str | None) -> str:
    """Reduce a ``;``-separated contact list to a ``"; "``-joined list of names.

    Each contact is parsed with ``parse_email``. The name used is the Chinese
    name if present, otherwise the English name, otherwise the bare address.

    - Blank segments (for example from a trailing ``;``) are skipped, so the
      result has no dangling separators.
    - The "Microsoft Teams 会议" pseudo-entry is dropped when other contacts
      are listed alongside it.
    - ``None`` or ``""`` yields ``""``.

    Input:  "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
    Output: "张三; 李四"
    """
    if not texts:
        return ""
    contacts = [part.strip() for part in texts.split(";") if part.strip()]
    names = []
    for contact in contacts:
        # This pseudo-entry (seen e.g. in Location) is kept only when it is
        # the sole entry.
        if contact == "Microsoft Teams 会议" and len(contacts) > 1:
            continue
        english, chinese, address = parse_email(contact)
        name = chinese or english or address
        if name:
            names.append(name)
    return "; ".join(names)
287
288
def format_addresses(contacts: str | None, email: str = "email@domain.com") -> Address:
    """Build an ``Address`` whose display name summarizes *contacts* and whose address is *email*.

    The contacts' own addresses are discarded; only their names are kept,
    joined with ``"; "``. For each contact the Chinese name is used if
    present, otherwise the English name. Contacts with neither are skipped.
    Empty or ``None`` *contacts* yields a bare ``Address(addr_spec=email)``.

    Example:
        contacts: "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
        email: "email@domain.com"
        return: Address(display_name="张三; 李四", addr_spec="email@domain.com")
    """
    if not contacts:
        return Address(addr_spec=email)
    names = []
    # A semicolon separates multiple contacts.
    for contact in contacts.split(";"):
        english, chinese, _ = parse_email(contact.strip())
        # Fall back to the English name so English-only contacts keep their name.
        name = chinese or english
        if name:
            names.append(name)
    return Address(display_name="; ".join(names), addr_spec=email)
308
309
def parse_email(s: str) -> tuple[str, str, str]:
    """Split a contact string into ``(english_name, chinese_name, address)``.

    - The address is the text inside the first ``<...>``.
    - The Chinese name is the text inside the first ``(...)``.
    - The English name is whatever remains once both bracketed parts are
      removed.
    - All three parts are whitespace-trimmed; missing parts come back as ``""``.

    Example:
        'San Zhang (张三) <zhangsan@example.com>' -> ('San Zhang', '张三', 'zhangsan@example.com')
        'San Zhang <zhangsan@example.com>' -> ('San Zhang', '', 'zhangsan@example.com')
        'Li Si (李四)' -> ('Li Si', '李四', '')
        'Li Si' -> ('Li Si', '', '')
        '(王五) <wangwu@example.com>' -> ('', '王五', 'wangwu@example.com')
    """
    text = s.strip()
    email_hit = re.search(r"<(.*?)>", text)
    cn_hit = re.search(r"\((.*?)\)", text)

    # Remove the address first, then the parenthesized name. What is left is
    # the English name.
    remainder = text
    for hit in (email_hit, cn_hit):
        if hit is not None:
            remainder = remainder.replace(hit.group(0), "")

    address = email_hit.group(1).strip() if email_hit is not None else ""
    chinese = cn_hit.group(1).strip() if cn_hit is not None else ""
    return remainder.strip(), chinese, address
339 return (english_name, chinese_name, email)