main
1#!/venv/bin/python
2# -*- coding: utf-8 -*-
3import asyncio
4import base64
5import hashlib
6import json
7import mimetypes
8import os
9import re
10import smtplib
11import ssl
12from email.headerregistry import Address
13from email.message import EmailMessage
14from io import BytesIO
15from urllib.parse import unquote
16
17import anyio
18from glom import Coalesce, glom
19from loguru import logger
20from pyrogram.client import Client
21from pyrogram.errors import FloodWait
22from pyrogram.types import InputMediaDocument
23
24from ai.utils import remove_consecutive_newlines
25from config import CAPTION_LENGTH, cutter
26from custom.ai_ics import event_detect
27from database.d1 import insert_d1, query_d1
28from database.r2 import set_cf_r2
29from database.turso import insert_statement, turso_exec
30from history.utils import TURSO_KWARGS
31from messages.utils import count_without_entities, remove_img_tag, smart_split
32from networking import hx_req
33from utils import convert2html, convert2md, match_urls
34
35
36# ruff: noqa: PLW2901,RUF001,RUF002,RUF003
async def dnkt_email(client: Client):
    """Process newly received DNKT emails: store, announce and forward them.

    The email list is read from the JSON export at ``DNKT_EMAIL_PATH``
    (default ``/alist/emails.json``); if that file does not exist it is
    downloaded from the mirror URL instead. Only entries whose
    ``ReceivedTime`` is later than the newest timestamp already stored
    (``get_last_time``) are handled, oldest first. Each one is written to
    D1 and Turso, rendered, posted to Telegram and forwarded by email.

    Set the environment variable ``DNKT_EMAIL_DISABLED=1`` to skip the job.
    """
    if os.getenv("DNKT_EMAIL_DISABLED", "0") == "1":
        return
    path = os.getenv("DNKT_EMAIL_PATH", "/alist/emails.json")
    emails: list[dict]
    if os.path.exists(path):
        # The export contains Chinese text: decode it as UTF-8 explicitly instead of
        # relying on the platform's locale-dependent default encoding.
        async with await anyio.open_file(path, encoding="utf-8") as f:
            emails = json.loads(await f.read())
    else:
        logger.warning("从网络下载 emails.json")
        emails = await hx_req("https://dl.zydou.me/d/emails.json", timeout=60, silent=True)  # ty:ignore[invalid-assignment]
    if not emails:
        return

    last_time = await get_last_time()
    # String comparison; assumes the fixed "YYYY-MM-DD HH:MM:SS" format that
    # get_last_time's sentinel uses, where lexical order equals chronological order.
    entries = sorted(
        (x for x in emails if x["ReceivedTime"] > last_time),
        key=lambda x: x["ReceivedTime"],
    )  # oldest -> newest

    for entry in entries:
        logger.info(entry["Subject"])
        await sync_to_d1(entry)
        await sync_to_turso(entry)
        caption, html, markdown = await parse_entry(entry)
        await send_email_notice(client, entry, caption, html, markdown)
        await forward_email(entry, html, markdown)
