main
  1#!/venv/bin/python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import hashlib
  5import os
  6import re
  7import smtplib
  8import ssl
  9from email.headerregistry import Address
 10from email.message import EmailMessage
 11from io import BytesIO
 12from urllib.parse import quote
 13
 14from glom import glom
 15from loguru import logger
 16from pyrogram.client import Client
 17from pyrogram.errors import FloodWait
 18from pyrogram.types import InputMediaDocument
 19
 20from ai.utils import remove_consecutive_newlines
 21from config import CAPTION_LENGTH, DB, cutter
 22from database.d1 import insert_d1, query_d1
 23from database.r2 import set_cf_r2
 24from database.turso import insert_statement, turso_exec
 25from history.utils import TURSO_KWARGS
 26from messages.utils import count_without_entities, remove_img_tag, smart_split
 27from networking import hx_req
 28from utils import convert_html, convert_md
 29
 30
 31# ruff: noqa: PLW2901,RUF001,RUF002,RUF003
 32async def dnkt_email(client: Client):
 33    """DNKT email."""
 34    if os.getenv("DNKT_EMAIL_DISABLED", "0") == "1":
 35        return
 36    emails: list[dict] = await hx_req("https://dl.zydou.me/d/emails.json", timeout=60, silent=True)  # ty:ignore[invalid-assignment]
 37
 38    if not emails:
 39        return
 40    last_time = await get_last_time()
 41    entries = [x for x in emails if x["ReceivedTime"] > last_time]
 42    entries = sorted(entries, key=lambda x: x["ReceivedTime"], reverse=False)  # old to latest
 43
 44    for entry in entries:
 45        logger.info(entry["Subject"])
 46        await sync_to_d1(entry)
 47        await sync_to_turso(entry)
 48        caption, html, markdown = await parse_entry(entry)
 49        await send_email_notice(client, entry, caption, html, markdown)
 50        await forward_email(entry, html, markdown)
 51
 52
 53async def forward_email(
 54    entry: dict,
 55    html: str,
 56    markdown: str,
 57    target_email: str = "douzy94@gmail.com",
 58    riseup_email: str = os.getenv("RISEUP_EMAIL", ""),
 59    riseup_pass: str = os.getenv("RISEUP_PASSWORD", ""),
 60    alias_email: str | None = os.getenv("RISEUP_ALIAS"),
 61):
 62    """解析 .msg 文件并通过 Riseup SMTP 转发给目标邮箱."""
 63    if not riseup_email or not riseup_pass:
 64        logger.error("Riseup 邮箱或密码为空")
 65        return
 66
 67    try:
 68        eml = EmailMessage()
 69        eml["Subject"] = entry["Subject"]
 70        # 发件人必须是你的 Riseup 邮箱,否则 SMTP 会拒绝投递 (Relay Access Denied)
 71        eml["From"] = format_addresses(entry["Sender"], alias_email or riseup_email or "")
 72        eml["To"] = format_addresses(entry["To"], target_email)
 73        logger.info(f"主题: {entry['Subject']}")
 74        logger.info(f"发件人: {entry['Sender']}")
 75        logger.info(f"收件人: {entry['To']}")
 76
 77        eml.set_content(markdown or "")
 78        eml.add_alternative(html, subtype="html")
 79
 80        # 处理 ICS 附件
 81        if ics := entry.get("ICS"):
 82            eml.add_attachment(
 83                ics.encode("utf-8"),
 84                maintype="text",
 85                subtype="calendar",
 86                filename="invite.ics",
 87                params={"method": "PUBLISH"},  # 使用 PUBLISH 权限更轻,不容易被拦
 88            )
 89            logger.debug("已检测到会议邀请,成功注入了 ICS 日历组件")
 90
 91        # 6. 连接 Riseup SMTP 服务器并发送
 92        # Riseup 官方 SMTP 要求:SSL/TLS 加密,端口 465
 93        smtp_server = "mail.riseup.net"
 94        smtp_port = 465
 95        context = ssl.create_default_context()
 96        with smtplib.SMTP_SSL(smtp_server, smtp_port, context=context) as server:
 97            # server.set_debuglevel(1)  # 如果发送失败,取消注释这行可以打印底层的网络通信日志
 98            server.login(riseup_email, riseup_pass)
 99            server.send_message(eml, from_addr=alias_email or riseup_email, to_addrs=[target_email])
