main
  1#!/venv/bin/python
  2# -*- coding: utf-8 -*-
  3import json
  4import uuid
  5from contextlib import suppress
  6from datetime import UTC, datetime, timedelta
  7from zoneinfo import ZoneInfo
  8
  9from pyrogram.types import Chat, Message
 10from pyrogram.types.messages_and_media.message import Str
 11
 12from ai.main import ai_text_generation
 13from config import PREFIX, TZ
 14from utils import nowdt, nowstr, rand_number
 15
 16JSON_SCHEMA = {
 17    "title": "Event Detection",
 18    "type": "object",
 19    "properties": {
 20        "is_event": {"description": "Is this event?", "title": "Is Event", "type": "boolean"},
 21        "title": {"description": "Event title", "title": "Title", "type": "string"},
 22        "description": {"description": "(Optional) Event description", "title": "Description", "type": ["string", "null"]},
 23        "location": {"description": "(Optional) Event location", "title": "Location", "type": ["string", "null"]},
 24        "start_time": {"description": "Event start time (YYYY-MM-DD HH:MM)", "title": "Start Time", "type": "string"},
 25        "end_time": {"description": "(Optional) Event end time (YYYY-MM-DD HH:MM)", "title": "End Time", "type": ["string", "null"]},
 26    },
 27    "required": ["title", "description", "start_time", "end_time"],
 28    "additionalProperties": False,
 29}
 30
 31
 32async def event_detect(texts: str) -> dict:
 33    """Get event info from texts."""
 34    res = await ai_text_generation(
 35        "fake-client",  # type: ignore
 36        Message(id=rand_number(), chat=Chat(id=rand_number()), text=Str(f"{PREFIX.AI_TEXT_GENERATION} @email {texts}")),
 37        gemini_generate_content_config={
 38            "system_instruction": system_prompt(),
 39            "responseMimeType": "application/json",
 40            "responseJsonSchema": JSON_SCHEMA,
 41        },
 42        gemini_append_grounding=False,
 43        silent=True,
 44    )
 45    with suppress(Exception):
 46        if not res.get("texts", ""):
 47            return {}
 48        event = json.loads(res.get("texts", "{}"))
 49        if not event.get("is_event"):
 50            return {}
 51        ics = gen_ics(event)
 52        return event | {"ics": ics}
 53    return {}
 54
 55
 56def system_prompt() -> str:
 57    prompt = f"""# 身份设定
 58你是企业行政事务处理的专业助手,核心职责是快速识别邮件中的会议邀请信息并精准提取关键要素,帮助用户高效管理日程。
 59
 60# 工作准则
 611. 收到邮件内容后,首先完整解析文本,判断是否包含会议邀请的核心特征(如“会议”“邀请”“时间”“地点”等关键词);
 622. 若判定为会议邀请,需提取标题、说明、开始时间、结束时间、位置等关键信息;
 63
 64# 约束条件
 651. 若邮件不包含会议邀请特征,直接将is_event设置为False,其余字段设为空字符串;
 662. 开始时间与结束时间应使用YYYY-MM-DD HH:MM格式,如“2024-10-01 14:00”,不得自行转换格式;
 673. 无需考虑时区问题,因为用户与邮件邀请始终位于{TZ}时区;
 684. 仅提取邮件正文中的会议信息,不分析签名栏内容。
 69
 70# 补充信息
 71当前时间为:{nowstr(TZ)}
 72"""  # noqa: RUF001
 73    return prompt.strip()
 74
 75
 76def gen_ics(event: dict) -> str:
 77    """Generate ICS file from event info."""
 78    start = datetime.strptime(event["start_time"], "%Y-%m-%d %H:%M").astimezone(ZoneInfo(TZ))
 79    end = datetime.strptime(event["end_time"], "%Y-%m-%d %H:%M").astimezone(ZoneInfo(TZ)) if event.get("end_time") else start + timedelta(hours=1)
 80    start_utc = start.astimezone(UTC)
 81    end_utc = end.astimezone(UTC)
 82
 83    # 构建事件字段列表
 84    vevent_fields = [
 85        f"UID:{uuid.uuid4()}",
 86        f"DTSTAMP:{nowdt('UTC'):%Y%m%dT%H%M%SZ}",
 87        f"DTSTART:{start_utc:%Y%m%dT%H%M%SZ}",
 88        f"DTEND:{end_utc:%Y%m%dT%H%M%SZ}",
 89        f"SUMMARY:{_escape_ics_text(event['title'])}",
 90    ]
 91
 92    # 可选字段处理
 93    if description := event.get("description"):
 94        vevent_fields.append(f"DESCRIPTION:{_escape_ics_text(description)}")
 95    if location := event.get("location"):
 96        vevent_fields.append(f"LOCATION:{_escape_ics_text(location)}")
 97
 98    # 折叠所有超长行
 99    folded_fields = [_fold_ics_line(field) for field in vevent_fields]
100
101    # 构建完整ICS内容(使用CRLF换行)
102    ics_lines = [
103        "BEGIN:VCALENDAR",
104        "VERSION:2.0",
105        "PRODID:-//Benny//Benny Bot 1.0//ZH-CN",
106        "CALSCALE:GREGORIAN",
107        "BEGIN:VEVENT",
108        *folded_fields,
109        "END:VEVENT",
110        "END:VCALENDAR",
111    ]
112    return "\r\n".join(ics_lines)
113
114
115def _escape_ics_text(text: str) -> str:
116    """转义ICS文本中的特殊字符."""
117    if not text:
118        return ""
119    text = text.replace("\\", "\\\\")
120    text = text.replace(",", "\\,")
121    text = text.replace(";", "\\;")
122    return text.replace("\n", "\\n")
123
124
125def _fold_ics_line(line: str) -> str:
126    """按ICS标准折叠超长行(每行不超过75字符)."""
127    if len(line) <= 75:
128        return line
129    lines = []
130    current = line[:75]
131    remaining = line[75:]
132    lines.append(current)
133    while remaining:
134        # 后续行以空格开头
135        current = " " + remaining[:74]
136        remaining = remaining[74:]
137        lines.append(current)
138    return "\r\n".join(lines)