main
  1#!/venv/bin/python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import base64
  5import hashlib
  6import json
  7import mimetypes
  8import os
  9import re
 10import smtplib
 11import ssl
 12from email.headerregistry import Address
 13from email.message import EmailMessage
 14from io import BytesIO
 15from urllib.parse import unquote
 16
 17import anyio
 18from glom import Coalesce, glom
 19from loguru import logger
 20from pyrogram.client import Client
 21from pyrogram.errors import FloodWait
 22from pyrogram.types import InputMediaDocument
 23
 24from ai.utils import remove_consecutive_newlines
 25from config import CAPTION_LENGTH, cutter
 26from custom.ai_ics import event_detect
 27from database.d1 import insert_d1, query_d1
 28from database.r2 import set_cf_r2
 29from database.turso import insert_statement, turso_exec
 30from history.utils import TURSO_KWARGS
 31from messages.utils import count_without_entities, remove_img_tag, smart_split
 32from networking import hx_req
 33from utils import convert2html, convert2md, match_urls
 34
 35
 36# ruff: noqa: PLW2901,RUF001,RUF002,RUF003
 37async def dnkt_email(client: Client):
 38    """DNKT email."""
 39    if os.getenv("DNKT_EMAIL_DISABLED", "0") == "1":
 40        return
 41    path = os.getenv("DNKT_EMAIL_PATH", "/alist/emails.json")
 42    if os.path.exists(path):
 43        async with await anyio.open_file(path) as f:
 44            emails = json.loads(await f.read())
 45    else:
 46        logger.warning("从网络下载 emails.json")
 47        emails: list[dict] = await hx_req("https://dl.zydou.me/d/emails.json", timeout=60, silent=True)  # ty:ignore[invalid-assignment]
 48    if not emails:
 49        return
 50    last_time = await get_last_time()
 51    # last_time = "2026-05-11 14:28:21"
 52    entries = [x for x in emails if x["ReceivedTime"] > last_time]
 53    entries = sorted(entries, key=lambda x: x["ReceivedTime"], reverse=False)  # old to latest
 54
 55    for entry in entries:
 56        logger.info(entry["Subject"])
 57        await sync_to_d1(entry)
 58        await sync_to_turso(entry)
 59        caption, html, markdown = await parse_entry(entry)
 60        await send_email_notice(client, entry, caption, html, markdown)
 61        await forward_email(entry, html, markdown)
 62
 63
 64async def forward_email(
 65    entry: dict,
 66    html: str,
 67    markdown: str,
 68    target_email: str = "douzy94@gmail.com",
 69    riseup_email: str = os.getenv("RISEUP_EMAIL", ""),
 70    riseup_pass: str = os.getenv("RISEUP_PASSWORD", ""),
 71    alias_email: str | None = os.getenv("RISEUP_ALIAS"),
 72):
 73    """解析 .msg 文件并通过 Riseup SMTP 转发给目标邮箱."""
 74    if not riseup_email or not riseup_pass:
 75        logger.error("Riseup 邮箱或密码为空")
 76        return
 77
 78    try:
 79        eml = EmailMessage()
 80        eml["Subject"] = entry["Subject"]
 81        # 发件人必须是你的 Riseup 邮箱,否则 SMTP 会拒绝投递 (Relay Access Denied)
 82        eml["From"] = format_addresses(entry["Sender"], alias_email or riseup_email or "")
 83        eml["To"] = format_addresses(entry["To"], target_email)
 84        logger.info(f"主题: {entry['Subject']}")
 85        logger.info(f"发件人: {entry['Sender']}")
 86        logger.info(f"收件人: {entry['To']}")
 87
 88        eml.set_content(markdown or "")
 89        eml.add_alternative(html, subtype="html")
 90        # 处理图片附件 (从 Base64 还原)
 91        if images := entry.get("Images"):
 92            for cid, b64_data in images.items():
 93                filename = cid.split("@")[0]
 94                try:
 95                    file_bytes = base64.b64decode(b64_data.split(",")[1])  # 将 base64 字符串解码为原始 bytes
 96                    ctype, encoding = mimetypes.guess_type(filename)
 97                    if ctype is None or encoding is not None:
 98                        ctype = "image/jpeg"  # 猜不到就用默认的 JPEG 类型
 99                    maintype, subtype = ctype.split("/", 1)
