main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import ast
  4import contextlib
  5import json
  6import re
  7from collections.abc import Mapping
  8from copy import deepcopy
  9from datetime import datetime
 10
 11from anthropic import AsyncAnthropic, DefaultAioHttpClient
 12from anthropic.types.beta.file_metadata import FileMetadata
 13from glom import glom
 14from google import genai
 15from google.genai.types import HttpOptions
 16from loguru import logger
 17from pyrogram.parser.markdown import BLOCKQUOTE_DELIM, BLOCKQUOTE_EXPANDABLE_DELIM, SPOILER_DELIM
 18
 19from config import AI, PREFIX, PROXY, cache
 20from database.kv import get_cf_kv
 21from utils import nowdt, remove_consecutive_newlines, remove_dash, remove_pound, strings_list, ts_to_dt, zhcn
 22
 23# ruff: noqa: RUF001
# Emoji that prefix the default help text of each generation command;
# EMOJI_TEXT_BOT also opens the bot header that clean_bot_tips() strips.
EMOJI_TEXT_BOT = "🤖"
EMOJI_IMG_BOT = "🌠"
EMOJI_VIDEO_BOT = "📽"
# The span from EMOJI_REASONING_BEGIN to EMOJI_REASONING_END is treated as model reasoning.
EMOJI_REASONING_BEGIN = "🤔"  # use emoji to separate model reasoning and content
EMOJI_REASONING_END = "💡"
# Hint text (Chinese for "(reply to continue)") that closes the bot header matched by clean_bot_tips().
BOT_TIPS = "(回复以继续)"
 30
 31
 32async def text_generation_docs() -> str:
 33    kv = await get_cf_kv(AI.TEXT_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
 34    return kv.get("docs", f"{EMOJI_TEXT_BOT}**AI对话**: `{PREFIX.AI_TEXT_GENERATION}` + 提示词")
 35
 36
 37async def img_generation_docs() -> str:
 38    kv = await get_cf_kv(AI.IMG_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
 39    return kv.get("docs", f"{EMOJI_IMG_BOT}**AI生图**: `{PREFIX.AI_IMG_GENERATION}` + 提示词")
 40
 41
 42async def video_generation_docs() -> str:
 43    kv = await get_cf_kv(AI.VIDEO_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
 44    return kv.get("docs", f"{EMOJI_VIDEO_BOT}**AI视频**: `{PREFIX.AI_VIDEO_GENERATION}` + 提示词")
 45
 46
 47def literal_eval(string: str | dict) -> dict:
 48    if isinstance(string, dict):
 49        return string
 50    with contextlib.suppress(Exception):
 51        string = re.sub(r"\btrue\b", "True", string)
 52        string = re.sub(r"\bfalse\b", "False", string)
 53        string = re.sub(r"\bnull\b", "None", string)
 54        return ast.literal_eval(string)
 55    return {}
 56
 57
 58def trim_none(obj: dict) -> dict:
 59    if isinstance(obj, dict):
 60        return {k: trim_none(v) for k, v in obj.items() if v is not None}
 61    if isinstance(obj, list):
 62        return [trim_none(item) for item in obj if item is not None]  # ty:ignore[invalid-return-type]
 63    return obj
 64
 65
 66def prettify(data: dict) -> str:
 67    with contextlib.suppress(Exception):
 68        data = trim_none(data)
 69        if isinstance(data.get("created"), int):
 70            data["created"] = ts_to_dt(data["created"]).strftime("%Y-%m-%d %H:%M:%S")  # ty:ignore[unresolved-attribute]
 71        if isinstance(data.get("created_at"), int):
 72            data["created_at"] = ts_to_dt(data["created_at"]).strftime("%Y-%m-%d %H:%M:%S")  # ty:ignore[unresolved-attribute]
 73        return json.dumps(data, ensure_ascii=False, indent=2)
 74    return str(data)
 75
 76
 77def clean_cmd_prefix(text: str) -> str:
 78    for prefix in [PREFIX.AI_TEXT_GENERATION, PREFIX.AI_IMG_GENERATION, PREFIX.AI_VIDEO_GENERATION]:
 79        text = text.removeprefix(prefix).lstrip()
 80    return re.sub(r"^@([a-zA-Z0-9_\-\.]+)(\s+)?", "", text, flags=re.DOTALL).strip()
 81
 82
 83def clean_bot_tips(text: str) -> str:
 84    return re.sub(rf"^{EMOJI_TEXT_BOT}(.*?){BOT_TIPS}", "", text, flags=re.DOTALL).strip()
 85
 86
 87def clean_reasoning(s: str) -> str:
 88    s = re.sub(rf"{EMOJI_REASONING_BEGIN}(.*?){EMOJI_REASONING_END}", "", s.strip(), flags=re.DOTALL).strip()
 89    texts = ""
 90    for line in s.splitlines():
 91        texts += line.removeprefix(BLOCKQUOTE_EXPANDABLE_DELIM).removeprefix(BLOCKQUOTE_DELIM).removesuffix(SPOILER_DELIM) + "\n"
 92    return texts.strip()
 93
 94
 95def clean_context(text: str) -> str:
 96    """Remove bot prefix and reasoning content."""
 97    if not text:
 98        return ""
 99    text = re.sub(r"^👤@.*?\/\/", "", text)  # remove markdown send_from_user
100    text = re.sub(r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", text)  # remove html send_from_user
101    text = clean_cmd_prefix(text)
102    text = clean_bot_tips(text)
103    return clean_reasoning(text)
104
105
def clean_source_marks(text: str) -> str:
    """Drop the ``[username]``, ``[message]``, ... marker lines from ``text``.

    The markers should stay aligned with the tags in `contexts.py`.
    Lines starting with a ``[username]:``, ``[filename]:`` or ``[fileowner]:``
    tag are removed entirely; ``[message]:`` and ``[file content]:`` are
    removed only when they are the whole line. Falsy input is returned as is.
    """
    if not text:
        return text
    inline_tags = ("[username]:", "[filename]:", "[fileowner]:")
    standalone_tags = ("[message]:", "[file content]:")
    kept_lines = []
    for raw_line in text.split("\n"):
        candidate = raw_line.strip()
        if candidate.startswith(inline_tags) or candidate in standalone_tags:
            continue
        kept_lines.append(raw_line)
    return "\n".join(kept_lines)
121
122
def split_reasoning(text: str) -> tuple[str, str]:
    """Separate the reasoning span from the answer in an LLM response.

    Args:
        text: LLM response
    Returns:
        (reasoning, content); reasoning keeps its begin/end emoji and is an
        empty string when no reasoning span is present.
    """
    stripped = clean_bot_tips(clean_cmd_prefix(text))
    answer = clean_reasoning(stripped)
    found = re.search(rf"{EMOJI_REASONING_BEGIN}(.*?){EMOJI_REASONING_END}", stripped, flags=re.DOTALL)
    # The whole match is exactly begin emoji + captured text + end emoji.
    thinking = found.group(0) if found else ""
    return thinking.strip(), answer.strip()
138
139
def beautify_llm_response(text: str, newline_level: int = 3) -> str:
    """Tidy an LLM response before it is displayed.

    Args:
        text: LLM response
        newline_level: forwarded to ``remove_consecutive_newlines``
    Returns:
        The response after source marks are removed and the ``remove_pound``,
        ``remove_dash``, ``zhcn`` and ``remove_consecutive_newlines`` helpers
        have been applied in that order. Falsy input is returned unchanged.
    """
    if not text:
        return text
    polished = clean_source_marks(text)
    for transform in (remove_pound, remove_dash, zhcn):
        polished = transform(polished)
    return remove_consecutive_newlines(polished, newline_level)
155
156
157def replace_placeholder(data: dict | list | str, pairs: dict[str, str]) -> dict | list | str:
158    """Replace placeholder in data.
159
160    Args:
161        data: nested dict with placeholder.
162        pairs: dict of placeholder and value. e.g. {"%PROMPT%": "prompt"}
163
164    Returns:
165        dict with replaced placeholder
166    """
167    if isinstance(data, dict):
168        return {key: replace_placeholder(value, pairs) for key, value in data.items()}
169    if isinstance(data, list):
170        return [replace_placeholder(item, pairs) for item in data]
171    if isinstance(data, str):
172        replaced_str = data
173        for placeholder, value in pairs.items():
174            replaced_str = replaced_str.replace(placeholder, value)
175        return replaced_str
176    return data
177
178
async def clean_gemini_files():
    """Delete Gemini files older than ``AI.GEMINI_FILES_TTL`` seconds.

    Gemini allows only 20 GB of data.

    For every key in ``AI.GEMINI_API_KEYS`` all uploaded files are listed and
    those whose ``update_time`` is older than the TTL are deleted.
    """
    # NOTE(review): presumably skipped because Gemini itself expires uploaded
    # files after 48 hours, making a longer TTL pointless - confirm.
    if AI.GEMINI_FILES_TTL >= 48 * 3600:
        return
    now = nowdt()
    for api_key in strings_list(AI.GEMINI_API_KEYS):
        app = genai.Client(api_key=api_key, http_options=HttpOptions(async_client_args={"proxy": PROXY.GOOGLE}))
        # The async client returns an async pager: only `async for` follows
        # pagination, a plain `for` does not go past the first page.
        # Collect first, delete afterwards, so deletions cannot disturb paging.
        expired_names = []
        async for f in await app.aio.files.list():
            if not (isinstance(f.update_time, datetime) and isinstance(f.name, str)):
                continue
            if (now - f.update_time).total_seconds() > AI.GEMINI_FILES_TTL:
                expired_names.append(f.name)
        for name in expired_names:
            logger.debug(f"Delete Gemini file: {name}")
            await app.aio.files.delete(name=name)
195
196
@cache.memoize(ttl=300)
async def load_skills(skill_name: str) -> str:
    """Build the text of a skill from its KV entry.

    Args:
        skill_name: KV key holding a mapping of file name -> file content.

    Returns:
        The content of "SKILL.md" (empty if absent) followed by every other
        file, sorted by name, each introduced by a ``Reference: <name>`` line.
    """
    skills = await get_cf_kv(skill_name)
    # Read without pop(): the mapping returned by get_cf_kv may be a shared
    # (cached) object, and mutating it would drop SKILL.md for later readers.
    skill_str = skills.get("SKILL.md", "")
    for fname, content in sorted(skills.items()):
        if fname == "SKILL.md":
            continue
        skill_str += f"\n\nReference: {fname}\n{content}"
    return skill_str
206
207
async def clean_anthropic_files():
    """Delete Anthropic files older than ``AI.ANTHROPIC_FILES_TTL`` seconds.

    Total storage: 100 GB per organization.

    For every key in ``AI.ANTHROPIC_API_KEYS`` all uploaded files are listed
    and those created longer ago than the TTL are deleted.
    """
    for api_key in strings_list(AI.ANTHROPIC_API_KEYS):
        # `async with` closes the client and its aiohttp session when done;
        # previously a new, never-closed session was created for every key.
        async with AsyncAnthropic(
            api_key=api_key,
            base_url=AI.ANTHROPIC_BASE_URL,
            http_client=DefaultAioHttpClient(proxy=PROXY.ANTHROPIC),
        ) as anthropic:
            now = nowdt("UTC")
            # Iterating the list call with `async for` auto-paginates, so files
            # beyond the first page are inspected too. Collect first and delete
            # afterwards so a deleted file is never used as a paging cursor.
            expired = []
            async for f in anthropic.beta.files.list():
                if not isinstance(f, FileMetadata):
                    continue
                if (now - f.created_at).total_seconds() > AI.ANTHROPIC_FILES_TTL:
                    expired.append(f)
            for f in expired:
                logger.debug(f"Delete Anthropic file: {f.filename}")
                await anthropic.beta.files.delete(file_id=f.id)
227
228
def deep_merge(base_dict: dict, *update_dicts: dict) -> dict:
    """Deep merge multiple dicts into a new dict.

    Nested mappings present on both sides are merged key by key; any other
    value from an update dict replaces the existing one. Later update dicts
    take precedence over earlier ones.

    Args:
        base_dict: The base dictionary to merge into
        *update_dicts: Dictionaries to merge into the base

    Returns:
        A new dictionary with all values merged. It shares no mutable objects
        with ``base_dict`` or any of ``update_dicts``.
    """
    result = deepcopy(base_dict)
    for update_dict in update_dicts:
        for k, v in update_dict.items():
            if isinstance(v, Mapping) and isinstance(result.get(k), Mapping):
                result[k] = deep_merge(result[k], v)
            else:
                # Copy the incoming value so that later mutation of the result
                # cannot leak back into the caller's update dict.
                result[k] = deepcopy(v)
    return result
246    return result