100        logger.success(f"成功将邮件 [{entry['Subject']}] 发送至: {target_email}")
101    except smtplib.SMTPAuthenticationError:
102        logger.error("SMTP 认证失败:请检查你的 Riseup 用户名和密码。")
103    except smtplib.SMTPException as e:
104        logger.error(f"SMTP 通信或发信被拒: {e}")
105    except Exception as e:
106        logger.error(f"邮件重组或发送过程中发生未知错误: {e}")
107
108
async def send_email_notice(
    client: Client,
    entry: dict,
    caption: str,
    html: str,
    markdown: str,
    *,
    chat_id: int = -1003768991336,
    message_thread_id: int = 1204,
):
    """Post an email to a Telegram topic as a media group of documents.

    The group contains the Markdown body as ``<subject>.txt``, the HTML body
    as ``<subject>.html`` (which carries the caption) and, when the entry has
    a non-empty ``ICS`` field, the invitation as ``<subject>.ics``. The
    Markdown body is appended to the caption when the combined text stays
    under ``CAPTION_LENGTH``; the caption is then cut to the first
    ``smart_split`` chunk.

    On ``FloodWait`` the call sleeps for the requested time and retries until
    the upload succeeds.

    Args:
        client: Pyrogram client used to send.
        entry: Email entry; ``Subject`` and optional ``ICS`` are read.
        caption: Caption header (as built by ``parse_entry``).
        html: Rendered HTML body.
        markdown: Plain-text/Markdown body.
        chat_id: Target chat id.
        message_thread_id: Topic (thread) id inside ``chat_id``.
    """
    # Built once, outside the retry loop, so a FloodWait retry re-sends the
    # same caption instead of appending the Markdown body a second time.
    combined = f"{caption}\n\n{markdown}"
    if await count_without_entities(combined) < CAPTION_LENGTH:
        caption = combined
    caption = (await smart_split(caption, CAPTION_LENGTH))[0]
    fname = entry["Subject"][:64]
    ics = entry.get("ICS") or ""  # tolerate an explicit null ICS value

    while True:
        try:
            # Fresh buffers per attempt: a failed upload may already have read them.
            with BytesIO(markdown.encode("utf-8")) as f_md, BytesIO(html.encode("utf-8")) as f_html, BytesIO(ics.encode("utf-8")) as f_ics:
                media = [
                    InputMediaDocument(f_md, file_name=f"{fname}.txt"),
                    InputMediaDocument(f_html, caption=caption, file_name=f"{fname}.html"),
                ]
                if ics:
                    media.append(InputMediaDocument(f_ics, file_name=f"{fname}.ics"))
                await client.send_media_group(chat_id=chat_id, message_thread_id=message_thread_id, media=media)
            await asyncio.sleep(3)  # short pause after each post, presumably to ease Telegram rate limits
            return
        except FloodWait as e:
            logger.warning(e)
            await asyncio.sleep(e.value)  # type: ignore
128
129
async def get_last_time() -> str:
    """Return the newest ``ReceivedTime`` already stored in the ``邮件`` table.

    D1 is queried first; Turso is the fallback when D1 yields nothing. If
    neither has a row, the far-future sentinel ``"9999-12-31 23:59:59"`` is
    returned so that no downloaded email is treated as new.
    """
    sql = "SELECT ReceivedTime FROM 邮件 ORDER BY ReceivedTime DESC LIMIT 1"

    d1_resp = await query_d1(sql=sql, db_name="dnkt", silent=True)
    if latest := glom(d1_resp, "result.0.results.0.ReceivedTime", default=""):
        return latest

    turso_resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": sql}}],
        retry=2,
        silent=True,
        **(TURSO_KWARGS | {"db_name": "dnkt"}),
    )
    latest = glom(turso_resp, "results.0.response.result.rows.0.0.value", default="")
    return latest or "9999-12-31 23:59:59"