100                    html_part = glom(eml.get_payload(), "-1")
101                    html_part.add_related(file_bytes, maintype=maintype, subtype=subtype, filename=filename, cid=cid)
102                    logger.debug(f"成功添加图片附件: {filename}")
103                except Exception as e:
104                    logger.error(f"处理图片附件 [{filename}] 时出错: {e}")
105        # 处理普通附件 (从 Base64 还原)
106        if attachments := entry.get("Attachments"):
107            for key, b64_data in attachments.items():
108                filename = re.sub(r"^IDX\d+-", "", key)  # 清理文件名前缀,"IDX0-测试文档.pdf" -> "测试文档.pdf"
109                try:
110                    file_bytes = base64.b64decode(b64_data)  # 将 base64 字符串解码为原始 bytes
111                    ctype, encoding = mimetypes.guess_type(filename)
112                    if ctype is None or encoding is not None:
113                        ctype = "application/octet-stream"  # 猜不到就用默认的二进制流类型
114                    maintype, subtype = ctype.split("/", 1)
115                    # 将附件添加到 eml 对象
116                    eml.add_attachment(file_bytes, maintype=maintype, subtype=subtype, filename=filename)
117                    logger.debug(f"成功添加普通附件: {filename}")
118                except Exception as e:
119                    logger.error(f"处理附件 [{filename}] 时出错: {e}")
120
121        # 处理 ICS 附件
122        if ics := entry.get("ICS"):
123            eml.add_attachment(
124                ics.encode("utf-8"),
125                maintype="text",
126                subtype="calendar",
127                filename="invite.ics",
128                params={"method": "PUBLISH"},  # 使用 PUBLISH 权限更轻,不容易被拦
129            )
130            logger.debug("已检测到会议邀请,成功注入了 ICS 日历组件")
131
132        # 6. 连接 Riseup SMTP 服务器并发送
133        # Riseup 官方 SMTP 要求:SSL/TLS 加密,端口 465
134        smtp_server = "mail.riseup.net"
135        smtp_port = 465
136        context = ssl.create_default_context()
137        with smtplib.SMTP_SSL(smtp_server, smtp_port, context=context) as server:
138            # server.set_debuglevel(1)  # 如果发送失败,取消注释这行可以打印底层的网络通信日志
139            server.login(riseup_email, riseup_pass)
140            server.send_message(eml, from_addr=alias_email or riseup_email, to_addrs=[target_email])
141        logger.success(f"成功将邮件 [{entry['Subject']}] 发送至: {target_email}")
142    except smtplib.SMTPAuthenticationError:
143        logger.error("SMTP 认证失败:请检查你的 Riseup 用户名和密码。")
144    except smtplib.SMTPException as e:
145        logger.error(f"SMTP 通信或发信被拒: {e}")
146    except Exception as e:
147        logger.error(f"邮件重组或发送过程中发生未知错误: {e}")
148
149
async def send_email_notice(client: Client, entry: dict, caption: str, html: str, markdown: str):
    """Post an email notification, with the HTML body as a document, via ``client``.

    When the caption and the Markdown body together stay below
    ``CAPTION_LENGTH`` the body is appended to the caption; in either case only
    the first chunk returned by ``smart_split`` is used. If ``entry`` carries
    an ``ICS`` value, the calendar file is sent together with the HTML file as
    a media group.

    On ``FloodWait`` the send is retried after the requested delay. The final
    caption is computed once, before the first attempt, so a retry can never
    append ``markdown`` a second time.
    """
    text = caption
    if await count_without_entities(f"{caption}\n\n{markdown}") < CAPTION_LENGTH:
        text = f"{caption}\n\n{markdown}"
    text = (await smart_split(text, CAPTION_LENGTH))[0]
    fname = entry["Subject"][:64]
    ics = entry.get("ICS", "")

    while True:
        try:
            if ics:
                with BytesIO(html.encode("utf-8")) as f_html, BytesIO(ics.encode("utf-8")) as f_ics:
                    await client.send_media_group(
                        chat_id=-1003768991336,
                        message_thread_id=1204,
                        media=[
                            InputMediaDocument(f_html, caption=text, file_name=f"{fname}.html"),
                            InputMediaDocument(f_ics, file_name=f"{fname}.ics"),
                        ],
                    )
            else:
                with BytesIO(html.encode("utf-8")) as f_html:
                    await client.send_document(
                        chat_id=-1003768991336,
                        message_thread_id=1204,
                        document=f_html,
                        caption=text,
                        file_name=f"{fname}.html",
                    )
        except FloodWait as e:
            logger.warning(e)
            await asyncio.sleep(e.value)  # type: ignore
            continue  # retry with fresh buffers and the same caption
        # Small pause between consecutive notifications.
        await asyncio.sleep(3)
        return