62
63
async def forward_email(
    entry: dict,
    html: str,
    markdown: str,
    target_email: str = "douzy94@gmail.com",
    riseup_email: str = os.getenv("RISEUP_EMAIL", ""),
    riseup_pass: str = os.getenv("RISEUP_PASSWORD", ""),
    alias_email: str | None = os.getenv("RISEUP_ALIAS"),
):
    """Rebuild an exported email and forward it to ``target_email`` through Riseup SMTP.

    Args:
        entry: Exported email record. Reads ``Subject``, ``Sender`` and ``To``, plus the
            optional ``Images`` (cid -> base64 data URI), ``Attachments``
            (``IDX<n>-<name>`` -> base64) and ``ICS`` (iCalendar text) fields.
        html: HTML body, sent as the ``text/html`` alternative.
        markdown: Plain-text/Markdown body, sent as the ``text/plain`` part.
        target_email: Recipient mailbox.
        riseup_email: SMTP login. NOTE: this default is read from the environment
            when the module is imported, not when the function is called.
        riseup_pass: SMTP password (same import-time default caveat).
        alias_email: Optional From/envelope address; falls back to ``riseup_email``.

    Failures are logged rather than raised.
    """
    if not riseup_email or not riseup_pass:
        logger.error("Riseup 邮箱或密码为空")
        return

    envelope_from = alias_email or riseup_email
    try:
        eml = EmailMessage()
        eml["Subject"] = entry["Subject"]
        # The From address must be the Riseup mailbox (or its alias); otherwise the
        # SMTP server refuses to relay the message ("Relay Access Denied").
        eml["From"] = format_addresses(entry.get("Sender"), envelope_from)
        eml["To"] = format_addresses(entry.get("To"), target_email)
        logger.info(f"主题: {entry['Subject']}")
        logger.info(f"发件人: {entry.get('Sender')}")
        logger.info(f"收件人: {entry.get('To')}")

        eml.set_content(markdown or "")
        eml.add_alternative(html or "", subtype="html")

        # Inline images. The message is now multipart/alternative whose last part is the
        # HTML body; attaching the images to that part as multipart/related lets the
        # HTML's "cid:" references resolve.
        for cid, data_uri in (entry.get("Images") or {}).items():
            filename = cid.split("@")[0]
            try:
                mime, payload = _split_data_uri(data_uri)
                file_bytes = base64.b64decode(payload)
                maintype, subtype = (mime or _guess_mime(filename, "image/jpeg")).split("/", 1)
                # Content-ID must be a msg-id in angle brackets; the HTML refers to it
                # as "cid:<id without brackets>".
                eml.get_payload()[-1].add_related(
                    file_bytes,
                    maintype=maintype,
                    subtype=subtype,
                    filename=filename,
                    cid=f"<{cid.strip('<>')}>",
                )
                logger.debug(f"成功添加图片附件: {filename}")
            except Exception as e:
                logger.error(f"处理图片附件 [{filename}] 时出错: {e}")

        # Regular attachments.
        for key, b64_data in (entry.get("Attachments") or {}).items():
            # Drop the export's index prefix: "IDX0-测试文档.pdf" -> "测试文档.pdf"
            filename = re.sub(r"^IDX\d+-", "", key)
            try:
                file_bytes = base64.b64decode(b64_data)
                maintype, subtype = _guess_mime(filename, "application/octet-stream").split("/", 1)
                eml.add_attachment(file_bytes, maintype=maintype, subtype=subtype, filename=filename)
                logger.debug(f"成功添加普通附件: {filename}")
            except Exception as e:
                logger.error(f"处理附件 [{filename}] 时出错: {e}")

        # Calendar invitation, if any.
        if ics := entry.get("ICS"):
            eml.add_attachment(
                ics.encode("utf-8"),
                maintype="text",
                subtype="calendar",
                filename="invite.ics",
                params={"method": "PUBLISH"},  # PUBLISH is a lighter method than REQUEST and less likely to be blocked
            )
            logger.debug("已检测到会议邀请,成功注入了 ICS 日历组件")

        # smtplib is blocking; run it in a worker thread so the event loop is not stalled.
        await asyncio.to_thread(_send_via_riseup, eml, riseup_email, riseup_pass, envelope_from, target_email)
        logger.success(f"成功将邮件 [{entry['Subject']}] 发送至: {target_email}")
    except smtplib.SMTPAuthenticationError:
        logger.error("SMTP 认证失败:请检查你的 Riseup 用户名和密码。")
    except smtplib.SMTPException as e:
        logger.error(f"SMTP 通信或发信被拒: {e}")
    except Exception as e:
        logger.error(f"邮件重组或发送过程中发生未知错误: {e}")


def _guess_mime(filename: str, default: str) -> str:
    """Guess a MIME type from ``filename``; return ``default`` if unknown or compressed."""
    ctype, encoding = mimetypes.guess_type(filename)
    return default if ctype is None or encoding is not None else ctype


def _split_data_uri(data: str) -> tuple[str | None, str]:
    """Split ``data:<mime>;base64,<payload>`` into ``(mime or None, payload)``.

    A string without a comma is treated as a bare base64 payload with no MIME type.
    """
    header, sep, payload = data.partition(",")
    if not sep:
        return None, data
    mime = header.removeprefix("data:").split(";", 1)[0].strip()
    return (mime if "/" in mime else None), payload


def _send_via_riseup(eml: EmailMessage, user: str, password: str, from_addr: str, to_addr: str) -> None:
    """Log in to Riseup's SMTP server (implicit TLS on port 465) and submit ``eml``. Blocking."""
    context = ssl.create_default_context()
    with smtplib.SMTP_SSL("mail.riseup.net", 465, context=context) as server:
        # server.set_debuglevel(1)  # uncomment to dump the raw SMTP conversation when debugging
        server.login(user, password)
        server.send_message(eml, from_addr=from_addr, to_addrs=[to_addr])