148
149
async def parse_entry(entry: dict) -> tuple[str, str, str]:
    """Render an email entry into a caption, an HTML page and a Markdown body.

    The HTML page, with ``cid:<id>`` references replaced by the entry's
    ``Images`` values, is uploaded to R2 under
    ``TTL/365d/<ReceivedTime>-<Subject>.html``; the caption links to its
    public URL. Entries containing a ``Start`` key are meeting emails and get
    a meeting-style caption (time range, location, organizer, attendees).

    Optional fields (``Sender``, ``To``, ``CC``, ``Body``, ``Images``, ...)
    may be missing or null; ``Subject`` and ``ReceivedTime`` are required.

    Returns:
        ``(caption, html, markdown)``.
    """
    subject = entry["Subject"]
    r2_key = f"TTL/365d/{entry['ReceivedTime']}-{subject}.html"
    url = f"{DB.CF_R2_PUBLIC_URL.rstrip('/')}/{quote(r2_key)}"
    if "Start" not in entry:  # regular email
        caption = f"📧**[{subject}]({url})**\n🕒{entry['ReceivedTime']}"
        if entry.get("Sender"):
            caption += f"\n发件人: {combine_names(entry['Sender'])}"
        if entry.get("To"):
            caption += f"\n收件人: {combine_names(entry['To'])}"
        if entry.get("CC"):
            caption += f"\n抄送: {combine_names(entry['CC'])}"
    else:  # meeting email
        caption = f"📅**[{subject}]({url})**"
        if entry.get("IsRecurring"):
            caption += "🔄"
        if entry.get("Start"):
            caption += f"\n▶️{entry['Start']}"
        if entry.get("End"):
            caption += f"\n⏸️{entry['End']}"
        if entry.get("Location"):
            caption += f"\n📍{combine_names(entry['Location'])}"
        if entry.get("Sender"):
            caption += f"\n组织人: {combine_names(entry['Sender'])}"
        if entry.get("To"):
            caption += f"\n必需: {combine_names(entry['To'])}"
        if entry.get("CC"):
            caption += f"\n可选: {combine_names(entry['CC'])}"
        if entry.get("IsConflict"):
            caption += "\n⚠️此会议时间有冲突"

    body = (entry.get("Body") or "").replace("\r\n", "\n")
    body = body.replace("CONFIDENTIAL<br>", "").replace("CONFIDENTIAL", "")
    is_html = body.startswith("<")
    html = body if is_html else convert_html(body)
    if "<head>" not in html:
        html = f"<head></head>{html}"
    # NOTE(review): newlines become <br> for HTML bodies as well, not only for
    # converted plain text — confirm this is intended.
    html = html.replace("\n", "<br>")
    html = html.replace("<head>", '<head><meta charset="UTF-8">')

    markdown = convert_md(html=body) if is_html else body
    markdown, _ = remove_img_tag(markdown)
    markdown = remove_consecutive_newlines(markdown)
    # NOTE(review): this deletes every backslash-underscore pair, so "a\_b"
    # becomes "ab", not only "\_\_\_" separator runs — confirm intent.
    markdown = markdown.replace("\\_", "")

    # Inline images: swap cid: references for the entry's image payloads.
    for cid, b64 in (entry.get("Images") or {}).items():
        html = html.replace(f"cid:{cid}", b64)
    await set_cf_r2(r2_key, html, mime_type="text/html")
    return caption, html, markdown