181
182
async def get_last_time() -> str:
    """Return the newest ``ReceivedTime`` already stored for the mail table.

    D1 is asked first; Turso is queried only when D1 yields nothing. If
    neither returns a value the far-future sentinel ``9999-12-31 23:59:59``
    is returned.
    """
    latest_sql = "SELECT ReceivedTime FROM 邮件 ORDER BY ReceivedTime DESC LIMIT 1"

    d1_resp = await query_d1(sql=latest_sql, db_name="dnkt", silent=True)
    if newest := glom(d1_resp, "result.0.results.0.ReceivedTime", default=""):
        return newest

    turso_params = TURSO_KWARGS | {"db_name": "dnkt"}
    turso_resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": latest_sql}}],
        retry=2,
        silent=True,
        **turso_params,
    )
    newest = glom(turso_resp, "results.0.response.result.rows.0.0.value", default="")
    return newest if newest else "9999-12-31 23:59:59"
201
202
async def parse_entry(entry: dict) -> tuple[str, str, str]:
    """Render an email record into a caption, an HTML body and a Markdown body.

    Side effects:
        * A standalone web page (``HTMLBody``, falling back to ``Body``, with
          inline images embedded) is uploaded to R2 under a key derived from
          ``ReceivedTime`` and ``Subject``; the caption links to it.
        * For entries without a ``Start`` key, ``event_detect`` is run on the
          Markdown. If it reports an ``ics`` value, ``ICS``, ``Start``, ``End``
          and ``Location`` are written back into ``entry``, which is then
          rendered as a meeting.

    Returns:
        ``(caption, html, markdown)``, each stripped of surrounding whitespace.

    NOTE(review): the returned ``html`` is the beautified ``Body``, not the web
    page built from ``HTMLBody`` — confirm this is intended.
    """
    md5_hash = hashlib.md5(f"{entry['ReceivedTime']}-{entry['Subject']}".encode()).hexdigest()  # noqa: S324
    r2_key = f"TTL/365d/{md5_hash}.html"  # must stay in sync with back-emails.py on the Windows PC
    url = f"https://r2.zydou.me/{r2_key}"

    # HTML page published on the web.
    html = beautify(glom(entry, Coalesce("HTMLBody", "Body")))
    html = html if html.startswith("<") else convert2html(html)
    if "<head>" not in html:
        html = f"<head></head>{html}"
    if "charset" not in html.lower():
        html = html.replace("<head>", '<head><meta charset="UTF-8">')
    # `or {}` also covers an explicit null, matching how forward_email reads "Images".
    for cid, b64 in (entry.get("Images") or {}).items():
        html = html.replace(f"cid:{cid}", b64)
    await set_cf_r2(r2_key, html, mime_type="text/html", silent=True)

    # Markdown
    email_html = beautify(entry["Body"])
    markdown = convert2md(html=email_html) if email_html.startswith("<") else email_html
    markdown, _ = remove_img_tag(markdown)
    markdown = remove_consecutive_newlines(markdown)
    markdown = markdown.replace("\\_", "")

    if "Start" not in entry:  # plain email: look for an event described in the text
        event = await event_detect(markdown)
        if ics := event.get("ics"):
            entry["ICS"] = ics
            entry["Start"] = event.get("start_time", "")
            entry["End"] = event.get("end_time", "")
            entry["Location"] = event.get("location", "")

    # Build the caption. Optional fields are read with .get() in both branches
    # so a record without e.g. "CC" does not raise KeyError.
    caption = ""
    if "Start" not in entry:  # plain email
        caption += f"📧**[{entry['Subject']}]({url})**\n🕒{entry['ReceivedTime']}"
        if entry.get("Sender"):
            caption += f"\n发件人: {combine_names(entry['Sender'])}"
        if entry.get("To"):
            caption += f"\n收件人: {combine_names(entry['To'])}"
        if entry.get("CC"):
            caption += f"\n抄送: {combine_names(entry['CC'])}"
    else:  # meeting
        caption += f"📅**[{entry['Subject']}]({url})**"
        if entry.get("IsRecurring"):
            caption += "🔄"
        if entry.get("Start"):
            caption += f"\n▶️{entry['Start']}"
        if entry.get("End"):
            caption += f"\n⏸️{entry['End']}"
        if entry.get("Location"):
            caption += f"\n📍{combine_names(entry['Location'])}"
        if entry.get("Sender"):
            caption += f"\n组织人: {combine_names(entry['Sender'])}"
        if entry.get("To"):
            caption += f"\n必需: {combine_names(entry['To'])}"
        if entry.get("CC"):
            caption += f"\n可选: {combine_names(entry['CC'])}"
        if entry.get("IsConflict"):
            caption += "\n⚠️此会议时间有冲突"
    return caption.strip(), email_html.strip(), markdown.strip()