148
149
async def send_email_notice(
    client: Client,
    entry: dict,
    caption: str,
    html: str,
    markdown: str,
    chat_id: int = -1003768991336,
    message_thread_id: int = 1204,
):
    """Post an email to a Telegram topic: the HTML body as a document, plus the ICS if present.

    The Markdown body is appended to ``caption`` when the combined text measures below
    ``CAPTION_LENGTH`` (as counted by ``count_without_entities``). The result is cut to the
    first ``smart_split`` chunk of ``CAPTION_LENGTH``. A ``FloodWait`` is waited out and the
    send retried.

    Args:
        client: Pyrogram client used to send.
        entry: Email record. Its ``Subject`` (first 64 chars) names the files; an optional
            ``ICS`` string is sent alongside as ``<subject>.ics``.
        caption: Caption text without the body.
        html: HTML body, uploaded as ``<subject>.html``.
        markdown: Body text that may be appended to the caption.
        chat_id: Destination chat.
        message_thread_id: Destination forum topic.
    """
    fname = entry["Subject"][:64]
    ics = entry.get("ICS", "")
    while True:
        try:
            # Rebuild the caption from the original arguments on every attempt, so a
            # retry never appends the body a second time.
            text = caption
            if await count_without_entities(f"{caption}\n\n{markdown}") < CAPTION_LENGTH:
                text = f"{caption}\n\n{markdown}"
            text = (await smart_split(text, CAPTION_LENGTH))[0]
            if ics:
                with BytesIO(html.encode("utf-8")) as f_html, BytesIO(ics.encode("utf-8")) as f_ics:
                    await client.send_media_group(
                        chat_id=chat_id,
                        message_thread_id=message_thread_id,
                        media=[
                            InputMediaDocument(f_html, caption=text, file_name=f"{fname}.html"),
                            InputMediaDocument(f_ics, file_name=f"{fname}.ics"),
                        ],
                    )
            else:
                with BytesIO(html.encode("utf-8")) as f_html:
                    await client.send_document(
                        chat_id=chat_id,
                        message_thread_id=message_thread_id,
                        document=f_html,
                        caption=text,
                        file_name=f"{fname}.html",
                    )
            await asyncio.sleep(3)  # presumably paces consecutive posts
            return
        except FloodWait as e:
            logger.warning(e)
            await asyncio.sleep(e.value)  # type: ignore
181
182
async def get_last_time() -> str:
    """Return the newest ``ReceivedTime`` already stored in the ``邮件`` table.

    Cloudflare D1 is asked first. Turso is the fallback when D1 yields no value.
    If neither has one, the far-future sentinel "9999-12-31 23:59:59" is returned,
    so a ``ReceivedTime > last_time`` filter selects nothing.
    """
    latest_sql = "SELECT ReceivedTime FROM 邮件 ORDER BY ReceivedTime DESC LIMIT 1"

    d1_resp = await query_d1(sql=latest_sql, db_name="dnkt", silent=True)
    newest = glom(d1_resp, "result.0.results.0.ReceivedTime", default="")
    if newest:
        return newest

    turso_resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": latest_sql}}],
        retry=2,
        silent=True,
        **(TURSO_KWARGS | {"db_name": "dnkt"}),
    )
    newest = glom(turso_resp, "results.0.response.result.rows.0.0.value", default="")
    return newest or "9999-12-31 23:59:59"
