main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import ast
  4import contextlib
  5import json
  6import re
  7from collections.abc import Mapping
  8from copy import deepcopy
  9from datetime import datetime
 10
 11from anthropic import AsyncAnthropic, DefaultAioHttpClient
 12from anthropic.types.beta.file_metadata import FileMetadata
 13from glom import glom
 14from google import genai
 15from google.genai.types import HttpOptions
 16from loguru import logger
 17from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
 18
 19from config import AI, PREFIX, PROXY, cache
 20from database.kv import get_cf_kv
 21from utils import nowdt, remove_consecutive_newlines, remove_dash, remove_pound, strings_list, zhcn
 22
 23# ruff: noqa: RUF001
# Marker emojis used by this module: the *_BOT emojis prefix the help strings
# below, and EMOJI_TEXT_BOT also anchors the banner regex in `clean_bot_tips`.
EMOJI_TEXT_BOT = "🤖"
EMOJI_IMG_BOT = "🌠"
EMOJI_VIDEO_BOT = "📽"
EMOJI_REASONING_BEGIN = "🤔"  # use emoji to separate model reasoning and content
EMOJI_REASONING_END = "💡"
# Hint text ("reply to continue") that closes the bot banner matched by `clean_bot_tips`.
BOT_TIPS = "(回复以继续)"
 30
 31
 32async def text_generation_docs() -> str:
 33    kv = await get_cf_kv(AI.TEXT_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
 34    return kv.get("docs", f"{EMOJI_TEXT_BOT}**AI对话**: `{PREFIX.AI_TEXT_GENERATION}` + 提示词")
 35
 36
 37async def img_generation_docs() -> str:
 38    kv = await get_cf_kv(AI.IMG_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
 39    return kv.get("docs", f"{EMOJI_IMG_BOT}**AI生图**: `{PREFIX.AI_IMG_GENERATION}` + 提示词")
 40
 41
 42async def video_generation_docs() -> str:
 43    kv = await get_cf_kv(AI.VIDEO_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
 44    return kv.get("docs", f"{EMOJI_VIDEO_BOT}**AI视频**: `{PREFIX.AI_VIDEO_GENERATION}` + 提示词")
 45
 46
 47def literal_eval(string: str | dict) -> dict:
 48    if isinstance(string, dict):
 49        return string
 50    with contextlib.suppress(Exception):
 51        string = re.sub(r"\btrue\b", "True", string)
 52        string = re.sub(r"\bfalse\b", "False", string)
 53        string = re.sub(r"\bnull\b", "None", string)
 54        return ast.literal_eval(string)
 55    return {}
 56
 57
 58def trim_none(obj: dict) -> dict:
 59    if isinstance(obj, dict):
 60        return {k: trim_none(v) for k, v in obj.items() if v is not None}
 61    if isinstance(obj, list):
 62        return [trim_none(item) for item in obj if item is not None]  # ty:ignore[invalid-return-type]
 63    return obj
 64
 65
 66def prettify(data: dict) -> str:
 67    with contextlib.suppress(Exception):
 68        data = trim_none(data)
 69        return json.dumps(data, ensure_ascii=False, indent=2)
 70    return str(data)
 71
 72
 73def clean_cmd_prefix(text: str) -> str:
 74    for prefix in [PREFIX.AI_TEXT_GENERATION, PREFIX.AI_IMG_GENERATION, PREFIX.AI_VIDEO_GENERATION]:
 75        text = text.removeprefix(prefix).lstrip()
 76    return re.sub(r"^@([a-zA-Z0-9_\-\.]+)(\s+)?", "", text, flags=re.DOTALL).strip()
 77
 78
 79def clean_bot_tips(text: str) -> str:
 80    return re.sub(rf"^{EMOJI_TEXT_BOT}(.*?){BOT_TIPS}", "", text, flags=re.DOTALL).strip()
 81
 82
 83def clean_reasoning(text: str) -> str:
 84    text = re.sub(rf"{EMOJI_REASONING_BEGIN}(.*?){EMOJI_REASONING_END}", "", text.strip(), flags=re.DOTALL).strip()
 85    return text.replace(BLOCKQUOTE_EXPANDABLE_DELIM, "").strip()
 86
 87
 88def clean_context(text: str) -> str:
 89    """Remove bot prefix and reasoning content."""
 90    if not text:
 91        return ""
 92    text = re.sub(r"^👤@.*?\/\/", "", text)  # remove markdown send_from_user
 93    text = re.sub(r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", text)  # remove html send_from_user
 94    text = clean_cmd_prefix(text)
 95    text = clean_bot_tips(text)
 96    return clean_reasoning(text)
 97
 98
 99def clean_source_marks(text: str) -> str:
100    """Remove [username], [message], ... marks.
101
102    Should align with the tags in `contexts.py`
103    """
104    if not text:
105        return text
106    clean_text = ""
107    for line in text.split("\n"):
108        if line.strip().startswith(("[username]:", "[filename]:", "[fileowner]:")):
109            continue
110        if line.strip() in ["[message]:", "[file content]:"]:
111            continue
112        clean_text += line + "\n"
113    return clean_text.removesuffix("\n")  # remove the last newline
114
115
def split_reasoning(text: str) -> tuple[str, str]:
    """Separate the reasoning span from the rest of an LLM response.

    Args:
        text: LLM response
    Returns:
        (reasoning, content) where reasoning is the first span delimited by the
        reasoning emojis (delimiters included, empty when absent) and content
        is the response with reasoning removed.
    """
    body = clean_bot_tips(clean_cmd_prefix(text))
    found = re.search(rf"{EMOJI_REASONING_BEGIN}(.*?){EMOJI_REASONING_END}", body, flags=re.DOTALL)
    if found:
        reasoning = f"{EMOJI_REASONING_BEGIN}{found.group(1)}{EMOJI_REASONING_END}"
    else:
        reasoning = ""
    content = clean_reasoning(body)
    return reasoning.strip(), content.strip()
131
132
def beautify_llm_response(text: str, newline_level: int = 3) -> str:
    """Tidy an LLM response for display.

    Source marks are removed first, then the text is passed through the
    project helpers `remove_pound`, `remove_dash` and `zhcn`, and finally
    through `remove_consecutive_newlines`.

    Args:
        text: LLM response
        newline_level: forwarded unchanged to `remove_consecutive_newlines`
    Returns:
        beautified LLM response (falsy input is returned unchanged)
    """
    if not text:
        return text
    result = clean_source_marks(text)
    for transform in (remove_pound, remove_dash, zhcn):
        result = transform(result)
    return remove_consecutive_newlines(result, newline_level)
148
149
150def replace_placeholder(data: dict | list | str, pairs: dict[str, str]) -> dict | list | str:
151    """Replace placeholder in data.
152
153    Args:
154        data: nested dict with placeholder.
155        pairs: dict of placeholder and value. e.g. {"%PROMPT%": "prompt"}
156
157    Returns:
158        dict with replaced placeholder
159    """
160    if isinstance(data, dict):
161        return {key: replace_placeholder(value, pairs) for key, value in data.items()}
162    if isinstance(data, list):
163        return [replace_placeholder(item, pairs) for item in data]
164    if isinstance(data, str):
165        replaced_str = data
166        for placeholder, value in pairs.items():
167            replaced_str = replaced_str.replace(placeholder, value)
168        return replaced_str
169    return data
170
171
async def clean_gemini_files():
    """Delete uploaded Gemini files older than ``AI.GEMINI_FILES_TTL`` seconds.

    Gemini allows only 20 GB of data.

    For every configured API key, all uploaded files are listed and those
    whose ``update_time`` is older than the TTL are deleted. Nothing is done
    when the TTL is 48 hours or more.
    """
    # NOTE(review): presumably skipped because Gemini itself expires uploads
    # after 48 hours, making a longer TTL pointless to enforce - confirm.
    if AI.GEMINI_FILES_TTL >= 48 * 3600:
        return
    now = nowdt()
    for api_key in strings_list(AI.GEMINI_API_KEYS):
        app = genai.Client(api_key=api_key, http_options=HttpOptions(async_client_args={"proxy": PROXY.GOOGLE}))
        # `async for` walks every page of the listing; a plain `for` over the
        # pager only sees the first page. Names are collected first so that
        # nothing is deleted while the listing is still being paged through.
        expired_names = []
        async for f in await app.aio.files.list():
            if not (isinstance(f.update_time, datetime) and isinstance(f.name, str)):
                continue
            delta = now - f.update_time
            if delta.total_seconds() > AI.GEMINI_FILES_TTL:
                expired_names.append(f.name)
        for name in expired_names:
            logger.debug(f"Delete Gemini file: {name}")
            await app.aio.files.delete(name=name)
188
189
@cache.memoize(ttl=300)
async def load_skills(skill_name: str) -> str:
    """Assemble the text of a skill from its KV entry.

    The entry is expected to map file names to file contents. The content of
    ``SKILL.md`` (if present) comes first, followed by every other file in
    name order, each introduced by a ``Reference: <name>`` line.

    Args:
        skill_name: KV key under which the skill files are stored.

    Returns:
        The concatenated skill text (empty string for an empty entry).
    """
    skills = await get_cf_kv(skill_name)
    # Read without popping: the mapping comes from `get_cf_kv`, which may hand
    # back a shared/cached object, so it must not be mutated here.
    main_doc = skills.get("SKILL.md", "")
    references = [
        f"\n\nReference: {fname}\n{content}"
        for fname, content in sorted(skills.items())
        if fname != "SKILL.md"
    ]
    return main_doc + "".join(references)
199
200
async def clean_anthropic_files():
    """Delete uploaded Anthropic files older than ``AI.ANTHROPIC_FILES_TTL`` seconds.

    Total storage: 100 GB per organization.

    For every configured API key, all uploaded files are listed and those
    whose ``created_at`` is older than the TTL are deleted.
    """
    for api_key in strings_list(AI.ANTHROPIC_API_KEYS):
        # Use the client as an async context manager so the underlying aiohttp
        # session is closed once this key has been processed.
        async with AsyncAnthropic(
            api_key=api_key,
            base_url=AI.ANTHROPIC_BASE_URL,
            http_client=DefaultAioHttpClient(proxy=PROXY.ANTHROPIC),
        ) as anthropic:
            # Iterating the paginator fetches every page, not just the first.
            # Expired files are collected first so that nothing is deleted
            # while the listing is still being paged through.
            expired = []
            async for f in anthropic.beta.files.list():
                if not isinstance(f, FileMetadata):
                    continue
                delta = nowdt("UTC") - f.created_at
                if delta.total_seconds() > AI.ANTHROPIC_FILES_TTL:
                    expired.append(f)
            for f in expired:
                logger.debug(f"Delete Anthropic file: {f.filename}")
                await anthropic.beta.files.delete(file_id=f.id)
220
221
def deep_merge(base_dict: dict, *update_dicts: dict) -> dict:
    """Deep merge multiple dicts into a new dict.

    Later dicts win over earlier ones. When both sides hold a mapping under
    the same key the two are merged recursively; otherwise the update value
    replaces the existing one.

    Args:
        base_dict: The base dictionary to merge into
        *update_dicts: Dictionaries to merge into the base

    Returns:
        A new dictionary with all values merged. Neither the base nor any of
        the update dicts is modified, and the result shares no mutable
        objects with them.
    """
    result = deepcopy(base_dict)
    for update_dict in update_dicts:
        for k, v in update_dict.items():
            current = result.get(k)
            if isinstance(v, Mapping) and isinstance(current, Mapping):
                result[k] = deep_merge(current, v)
            else:
                # Copy the incoming value so later mutation of the result
                # cannot leak back into the caller's update dict.
                result[k] = deepcopy(v)
    return result
239    return result