262
263
async def sync_to_d1(entry: dict) -> None:
    """Upsert one email record into the D1 mail table of the ``dnkt`` database.

    The row is keyed by ``MD5`` (hash of subject + received time) and is
    updated when that key already exists.
    """
    # One-off table creation, kept for reference:
    #   from database.d1 import create_d1_table
    #   columns = "ReceivedTime TEXT, Subject TEXT, Sender TEXT, Receiver TEXT, CC TEXT, StartTime TEXT, EndTime TEXT, Location TEXT, RRULE TEXT, Body TEXT, search_column TEXT, MD5 INTEGER PRIMARY KEY"
    #   indexes = ["Subject", "ReceivedTime", "Sender", "Receiver", "CC"]
    #   await create_d1_table("邮件", columns, idx_cols=indexes, fts_on_col="MD5",
    #                         fts_index_col="search_column", db_name="dnkt", silent=False)
    row = {
        "Subject": entry.get("Subject", ""),
        "Sender": combine_names(entry.get("Sender", "")),
        "ReceivedTime": entry.get("ReceivedTime", ""),
        "Receiver": combine_names(entry.get("To", "")),
        "CC": combine_names(entry.get("CC", "")),
        "StartTime": entry.get("Start", ""),
        "EndTime": entry.get("End", ""),
        "Location": combine_names(entry.get("Location", "")),
        "RRULE": entry.get("RRULE", ""),
        "Body": entry.get("Body", ""),
    }
    # Full-text search column: subject and body run through the word segmenter.
    tokens = cutter.cutword(f"{entry['Subject']}\n{entry.get('Body', '')}")
    row["search_column"] = " ".join(tokens)
    row["MD5"] = md5(f"{entry['Subject']}{entry['ReceivedTime']}")

    statement = insert_d1("邮件", row, update_on_conflict="MD5")
    await query_d1(**statement, db_name="dnkt", silent=True)