201
202
async def parse_entry(entry: dict) -> tuple[str, str, str]:
    """Build the Telegram caption, HTML body and Markdown body for one exported email.

    Side effects:
        * Uploads a web copy of the HTML body to R2 at ``TTL/365d/<md5>.html``. In that copy,
          ``cid:`` image references are replaced by the entry's ``Images`` values. The caption
          title links to this copy.
        * For ordinary mails (no ``Start`` key), runs ``event_detect`` on the Markdown body.
          If it returns an ``ics``, the fields ``ICS``/``Start``/``End``/``Location`` are written
          into ``entry``, and the mail is then captioned as a meeting.

    Returns:
        ``(caption, html, markdown)``, each stripped. ``html`` is ``beautify(entry["Body"])``,
        not the R2 copy, and ``markdown`` is derived from it.
    """
    md5_hash = hashlib.md5(f"{entry['ReceivedTime']}-{entry['Subject']}".encode()).hexdigest()  # noqa: S324
    r2_key = f"TTL/365d/{md5_hash}.html"  # must match the key computed by back-emails.py on the Windows PC
    url = f"https://r2.zydou.me/{r2_key}"

    # --- HTML published on the web (R2) ---
    html = beautify(glom(entry, Coalesce("HTMLBody", "Body")))
    html = html if html.startswith("<") else convert2html(html)
    if "<head>" not in html:
        html = f"<head></head>{html}"
    if "charset" not in html.lower():
        html = html.replace("<head>", '<head><meta charset="UTF-8">')
    # Embed the images directly so the page renders without the original MIME parts.
    # `or {}` also covers an explicit null in the export.
    for cid, image_src in (entry.get("Images") or {}).items():
        html = html.replace(f"cid:{cid}", image_src)
    await set_cf_r2(r2_key, html, mime_type="text/html", silent=True)

    # --- Markdown, derived from the Body field ---
    email_html = beautify(entry.get("Body") or "")
    markdown = convert2md(html=email_html) if email_html.startswith("<") else email_html
    markdown, _ = remove_img_tag(markdown)
    markdown = remove_consecutive_newlines(markdown)
    markdown = markdown.replace("\\_", "")  # drop Markdown-escaped underscores ("\_")

    # Ordinary mail: let the AI detector decide whether it actually describes an event.
    if "Start" not in entry:
        event = await event_detect(markdown) or {}
        if ics := event.get("ics"):
            entry["ICS"] = ics
            entry["Start"] = event.get("start_time", "")
            entry["End"] = event.get("end_time", "")
            entry["Location"] = event.get("location", "")

    # --- Caption (re-checks "Start": a detected event above turns this into a meeting caption) ---
    caption = ""
    if "Start" not in entry:  # ordinary mail
        caption += f"📧**[{entry['Subject']}]({url})**\n🕒{entry['ReceivedTime']}"
        if entry.get("Sender"):
            caption += f"\n发件人: {combine_names(entry['Sender'])}"
        if entry.get("To"):
            caption += f"\n收件人: {combine_names(entry['To'])}"
        if entry.get("CC"):
            caption += f"\n抄送: {combine_names(entry['CC'])}"
    else:  # meeting
        caption += f"📅**[{entry['Subject']}]({url})**"
        if entry.get("IsRecurring"):
            caption += "🔄"
        if entry.get("Start"):
            caption += f"\n▶️{entry['Start']}"
        if entry.get("End"):
            caption += f"\n⏸️{entry['End']}"
        if entry.get("Location"):
            caption += f"\n📍{combine_names(entry['Location'])}"
        if entry.get("Sender"):
            caption += f"\n组织人: {combine_names(entry['Sender'])}"
        if entry.get("To"):
            caption += f"\n必需: {combine_names(entry['To'])}"
        if entry.get("CC"):
            caption += f"\n可选: {combine_names(entry['CC'])}"
        if entry.get("IsConflict"):
            caption += "\n⚠️此会议时间有冲突"
    return caption.strip(), email_html.strip(), markdown.strip()
262
263
async def sync_to_d1(entry: dict) -> None:
    """Write one email into the Cloudflare D1 table ``邮件`` (database ``dnkt``).

    The row key ``MD5`` is ``md5(Subject + ReceivedTime)``. The insert is built with
    ``update_on_conflict="MD5"``, so re-syncing the same email targets the existing row.
    Contact-style fields are reduced to display names with ``combine_names``.
    ``search_column`` holds the space-joined word segmentation of subject and body.

    Table layout (as created once via ``database.d1.create_d1_table``):
        columns: ReceivedTime TEXT, Subject TEXT, Sender TEXT, Receiver TEXT, CC TEXT,
                 StartTime TEXT, EndTime TEXT, Location TEXT, RRULE TEXT, Body TEXT,
                 search_column TEXT, MD5 INTEGER PRIMARY KEY
        indexes: Subject, ReceivedTime, Sender, Receiver, CC
        full-text search: fts_on_col="MD5", fts_index_col="search_column"
    """
    subject = entry.get("Subject", "")
    sender = combine_names(entry.get("Sender", ""))
    received = entry.get("ReceivedTime", "")
    receiver = combine_names(entry.get("To", ""))
    cc = combine_names(entry.get("CC", ""))
    start = entry.get("Start", "")
    end = entry.get("End", "")
    location = combine_names(entry.get("Location", ""))
    rrule = entry.get("RRULE", "")
    body = entry.get("Body", "")
    tokens = cutter.cutword(f"{entry['Subject']}\n{entry.get('Body', '')}")
    row_key = md5(f"{entry['Subject']}{entry['ReceivedTime']}")

    row = {
        "Subject": subject,
        "Sender": sender,
        "ReceivedTime": received,
        "Receiver": receiver,
        "CC": cc,
        "StartTime": start,
        "EndTime": end,
        "Location": location,
        "RRULE": rrule,
        "Body": body,
        "search_column": " ".join(tokens),
        "MD5": row_key,
    }
    statement = insert_d1("邮件", row, update_on_conflict="MD5")
    await query_d1(**statement, db_name="dnkt", silent=True)