196
197
async def sync_to_d1(entry: dict) -> None:
    """Write one email entry into the ``邮件`` table of the ``dnkt`` D1 database.

    Contact fields (sender, receivers, CC, BCC, location) are reduced to
    display names with ``combine_names``. ``search_column`` holds the
    ``cutter.cutword`` tokens of subject + body for full-text search, and
    ``MD5`` (a 48-bit hash of subject + received time) is passed as the
    conflict column — presumably an upsert, so re-syncing updates the row.
    """
    # One-time table bootstrap, kept for reference (run manually if the table is missing):
    # from database.d1 import create_d1_table
    #
    # columns = "ReceivedTime TEXT, Subject TEXT, Sender TEXT, Receiver TEXT, CC TEXT, BCC TEXT, StartTime TEXT, EndTime TEXT, Location TEXT, Attachments TEXT, RRULE TEXT, Body TEXT, search_column TEXT, MD5 INTEGER PRIMARY KEY"
    # indexes = ["Subject", "ReceivedTime", "Sender", "Receiver", "CC"]
    # await create_d1_table(
    #     "邮件",
    #     columns,
    #     idx_cols=indexes,
    #     fts_on_col="MD5",
    #     fts_index_col="search_column",
    #     db_name="dnkt",
    #     silent=False,
    # )
    row: dict = {}
    row["Subject"] = entry.get("Subject", "")
    row["Sender"] = combine_names(entry.get("Sender", ""))
    row["ReceivedTime"] = entry.get("ReceivedTime", "")
    row["Receiver"] = combine_names(entry.get("To", ""))
    row["CC"] = combine_names(entry.get("CC", ""))
    row["BCC"] = combine_names(entry.get("BCC", ""))
    row["StartTime"] = entry.get("Start", "")
    row["EndTime"] = entry.get("End", "")
    row["Location"] = combine_names(entry.get("Location", ""))
    row["Attachments"] = entry.get("Attachments", "")
    row["RRULE"] = entry.get("RRULE", "")
    row["Body"] = entry.get("Body", "")
    row["search_column"] = " ".join(cutter.cutword(f"{entry['Subject']}\n{entry.get('Body', '')}"))
    row["MD5"] = md5(f"{entry['Subject']}{entry['ReceivedTime']}")
    upsert = insert_d1("邮件", row, update_on_conflict="MD5")
    await query_d1(**upsert, db_name="dnkt", silent=True)
230
231
async def sync_to_turso(entry: dict) -> None:
    """Write one email entry into the ``邮件`` table of the ``dnkt`` Turso database.

    Uses the same columns as the D1 copy. Contact fields are reduced to display
    names with ``combine_names``. ``search_column`` holds ``cutter.cutword``
    tokens of subject + body. ``MD5`` (a 48-bit hash of subject + received
    time) is the conflict column, presumably making this an upsert.
    """
    params = TURSO_KWARGS | {"db_name": "dnkt"}
    # One-time table bootstrap, kept for reference (run manually if the table is missing):
    # from database.turso import turso_create_table
    #
    # columns = "ReceivedTime TEXT, Subject TEXT, Sender TEXT, Receiver TEXT, CC TEXT, BCC TEXT, StartTime TEXT, EndTime TEXT, Location TEXT, Attachments TEXT, RRULE TEXT, Body TEXT, search_column TEXT, MD5 INTEGER PRIMARY KEY"
    # indexes = ["Subject", "ReceivedTime", "Sender", "Receiver", "CC"]
    # await turso_create_table(
    #     "邮件",
    #     columns,
    #     idx_cols=indexes,
    #     fts_on_col="MD5",
    #     fts_index_col="search_column",
    #     **params,
    # )
    columns = (
        ("Subject", entry.get("Subject", "")),
        ("Sender", combine_names(entry.get("Sender", ""))),
        ("ReceivedTime", entry.get("ReceivedTime", "")),
        ("Receiver", combine_names(entry.get("To", ""))),
        ("CC", combine_names(entry.get("CC", ""))),
        ("BCC", combine_names(entry.get("BCC", ""))),
        ("StartTime", entry.get("Start", "")),
        ("EndTime", entry.get("End", "")),
        ("Location", combine_names(entry.get("Location", ""))),
        ("Attachments", entry.get("Attachments", "")),
        ("RRULE", entry.get("RRULE", "")),
        ("Body", entry.get("Body", "")),
        ("search_column", " ".join(cutter.cutword(f"{entry['Subject']}\n{entry.get('Body', '')}"))),
        ("MD5", md5(f"{entry['Subject']}{entry['ReceivedTime']}")),
    )
    statement = insert_statement("邮件", dict(columns), update_on_conflict="MD5")
    await turso_exec([statement], retry=2, silent=True, **params)