294
295
async def sync_to_turso(entry: dict) -> None:
    """Upsert one email record into the Turso mail table of the ``dnkt`` database.

    The row is keyed by ``MD5`` (hash of subject + received time) and is
    updated when that key already exists.
    """
    turso_params = TURSO_KWARGS | {"db_name": "dnkt"}
    # One-off table creation, kept for reference:
    #   from database.turso import turso_create_table
    #   columns = "ReceivedTime TEXT, Subject TEXT, Sender TEXT, Receiver TEXT, CC TEXT, StartTime TEXT, EndTime TEXT, Location TEXT, RRULE TEXT, Body TEXT, search_column TEXT, MD5 INTEGER PRIMARY KEY"
    #   indexes = ["Subject", "ReceivedTime", "Sender", "Receiver", "CC"]
    #   await turso_create_table("邮件", columns, idx_cols=indexes, fts_on_col="MD5",
    #                            fts_index_col="search_column", **params)
    row = {
        "Subject": entry.get("Subject", ""),
        "Sender": combine_names(entry.get("Sender", "")),
        "ReceivedTime": entry.get("ReceivedTime", ""),
        "Receiver": combine_names(entry.get("To", "")),
        "CC": combine_names(entry.get("CC", "")),
        "StartTime": entry.get("Start", ""),
        "EndTime": entry.get("End", ""),
        "Location": combine_names(entry.get("Location", "")),
        "RRULE": entry.get("RRULE", ""),
        "Body": entry.get("Body", ""),
    }
    # Full-text search column: subject and body run through the word segmenter.
    tokens = cutter.cutword(f"{entry['Subject']}\n{entry.get('Body', '')}")
    row["search_column"] = " ".join(tokens)
    row["MD5"] = md5(f"{entry['Subject']}{entry['ReceivedTime']}")

    statement = insert_statement("邮件", row, update_on_conflict="MD5")
    await turso_exec([statement], retry=2, silent=True, **turso_params)
326
327
def md5(s: str) -> int:
    """Return the first 48 bits of the MD5 digest of ``s`` (UTF-8) as an integer."""
    digest = hashlib.md5(s.encode("utf-8")).digest()  # noqa: S324
    # 6 bytes == the first 12 hex characters of the digest.
    return int.from_bytes(digest[:6], "big")
332
333
def combine_names(texts: str) -> str:
    """Reduce a ``;``-separated contact list to display names.

    For every contact the Chinese name is preferred and the English name is
    the fallback. Contacts that yield no name at all (empty fragments such as
    a trailing ``;`` or an address-only entry) are skipped, so the result
    never contains dangling separators. The placeholder
    "Microsoft Teams 会议" is dropped when other contacts are listed.

    Input: "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
    Output: "张三; 李四"

    An empty or ``None`` input yields an empty string.
    """
    if not texts:
        return ""
    fragments = texts.split(";")
    final_names = []
    for fragment in fragments:
        contact = fragment.strip()
        if contact == "Microsoft Teams 会议" and len(fragments) > 1:
            continue
        english, chinese, _ = parse_email(contact)
        if name := chinese or english:
            final_names.append(name)
    return "; ".join(final_names)
349
350
def format_addresses(contacts: str | None, email: str = "email@domain.com") -> Address:
    """Build a single ``Address`` whose display name lists all contacts.

    Each ``;``-separated contact contributes its Chinese name, or its English
    name when no Chinese name is present; contacts without any name are
    skipped. The address part is always ``email``.

    Example:
    contacts: "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
    email: "email@domain.com"
    return: Address(display_name="张三; 李四", addr_spec="email@domain.com")
    """
    if not contacts:
        return Address(addr_spec=email)
    result_parts = []
    # Split on semicolons to handle several contacts.
    for contact in contacts.split(";"):
        english, chinese, _ = parse_email(contact.strip())
        # Fall back to the English name instead of dropping the contact.
        if name := chinese or english:
            result_parts.append(name)
    # Re-join the contacts with "; ".
    full_name = "; ".join(result_parts).strip()
    return Address(display_name=full_name, addr_spec=email)