294
295
async def sync_to_turso(entry: dict) -> None:
    """Write one email into the Turso table ``邮件`` (database ``dnkt``).

    The row key ``MD5`` is ``md5(Subject + ReceivedTime)``. The statement is built with
    ``update_on_conflict="MD5"``, so re-syncing the same email targets the existing row.
    Contact-style fields are reduced to display names with ``combine_names``.
    ``search_column`` holds the space-joined word segmentation of subject and body.

    Table layout (as created once via ``database.turso.turso_create_table``):
        columns: ReceivedTime TEXT, Subject TEXT, Sender TEXT, Receiver TEXT, CC TEXT,
                 StartTime TEXT, EndTime TEXT, Location TEXT, RRULE TEXT, Body TEXT,
                 search_column TEXT, MD5 INTEGER PRIMARY KEY
        indexes: Subject, ReceivedTime, Sender, Receiver, CC
        full-text search: fts_on_col="MD5", fts_index_col="search_column"
    """
    connection = TURSO_KWARGS | {"db_name": "dnkt"}

    subject = entry.get("Subject", "")
    sender = combine_names(entry.get("Sender", ""))
    received = entry.get("ReceivedTime", "")
    receiver = combine_names(entry.get("To", ""))
    cc = combine_names(entry.get("CC", ""))
    start = entry.get("Start", "")
    end = entry.get("End", "")
    location = combine_names(entry.get("Location", ""))
    rrule = entry.get("RRULE", "")
    body = entry.get("Body", "")
    tokens = cutter.cutword(f"{entry['Subject']}\n{entry.get('Body', '')}")
    row_key = md5(f"{entry['Subject']}{entry['ReceivedTime']}")

    row = {
        "Subject": subject,
        "Sender": sender,
        "ReceivedTime": received,
        "Receiver": receiver,
        "CC": cc,
        "StartTime": start,
        "EndTime": end,
        "Location": location,
        "RRULE": rrule,
        "Body": body,
        "search_column": " ".join(tokens),
        "MD5": row_key,
    }
    statement = insert_statement("邮件", row, update_on_conflict="MD5")
    await turso_exec([statement], retry=2, silent=True, **connection)
326
327
def md5(s: str) -> int:
    """Return the leading 48 bits of the MD5 digest of ``s`` (UTF-8 encoded) as an int.

    Used as a compact integer row key, not for security.
    """
    digest = hashlib.md5(s.encode("utf-8")).digest()  # noqa: S324
    # 6 bytes == the first 12 hex characters of hexdigest()
    return int.from_bytes(digest[:6], "big")
332
333
def combine_names(texts: str) -> str:
    """Reduce a ';'-separated contact list to display names joined by "; ".

    Each contact is parsed with ``parse_email``; its Chinese name is preferred and its
    English name is the fallback. Blank segments (e.g. from a trailing ';') and contacts
    yielding no name are skipped. The "Microsoft Teams 会议" pseudo-entry is dropped when
    other contacts are present. Empty or ``None`` input gives "".

    Input: "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
    Output: "张三; 李四"
    """
    if not texts:
        return ""
    contacts = [part.strip() for part in texts.split(";") if part.strip()]
    final_names = []
    for item in contacts:
        if item == "Microsoft Teams 会议" and len(contacts) > 1:
            continue
        english, chinese, _ = parse_email(item)
        if name := chinese or english:
            final_names.append(name)
    return "; ".join(final_names)
