main
1#!/venv/bin/python
2# -*- coding: utf-8 -*-
3import json
4import uuid
5from contextlib import suppress
6from datetime import UTC, datetime, timedelta
7from zoneinfo import ZoneInfo
8
9from pyrogram.types import Chat, Message
10from pyrogram.types.messages_and_media.message import Str
11
12from ai.main import ai_text_generation
13from config import PREFIX, TZ
14from utils import nowdt, nowstr, rand_number
15
16JSON_SCHEMA = {
17 "title": "Event Detection",
18 "type": "object",
19 "properties": {
20 "is_event": {"description": "Is this event?", "title": "Is Event", "type": "boolean"},
21 "title": {"description": "Event title", "title": "Title", "type": "string"},
22 "description": {"description": "(Optional) Event description", "title": "Description", "type": ["string", "null"]},
23 "location": {"description": "(Optional) Event location", "title": "Location", "type": ["string", "null"]},
24 "start_time": {"description": "Event start time (YYYY-MM-DD HH:MM)", "title": "Start Time", "type": "string"},
25 "end_time": {"description": "(Optional) Event end time (YYYY-MM-DD HH:MM)", "title": "End Time", "type": ["string", "null"]},
26 },
27 "required": ["title", "description", "start_time", "end_time"],
28 "additionalProperties": False,
29}
30
31
32async def event_detect(texts: str) -> dict:
33 """Get event info from texts."""
34 res = await ai_text_generation(
35 "fake-client", # type: ignore
36 Message(id=rand_number(), chat=Chat(id=rand_number()), text=Str(f"{PREFIX.AI_TEXT_GENERATION} @email {texts}")),
37 gemini_generate_content_config={
38 "system_instruction": system_prompt(),
39 "responseMimeType": "application/json",
40 "responseJsonSchema": JSON_SCHEMA,
41 },
42 gemini_append_grounding=False,
43 silent=True,
44 )
45 with suppress(Exception):
46 if not res.get("texts", ""):
47 return {}
48 event = json.loads(res.get("texts", "{}"))
49 if not event.get("is_event"):
50 return {}
51 ics = gen_ics(event)
52 return event | {"ics": ics}
53 return {}
54
55
56def system_prompt() -> str:
57 prompt = f"""# 身份设定
58你是企业行政事务处理的专业助手,核心职责是快速识别邮件中的会议邀请信息并精准提取关键要素,帮助用户高效管理日程。
59
60# 工作准则
611. 收到邮件内容后,首先完整解析文本,判断是否包含会议邀请的核心特征(如“会议”“邀请”“时间”“地点”等关键词);
622. 若判定为会议邀请,需提取标题、说明、开始时间、结束时间、位置等关键信息;
63
64# 约束条件
651. 若邮件不包含会议邀请特征,直接将is_event设置为False,其余字段设为空字符串;
662. 开始时间与结束时间应使用YYYY-MM-DD HH:MM格式,如“2024-10-01 14:00”,不得自行转换格式;
673. 无需考虑时区问题,因为用户与邮件邀请始终位于{TZ}时区;
684. 仅提取邮件正文中的会议信息,不分析签名栏内容。
69
70# 补充信息
71当前时间为:{nowstr(TZ)}
72""" # noqa: RUF001
73 return prompt.strip()
74
75
76def gen_ics(event: dict) -> str:
77 """Generate ICS file from event info."""
78 start = datetime.strptime(event["start_time"], "%Y-%m-%d %H:%M").astimezone(ZoneInfo(TZ))
79 end = datetime.strptime(event["end_time"], "%Y-%m-%d %H:%M").astimezone(ZoneInfo(TZ)) if event.get("end_time") else start + timedelta(hours=1)
80 start_utc = start.astimezone(UTC)
81 end_utc = end.astimezone(UTC)
82
83 # 构建事件字段列表
84 vevent_fields = [
85 f"UID:{uuid.uuid4()}",
86 f"DTSTAMP:{nowdt('UTC'):%Y%m%dT%H%M%SZ}",
87 f"DTSTART:{start_utc:%Y%m%dT%H%M%SZ}",
88 f"DTEND:{end_utc:%Y%m%dT%H%M%SZ}",
89 f"SUMMARY:{_escape_ics_text(event['title'])}",
90 ]
91
92 # 可选字段处理
93 if description := event.get("description"):
94 vevent_fields.append(f"DESCRIPTION:{_escape_ics_text(description)}")
95 if location := event.get("location"):
96 vevent_fields.append(f"LOCATION:{_escape_ics_text(location)}")
97
98 # 折叠所有超长行
99 folded_fields = [_fold_ics_line(field) for field in vevent_fields]
100
101 # 构建完整ICS内容(使用CRLF换行)
102 ics_lines = [
103 "BEGIN:VCALENDAR",
104 "VERSION:2.0",
105 "PRODID:-//Benny//Benny Bot 1.0//ZH-CN",
106 "CALSCALE:GREGORIAN",
107 "BEGIN:VEVENT",
108 *folded_fields,
109 "END:VEVENT",
110 "END:VCALENDAR",
111 ]
112 return "\r\n".join(ics_lines)
113
114
115def _escape_ics_text(text: str) -> str:
116 """转义ICS文本中的特殊字符."""
117 if not text:
118 return ""
119 text = text.replace("\\", "\\\\")
120 text = text.replace(",", "\\,")
121 text = text.replace(";", "\\;")
122 return text.replace("\n", "\\n")
123
124
125def _fold_ics_line(line: str) -> str:
126 """按ICS标准折叠超长行(每行不超过75字符)."""
127 if len(line) <= 75:
128 return line
129 lines = []
130 current = line[:75]
131 remaining = line[75:]
132 lines.append(current)
133 while remaining:
134 # 后续行以空格开头
135 current = " " + remaining[:74]
136 remaining = remaining[74:]
137 lines.append(current)
138 return "\r\n".join(lines)