370
371
def parse_email(s: str) -> tuple[str, str, str]:
    """Split a contact string into (English name, Chinese name, email address).

    The address is the text inside the first ``<...>``, the Chinese name the
    text inside the first ``(...)``, and the English name whatever remains
    once both bracketed parts are removed. Names are stripped of surrounding
    spaces and quotes and of the known mailbox/domain suffixes.

    Example:
    'San Zhang (张三) <zhangsan@example.com>' -> ('San Zhang', '张三', 'zhangsan@example.com')
    'San Zhang <zhangsan@example.com>' -> ('San Zhang', '', 'zhangsan@example.com')
    'Li Si (李四)' -> ('Li Si', '李四', '')
    'Li Si' -> ('Li Si', '', '')
    '(王五) <wangwu@example.com>' -> ('', '王五', 'wangwu@example.com')
    """

    def strip_noise(value: str) -> str:
        """Trim spaces/quotes, then drop each known suffix once, in order."""
        value = value.strip(" '\"")
        for suffix in (".dnkt", "@kotei.com.cn", ".dnkt@cn.denso.com"):
            value = value.removesuffix(suffix)
        return value

    raw = s.strip()
    angle = re.search(r"<(.*?)>", raw)  # <address>
    paren = re.search(r"\((.*?)\)", raw)  # (Chinese name)

    address = angle.group(1).strip() if angle else ""
    native = paren.group(1).strip() if paren else ""

    # Whatever is left after removing both bracketed parts is the English name.
    latin = raw
    for hit in (angle, paren):
        if hit:
            latin = latin.replace(hit.group(0), "")

    # The English name goes through two cleaning passes so that stacked
    # suffixes such as "x.dnkt@kotei.com.cn" collapse to "x".
    latin = strip_noise(strip_noise(latin))
    return (latin, strip_noise(native), address)
408
409
def beautify(text: str) -> str:
    """Clean an email body line by line.

    Tracking links are unwrapped, line endings normalised and runs of eight
    underscores removed. Each line is then trimmed; Teams boilerplate lines
    are dropped, the angle-bracketed Teams URL on a join line is removed, and
    finally every "CONFIDENTIAL" marker is deleted. Every kept line ends with
    a newline. Empty input yields an empty string.
    """
    if not text:
        return ""

    cleaned = remove_tracking(text).replace("\r\n", "\n").replace("_" * 8, "")

    kept: list[str] = []
    for raw_line in cleaned.splitlines():
        current = raw_line.strip()
        if current == "Microsoft Teams 会议" or current.startswith(("是否需要帮助? ", "对于组织者:")):
            continue
        if current.startswith("加入: https://teams.microsoft.com"):
            current = re.sub(r"<https://.*?teams.microsoft.com.*?>", "", current)
        kept.append(current.strip())

    joined = "".join(f"{item}\n" for item in kept)
    return joined.replace("CONFIDENTIAL<br>", "").replace("CONFIDENTIAL", "")
427
428
def remove_tracking(text: str) -> str:
    """Replace Outlook Safe Links wrappers in ``text`` with the URLs they wrap.

    For every URL found by ``match_urls`` that points at
    ``safelinks.protection.outlook.com``, the percent-encoded ``url=``
    parameter (everything up to the following ``data=`` parameter) is decoded
    and substituted for the whole wrapper URL.
    """
    # The separator before "data=" may be "&amp;" (HTML-escaped), "&" or absent.
    # A non-capturing group keeps group(1) as the only capture.
    safelink = re.compile(r"https://.*?safelinks\.protection\.outlook\.com.*?\?url=(.*?)(?:&amp;|&)?data=")
    for url in match_urls(text):
        if matched := safelink.match(url):
            text = text.replace(url, unquote(matched.group(1)))
    return text
435    return text