349
350
def format_addresses(contacts: str | None, email: str = "email@domain.com") -> Address:
    """Build an ``Address`` for ``email`` whose display name lists the people in ``contacts``.

    ``contacts`` is a ';'-separated contact string. Each contact contributes its Chinese
    name, or its English name when it has none. The pseudo-contact "Microsoft Teams 会议"
    is ignored when other contacts are present, matching ``combine_names``. The address part
    is always ``email``.

    Example:
        contacts: "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
        email: "email@domain.com"
        return: Address(display_name="张三; 李四", addr_spec="email@domain.com")
    """
    if not contacts:
        return Address(addr_spec=email)
    parts = [part.strip() for part in contacts.split(";") if part.strip()]
    result_parts = []
    for contact in parts:
        if contact == "Microsoft Teams 会议" and len(parts) > 1:
            continue
        english, chinese, _ = parse_email(contact)
        # Fall back to the English name so contacts without a "(中文名)" part are kept.
        if name := chinese or english:
            result_parts.append(name)
    full_name = "; ".join(result_parts).strip()
    return Address(display_name=full_name, addr_spec=email)
370
371
def parse_email(s: str) -> tuple[str, str, str]:
    """Split a contact string into ``(english_name, chinese_name, email)``.

    The email is the first ``<...>`` group and the Chinese name the first ``(...)`` group.
    Whatever text remains is the English name. Names are stripped of spaces and quotes,
    and the company suffixes ".dnkt", "@kotei.com.cn" and ".dnkt@cn.denso.com" are removed.

    Example:
        'San Zhang (张三) <zhangsan@example.com>' -> ('San Zhang', '张三', 'zhangsan@example.com')
        'San Zhang <zhangsan@example.com>' -> ('San Zhang', '', 'zhangsan@example.com')
        'Li Si (李四)' -> ('Li Si', '李四', '')
        'Li Si' -> ('Li Si', '', '')
        '(王五) <wangwu@example.com>' -> ('', '王五', 'wangwu@example.com')
    """
    text = s.strip()
    email_m = re.search(r"<(.*?)>", text)
    chinese_m = re.search(r"\((.*?)\)", text)
    address = email_m.group(1).strip() if email_m else ""
    chinese = chinese_m.group(1).strip() if chinese_m else ""

    # The English name is what is left after cutting out "<email>" and then "(中文名)".
    english = text
    for found in (email_m, chinese_m):
        if found:
            english = english.replace(found.group(0), "")

    def tidy(value: str) -> str:
        """Strip spaces/quotes, then the known company suffixes in a fixed order."""
        value = value.strip(" '\"")
        for suffix in (".dnkt", "@kotei.com.cn", ".dnkt@cn.denso.com"):
            value = value.removesuffix(suffix)
        return value

    # The English name is tidied twice, so stacked suffixes such as
    # "x.dnkt@kotei.com.cn" reduce all the way to "x".
    return tidy(tidy(english)), tidy(chinese), address
408
409
def beautify(text: str) -> str:
    """Clean an email body for display.

    Steps:
        * Unwrap SafeLinks URLs (``remove_tracking``) and normalize CRLF to LF.
        * Delete runs of 8 underscores.
        * Drop Teams boilerplate lines and remove "<https://...teams.microsoft.com...>" links
          from the "加入: ..." join line.
        * Strip every line and end each kept line with a newline.
        * Finally remove the "CONFIDENTIAL" marker.

    Empty input returns "".
    """
    if not text:
        return ""
    cleaned = remove_tracking(text).replace("\r\n", "\n").replace("_" * 8, "")
    kept: list[str] = []
    for raw in cleaned.splitlines():
        line = raw.strip()
        if line == "Microsoft Teams 会议" or line.startswith(("是否需要帮助? ", "对于组织者:")):
            continue
        if line.startswith("加入: https://teams.microsoft.com"):
            line = re.sub(r"<https://.*?teams.microsoft.com.*?>", "", line)
        kept.append(line.strip() + "\n")
    return "".join(kept).replace("CONFIDENTIAL<br>", "").replace("CONFIDENTIAL", "")
427
428
def remove_tracking(text: str) -> str:
    """Replace Outlook SafeLinks wrappers with their original, URL-decoded targets.

    ``match_urls`` finds the URLs in ``text``. Each URL shaped like
    ``https://...safelinks.protection.outlook.com...?url=<encoded>...data=...``
    is replaced everywhere in ``text`` by ``unquote(<encoded>)``.
    """
    safelink = re.compile(r"https://.*?safelinks\.protection\.outlook\.com.*?\?url=(.*?)(:?|&|&)data=")
    for link in match_urls(text):
        found = safelink.match(link)
        if found is None:
            continue
        text = text.replace(link, unquote(found.group(1)))
    return text
435 return text