main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import contextlib
5import json
6from pathlib import Path
7
8from glom import flatten, glom
9from google import genai
10from google.genai import types
11from google.genai.types import FileState, Part
12from loguru import logger
13from pyrogram.client import Client
14from pyrogram.types import Chat, Message
15from pyrogram.types.messages_and_media.message import Str
16
17from ai.main import ai_text_generation
18from ai.utils import literal_eval
19from config import AI, PREFIX, PROXY, TZ, cache
20from custom.config import ACCOUNT_NAME
21from emby.account import all_accounts
22from messages.utils import delete_message
23from utils import nowdt, rand_number, strings_list
24
25MODEL_ID = "gemini-flash-latest" # bypass captcha
26SKIP_CHECKIN_BOTS = {
27 "emby_hohai",
28 "EmbyPublicBot", # 终点站, 已弃用
29 "sntp_lite_emby_bot",
30 "embyhellobot",
31 "yeziypsy_bot",
32 "liminallitebot",
33}
34
35
36async def daily_checkin_emby(client: Client) -> None:
37 if ACCOUNT_NAME not in ["xiaohao", "benny"]:
38 return
39 accounts = await all_accounts()
40 all_bots = glom(accounts, "*.bot")
41
42 benny_accounts = {"sntp_lite_emby_bot", "lilyembybot", "shrekpublic_bot"}
43 xiaohao_bots = {x for x in all_bots if x not in benny_accounts}
44
45 checkin_bots = benny_accounts if ACCOUNT_NAME == "benny" else xiaohao_bots
46 for bot_name in checkin_bots - SKIP_CHECKIN_BOTS:
47 if not bot_name.endswith("bot"):
48 continue
49 logger.info(f"Emby daily checkin start: {bot_name}")
50 with contextlib.suppress(Exception):
51 await client.send_message(f"@{bot_name}", "/start")
52 logger.success(f"Emby daily checkin done: {bot_name}")
53 await asyncio.sleep(1)
54
55
56async def checkin_emby(client: Client, message: Message) -> None:
57 # 只处理bot消息
58 if not glom(message, "from_user.is_bot", default=False) and glom(message, "chat.type.name", default="") != "BOT":
59 return
60 if str(message.text).startswith("🎉 签到成功"):
61 await delete_message(message)
62
63 # 回答签到问题 (无按钮)
64 await checkin_number_question(client, message)
65 await checkin_poetry(client, message)
66
67 # 只处理带按钮消息
68 reply_markup = glom(message, "reply_markup.inline_keyboard.**.callback_data", default=[])
69 reply_markup = [x for x in reply_markup if isinstance(x, str)]
70 if not reply_markup:
71 return
72
73 checkin_button = next((x for x in reply_markup if "checkin" in x or "check1n" in x), None)
74 bot_name = glom(message, "from_user.username", default="")
75 if bot_name == "EmbyPublicBot" and message.photo:
76 await checkin_terminus(client, message, reply_markup)
77 elif checkin_button and nowdt(TZ).hour in [6, 7]:
78 await client.request_callback_answer(message.chat.id, message.id, callback_data=checkin_button)
79
80
81async def checkin_terminus(client: Client, message: Message, reply_markup: list[str]) -> None:
82 """@EmbyPublicBot 终点站.
83
84 reply_markup:
85 ["checkin-选项A", "checkin-选项B", "checkin-选项C"]
86 """
87 options = [x.removeprefix("checkin-").strip() for x in reply_markup]
88 if not options:
89 return
90 fpath: str = await client.download_media(message) # type: ignore
91 if not Path(fpath).exists():
92 return
93 for api_key in strings_list(AI.GEMINI_API_KEYS, shuffle=True):
94 try:
95 app = genai.Client(
96 api_key=api_key,
97 http_options=types.HttpOptions(
98 base_url=AI.GEMINI_BASE_URL,
99 headers=literal_eval(AI.GEMINI_DEFAULT_HEADERS),
100 async_client_args={"proxy": PROXY.GOOGLE},
101 ),
102 )
103 photo = await app.aio.files.upload(file=fpath)
104 while photo.state == FileState.PROCESSING:
105 logger.trace("Waiting for upload to complete...")
106 await asyncio.sleep(1)
107 photo = await app.aio.files.get(name=photo.name) # type: ignore
108 if photo.state == FileState.ACTIVE and photo.uri:
109 response = await app.aio.models.generate_content(
110 model=MODEL_ID,
111 contents=[Part.from_uri(file_uri=photo.uri, mime_type=photo.mime_type), Part.from_text(text="请识别图中的物体")],
112 config={
113 "response_mime_type": "application/json",
114 "response_schema": {
115 "type": "STRING",
116 "enum": options,
117 },
118 },
119 )
120 await app.aio.aclose()
121 answer = glom(response, "candidates.0.content.parts.0.text", default="").strip('" ')
122 if answer in options:
123 logger.success(f"终点站 answer: checkin-{answer}")
124 await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin-{answer}")
125 Path(fpath).unlink(missing_ok=True)
126 return
127 logger.warning(f"终点站 wrong answer: {response}")
128 except Exception as e:
129 logger.exception(e)
130
131
132async def checkin_number_question(client: Client, message: Message) -> None:
133 if not str(message.text).startswith("🎯 每日签到\n"):
134 return
135 if nowdt(TZ).hour not in [6, 7]:
136 return
137 if glom(message, "from_user.username", default="").lower() not in {"es666_bot", "fouremby_bot"}:
138 return
139 query = str(message.text).removeprefix("🎯 每日签到\n")
140 query = query.removesuffix("⏱ 有效期: 1 分钟").strip()
141 ai_msg = Message(id=rand_number(), chat=Chat(id=0), text=Str(f"{PREFIX.AI_TEXT_GENERATION} @emby {query}"))
142 ai_res = await ai_text_generation(
143 "fake-client", # type: ignore
144 ai_msg,
145 silent=True,
146 gemini_generate_content_config={
147 "tools": [{"code_execution": {}}],
148 "responseMimeType": "application/json",
149 "responseJsonSchema": {
150 "title": "Math Question",
151 "type": "object",
152 "strict": True,
153 "properties": {"answer": {"type": "integer"}},
154 "required": ["answer"],
155 "additionalProperties": False,
156 },
157 },
158 openai_responses_config={
159 "tools": [{"type": "web_search"}],
160 "text": {
161 "format": {
162 "type": "json_schema",
163 "name": "MathQuestion",
164 "strict": True,
165 "description": "A simple math question.",
166 "schema": {
167 "title": "Math Question",
168 "type": "object",
169 "properties": {"answer": {"type": "integer"}},
170 "required": ["answer"],
171 "additionalProperties": False,
172 },
173 }
174 },
175 },
176 openai_completions_config={
177 "response_format": {
178 "type": "json_schema",
179 "strict": True,
180 "json_schema": {
181 "name": "MathQuestion", # name must match ^[a-zA-Z0-9_-]+$
182 "schema": {
183 "type": "object",
184 "properties": {"answer": {"type": "integer"}},
185 "required": ["answer"],
186 "additionalProperties": False,
187 },
188 "strict": True,
189 },
190 }
191 },
192 )
193 if not ai_res.get("texts"):
194 logger.error(f"【Emby】数学题签到: {ai_res}")
195 return
196 logger.success(f"【Emby】数学题签到: {ai_res['texts']}")
197 answer = json.loads(ai_res["texts"])["answer"]
198 await client.send_message(message.chat.id, text=str(answer))
199
200
201async def checkin_poetry(client: Client, message: Message) -> None:
202 # ruff: noqa: RUF001
203 if not str(message.content).startswith("🌸 防机器签到验证 🌸\n"):
204 return
205 # if nowdt(TZ).hour not in [6, 7]:
206 # return
207 bot_name = glom(message, "from_user.username", default="")
208 if bot_name.lower() != "niubi2233_bot":
209 return
210 query = str(message.content).removeprefix("🌸 防机器签到验证 🌸").strip()
211 query = query.replace("░", "?").strip().replace("请依次点击下方按钮补全诗句:", "你是专业的诗词研究专家,核心任务是依据提供的诗词文本,精准补全其中缺失的汉字。以下是需要补全的诗句:")
212 query += '\n请根据你掌握的诗词知识,准确填充诗句中“?”处缺失的汉字。\n注意:必须严格匹配原诗词的用字,不得自行创作或修改。\n最终结果请以数组格式返回,数组元素为按缺失顺序排列的汉字。例如,若诗句为“?前明月?,疑是地上霜”,则返回 ["床", "光"]'
213 reply_markup = glom(message, "reply_markup.inline_keyboard.**.callback_data", default=[])
214 reply_markup = [x for x in reply_markup if isinstance(x, str)]
215 logger.debug(f"【Emby】诗词题签到: {reply_markup}")
216 if not reply_markup:
217 return
218
219 if chars := cache.get(f"checkin_poetry_{bot_name}"): # 缓存中存在字符
220 if f"checkin_shici_{chars[0]}" not in reply_markup:
221 logger.error(f"【Emby】诗词题签到: {chars}")
222 return
223 if len(chars) == 1: # 只用填一个字, 直接返回
224 await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
225 cache.delete(f"checkin_poetry_{bot_name}") # 删除缓存
226 return
227 cache.set(f"checkin_poetry_{bot_name}", chars[1:], ttl=60) # 缓存后面的字
228 await asyncio.sleep(1) # 等待1秒, 确保缓存生效
229 await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
230
231 ai_msg = Message(id=rand_number(), chat=Chat(id=0), text=Str(f"{PREFIX.AI_TEXT_GENERATION} @emby {query}"))
232 ai_res = await ai_text_generation(
233 "fake-client", # type: ignore
234 ai_msg,
235 silent=True,
236 gemini_generate_content_config={
237 "tools": [{"google_search": {}}],
238 "responseMimeType": "application/json",
239 "responseJsonSchema": {
240 "title": "Fill Poetry",
241 "type": "array",
242 "items": {"type": "string"},
243 "description": "The missing characters in the poetry.",
244 },
245 },
246 openai_responses_config={
247 "tools": [{"type": "web_search"}],
248 "text": {
249 "format": {
250 "type": "json_schema",
251 "name": "FillPoetry",
252 "strict": True,
253 "description": "Fill the missing characters in the poetry.",
254 "schema": {
255 "title": "Fill Poetry",
256 "type": "array",
257 "items": {"type": "string"},
258 "description": "The missing characters in the poetry.",
259 },
260 }
261 },
262 },
263 openai_completions_config={
264 "response_format": {
265 "type": "json_schema",
266 "strict": True,
267 "json_schema": {
268 "name": "FillPoetry", # name must match ^[a-zA-Z0-9_-]+$
269 "schema": {
270 "type": "array",
271 "items": {"type": "string"},
272 "description": "The missing characters in the poetry.",
273 },
274 "strict": True,
275 },
276 }
277 },
278 )
279 if not ai_res.get("texts"):
280 logger.error(f"【Emby】诗词题签到: {ai_res}")
281 return
282 logger.success(f"【Emby】诗词题签到: {ai_res['texts']}")
283 chars = json.loads(ai_res["texts"])
284 if not chars:
285 logger.error(f"【Emby】诗词题签到: {chars}")
286 return
287 chars = flatten(chars)
288 # 回答第一个字
289 if f"checkin_shici_{chars[0]}" not in reply_markup:
290 logger.error(f"【Emby】诗词题签到: {chars}")
291 return
292 if len(chars) == 1: # 只用填一个字, 直接返回
293 await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
294 return
295 cache.set(f"checkin_poetry_{bot_name}", chars[1:], ttl=60) # 缓存后面的字
296 await asyncio.sleep(1) # 等待1秒, 确保缓存生效
297 await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")