main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import contextlib
  5import json
  6from pathlib import Path
  7
  8from glom import flatten, glom
  9from google import genai
 10from google.genai import types
 11from google.genai.types import FileState, Part
 12from loguru import logger
 13from pyrogram.client import Client
 14from pyrogram.types import Chat, Message
 15from pyrogram.types.messages_and_media.message import Str
 16
 17from ai.main import ai_text_generation
 18from ai.utils import literal_eval
 19from config import AI, PREFIX, PROXY, TZ, cache
 20from custom.config import ACCOUNT_NAME
 21from emby.account import all_accounts
 22from messages.utils import delete_message
 23from utils import nowdt, rand_number, strings_list
 24
 25MODEL_ID = "gemini-flash-latest"  # bypass captcha
 26SKIP_CHECKIN_BOTS = {
 27    "emby_hohai",
 28    "EmbyPublicBot",  # 终点站, 已弃用
 29    "sntp_lite_emby_bot",
 30    "embyhellobot",
 31    "yeziypsy_bot",
 32    "liminallitebot",
 33}
 34
 35
 36async def daily_checkin_emby(client: Client) -> None:
 37    if ACCOUNT_NAME not in ["xiaohao", "benny"]:
 38        return
 39    accounts = await all_accounts()
 40    all_bots = glom(accounts, "*.bot")
 41
 42    benny_accounts = {"sntp_lite_emby_bot", "lilyembybot", "shrekpublic_bot"}
 43    xiaohao_bots = {x for x in all_bots if x not in benny_accounts}
 44
 45    checkin_bots = benny_accounts if ACCOUNT_NAME == "benny" else xiaohao_bots
 46    for bot_name in checkin_bots - SKIP_CHECKIN_BOTS:
 47        if not bot_name.endswith("bot"):
 48            continue
 49        logger.info(f"Emby daily checkin start: {bot_name}")
 50        with contextlib.suppress(Exception):
 51            await client.send_message(f"@{bot_name}", "/start")
 52            logger.success(f"Emby daily checkin done: {bot_name}")
 53            await asyncio.sleep(1)
 54
 55
 56async def checkin_emby(client: Client, message: Message) -> None:
 57    # 只处理bot消息
 58    if not glom(message, "from_user.is_bot", default=False) and glom(message, "chat.type.name", default="") != "BOT":
 59        return
 60    if str(message.text).startswith("🎉 签到成功"):
 61        await delete_message(message)
 62
 63    # 回答签到问题 (无按钮)
 64    await checkin_number_question(client, message)
 65    await checkin_poetry(client, message)
 66
 67    # 只处理带按钮消息
 68    reply_markup = glom(message, "reply_markup.inline_keyboard.**.callback_data", default=[])
 69    reply_markup = [x for x in reply_markup if isinstance(x, str)]
 70    if not reply_markup:
 71        return
 72
 73    checkin_button = next((x for x in reply_markup if "checkin" in x or "check1n" in x), None)
 74    bot_name = glom(message, "from_user.username", default="")
 75    if bot_name == "EmbyPublicBot" and message.photo:
 76        await checkin_terminus(client, message, reply_markup)
 77    elif checkin_button and nowdt(TZ).hour in [6, 7]:
 78        await client.request_callback_answer(message.chat.id, message.id, callback_data=checkin_button)
 79
 80
 81async def checkin_terminus(client: Client, message: Message, reply_markup: list[str]) -> None:
 82    """@EmbyPublicBot 终点站.
 83
 84    reply_markup:
 85    ["checkin-选项A", "checkin-选项B", "checkin-选项C"]
 86    """
 87    options = [x.removeprefix("checkin-").strip() for x in reply_markup]
 88    if not options:
 89        return
 90    fpath: str = await client.download_media(message)  # type: ignore
 91    if not Path(fpath).exists():
 92        return
 93    for api_key in strings_list(AI.GEMINI_API_KEYS, shuffle=True):
 94        try:
 95            app = genai.Client(
 96                api_key=api_key,
 97                http_options=types.HttpOptions(
 98                    base_url=AI.GEMINI_BASE_URL,
 99                    headers=literal_eval(AI.GEMINI_DEFAULT_HEADERS),
100                    async_client_args={"proxy": PROXY.GOOGLE},
101                ),
102            )
103            photo = await app.aio.files.upload(file=fpath)
104            while photo.state == FileState.PROCESSING:
105                logger.trace("Waiting for upload to complete...")
106                await asyncio.sleep(1)
107                photo = await app.aio.files.get(name=photo.name)  # type: ignore
108            if photo.state == FileState.ACTIVE and photo.uri:
109                response = await app.aio.models.generate_content(
110                    model=MODEL_ID,
111                    contents=[Part.from_uri(file_uri=photo.uri, mime_type=photo.mime_type), Part.from_text(text="请识别图中的物体")],
112                    config={
113                        "response_mime_type": "application/json",
114                        "response_schema": {
115                            "type": "STRING",
116                            "enum": options,
117                        },
118                    },
119                )
120                await app.aio.aclose()
121                answer = glom(response, "candidates.0.content.parts.0.text", default="").strip('" ')
122                if answer in options:
123                    logger.success(f"终点站 answer: checkin-{answer}")
124                    await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin-{answer}")
125                    Path(fpath).unlink(missing_ok=True)
126                    return
127                logger.warning(f"终点站 wrong answer: {response}")
128        except Exception as e:
129            logger.exception(e)
130
131
132async def checkin_number_question(client: Client, message: Message) -> None:
133    if not str(message.text).startswith("🎯 每日签到\n"):
134        return
135    if nowdt(TZ).hour not in [6, 7]:
136        return
137    if glom(message, "from_user.username", default="").lower() not in {"es666_bot", "fouremby_bot"}:
138        return
139    query = str(message.text).removeprefix("🎯 每日签到\n")
140    query = query.removesuffix("⏱ 有效期: 1 分钟").strip()
141    ai_msg = Message(id=rand_number(), chat=Chat(id=0), text=Str(f"{PREFIX.AI_TEXT_GENERATION} @emby {query}"))
142    ai_res = await ai_text_generation(
143        "fake-client",  # type: ignore
144        ai_msg,
145        silent=True,
146        gemini_generate_content_config={
147            "tools": [{"code_execution": {}}],
148            "responseMimeType": "application/json",
149            "responseJsonSchema": {
150                "title": "Math Question",
151                "type": "object",
152                "strict": True,
153                "properties": {"answer": {"type": "integer"}},
154                "required": ["answer"],
155                "additionalProperties": False,
156            },
157        },
158        openai_responses_config={
159            "tools": [{"type": "web_search"}],
160            "text": {
161                "format": {
162                    "type": "json_schema",
163                    "name": "MathQuestion",
164                    "strict": True,
165                    "description": "A simple math question.",
166                    "schema": {
167                        "title": "Math Question",
168                        "type": "object",
169                        "properties": {"answer": {"type": "integer"}},
170                        "required": ["answer"],
171                        "additionalProperties": False,
172                    },
173                }
174            },
175        },
176        openai_completions_config={
177            "response_format": {
178                "type": "json_schema",
179                "strict": True,
180                "json_schema": {
181                    "name": "MathQuestion",  # name must match ^[a-zA-Z0-9_-]+$
182                    "schema": {
183                        "type": "object",
184                        "properties": {"answer": {"type": "integer"}},
185                        "required": ["answer"],
186                        "additionalProperties": False,
187                    },
188                    "strict": True,
189                },
190            }
191        },
192    )
193    if not ai_res.get("texts"):
194        logger.error(f"【Emby】数学题签到: {ai_res}")
195        return
196    logger.success(f"【Emby】数学题签到: {ai_res['texts']}")
197    answer = json.loads(ai_res["texts"])["answer"]
198    await client.send_message(message.chat.id, text=str(answer))
199
200
201async def checkin_poetry(client: Client, message: Message) -> None:
202    # ruff: noqa: RUF001
203    if not str(message.content).startswith("🌸 防机器签到验证 🌸\n"):
204        return
205    # if nowdt(TZ).hour not in [6, 7]:
206    #     return
207    bot_name = glom(message, "from_user.username", default="")
208    if bot_name.lower() != "niubi2233_bot":
209        return
210    query = str(message.content).removeprefix("🌸 防机器签到验证 🌸").strip()
211    query = query.replace("", "?").strip().replace("请依次点击下方按钮补全诗句:", "你是专业的诗词研究专家,核心任务是依据提供的诗词文本,精准补全其中缺失的汉字。以下是需要补全的诗句:")
212    query += '\n请根据你掌握的诗词知识,准确填充诗句中“?”处缺失的汉字。\n注意:必须严格匹配原诗词的用字,不得自行创作或修改。\n最终结果请以数组格式返回,数组元素为按缺失顺序排列的汉字。例如,若诗句为“?前明月?,疑是地上霜”,则返回 ["", ""]'
213    reply_markup = glom(message, "reply_markup.inline_keyboard.**.callback_data", default=[])
214    reply_markup = [x for x in reply_markup if isinstance(x, str)]
215    logger.debug(f"【Emby】诗词题签到: {reply_markup}")
216    if not reply_markup:
217        return
218
219    if chars := cache.get(f"checkin_poetry_{bot_name}"):  # 缓存中存在字符
220        if f"checkin_shici_{chars[0]}" not in reply_markup:
221            logger.error(f"【Emby】诗词题签到: {chars}")
222            return
223        if len(chars) == 1:  # 只用填一个字, 直接返回
224            await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
225            cache.delete(f"checkin_poetry_{bot_name}")  # 删除缓存
226            return
227        cache.set(f"checkin_poetry_{bot_name}", chars[1:], ttl=60)  # 缓存后面的字
228        await asyncio.sleep(1)  # 等待1秒, 确保缓存生效
229        await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
230
231    ai_msg = Message(id=rand_number(), chat=Chat(id=0), text=Str(f"{PREFIX.AI_TEXT_GENERATION} @emby {query}"))
232    ai_res = await ai_text_generation(
233        "fake-client",  # type: ignore
234        ai_msg,
235        silent=True,
236        gemini_generate_content_config={
237            "tools": [{"google_search": {}}],
238            "responseMimeType": "application/json",
239            "responseJsonSchema": {
240                "title": "Fill Poetry",
241                "type": "array",
242                "items": {"type": "string"},
243                "description": "The missing characters in the poetry.",
244            },
245        },
246        openai_responses_config={
247            "tools": [{"type": "web_search"}],
248            "text": {
249                "format": {
250                    "type": "json_schema",
251                    "name": "FillPoetry",
252                    "strict": True,
253                    "description": "Fill the missing characters in the poetry.",
254                    "schema": {
255                        "title": "Fill Poetry",
256                        "type": "array",
257                        "items": {"type": "string"},
258                        "description": "The missing characters in the poetry.",
259                    },
260                }
261            },
262        },
263        openai_completions_config={
264            "response_format": {
265                "type": "json_schema",
266                "strict": True,
267                "json_schema": {
268                    "name": "FillPoetry",  # name must match ^[a-zA-Z0-9_-]+$
269                    "schema": {
270                        "type": "array",
271                        "items": {"type": "string"},
272                        "description": "The missing characters in the poetry.",
273                    },
274                    "strict": True,
275                },
276            }
277        },
278    )
279    if not ai_res.get("texts"):
280        logger.error(f"【Emby】诗词题签到: {ai_res}")
281        return
282    logger.success(f"【Emby】诗词题签到: {ai_res['texts']}")
283    chars = json.loads(ai_res["texts"])
284    if not chars:
285        logger.error(f"【Emby】诗词题签到: {chars}")
286        return
287    chars = flatten(chars)
288    # 回答第一个字
289    if f"checkin_shici_{chars[0]}" not in reply_markup:
290        logger.error(f"【Emby】诗词题签到: {chars}")
291        return
292    if len(chars) == 1:  # 只用填一个字, 直接返回
293        await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
294        return
295    cache.set(f"checkin_poetry_{bot_name}", chars[1:], ttl=60)  # 缓存后面的字
296    await asyncio.sleep(1)  # 等待1秒, 确保缓存生效
297    await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")