264
265
def md5(s: str) -> int:
    """Return the first 48 bits of the MD5 digest of ``s`` as a non-negative int.

    Used as a compact integer row key (48 bits fit comfortably in SQLite's
    signed 64-bit INTEGER). Not for security purposes.
    """
    digest = hashlib.md5(s.encode("utf-8")).digest()  # noqa: S324
    # 6 bytes big-endian == the first 12 hex digits of hexdigest()
    return int.from_bytes(digest[:6], "big")
270
271
272def combine_names(texts: str) -> str:
273    """Get raw names from texts.
274
275    Input: "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
276    Output: "张三; 李四"
277    """
278    names = texts.split(";")
279    final_names = []
280    for item in names:
281        item = item.strip()
282        if item == "Microsoft Teams 会议" and len(names) > 1:
283            continue
284        english, chinese, _ = parse_email(item)
285        final_names.append(chinese or english)
286    return "; ".join(final_names)
287
288
289def format_addresses(contacts: str | None, email: str = "email@domain.com") -> Address:
290    """Format contacts to Address.
291
292    Example:
293    contacts: "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
294    email: "email@domain.com"
295    return: Address(display_name="张三; 李四", addr_spec="email@domain.com")
296    """
297    if not contacts:
298        return Address(addr_spec=email)
299    result_parts = []
300    # 按分号分割(处理存在多个收件人的情况)
301    for contact in contacts.split(";"):
302        english, chinese, _ = parse_email(contact.strip())
303        if chinese:
304            result_parts.append(chinese or english)
305    # 用分号和空格将多个联系人重新拼接起来
306    full_name = "; ".join(result_parts).strip()
307    return Address(display_name=full_name, addr_spec=email)
308
309
def parse_email(s: str) -> tuple[str, str, str]:
    """Split a contact string into ``(english_name, chinese_name, email)``.

    The address is the first ``<...>`` group and the Chinese name the first
    ``(...)`` group (both stripped). Whatever remains after removing those two
    groups, stripped of surrounding whitespace, is the English name. Missing
    parts come back as ``""``.

    Example:
    'San Zhang (张三) <zhangsan@example.com>' -> ('San Zhang', '张三', 'zhangsan@example.com')
    'San Zhang <zhangsan@example.com>' -> ('San Zhang', '', 'zhangsan@example.com')
    'Li Si (李四)' -> ('Li Si', '李四', '')
    'Li Si' -> ('Li Si', '', '')
    '(王五) <wangwu@example.com>' -> ('', '王五', 'wangwu@example.com')
    """
    text = s.strip()
    # Both groups are located in the original text before anything is removed.
    addr = re.search(r"<(.*?)>", text)
    paren = re.search(r"\((.*?)\)", text)

    leftover = text
    for found in (addr, paren):
        if found is not None:
            leftover = leftover.replace(found.group(0), "")

    return (
        leftover.strip(),
        paren.group(1).strip() if paren else "",
        addr.group(1).strip() if addr else "",
    )
339    return (english_name, chinese_name, email)