main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import ast
4import contextlib
5import json
6import re
7from collections.abc import Mapping
8from copy import deepcopy
9from datetime import datetime
10
11from anthropic import AsyncAnthropic, DefaultAioHttpClient
12from anthropic.types.beta.file_metadata import FileMetadata
13from glom import glom
14from google import genai
15from google.genai.types import HttpOptions
16from loguru import logger
17from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
18
19from config import AI, PREFIX, PROXY, cache
20from database.kv import get_cf_kv
21from utils import nowdt, remove_consecutive_newlines, remove_dash, remove_pound, strings_list, zhcn
22
# ruff: noqa: RUF001
# Marker emojis prepended to bot replies so later parsing (clean_* helpers
# below) can tell which generation mode produced a message.
EMOJI_TEXT_BOT = "🤖"  # marks text-generation replies
EMOJI_IMG_BOT = "🌠"  # marks image-generation replies
EMOJI_VIDEO_BOT = "📽"  # marks video-generation replies
EMOJI_REASONING_BEGIN = "🤔"  # use emoji to separate model reasoning and content
EMOJI_REASONING_END = "💡"  # closes the reasoning segment opened by EMOJI_REASONING_BEGIN
BOT_TIPS = "(回复以继续)"  # "(reply to continue)" hint appended to bot replies
30
31
async def text_generation_docs() -> str:
    """Return help text for the AI text-generation command.

    Prefers the "docs" entry from the KV config; falls back to a built-in line.
    """
    config = await get_cf_kv(AI.TEXT_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
    default_docs = f"{EMOJI_TEXT_BOT}**AI对话**: `{PREFIX.AI_TEXT_GENERATION}` + 提示词"
    return config.get("docs", default_docs)
35
36
async def img_generation_docs() -> str:
    """Return help text for the AI image-generation command.

    Prefers the "docs" entry from the KV config; falls back to a built-in line.
    """
    config = await get_cf_kv(AI.IMG_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
    default_docs = f"{EMOJI_IMG_BOT}**AI生图**: `{PREFIX.AI_IMG_GENERATION}` + 提示词"
    return config.get("docs", default_docs)
40
41
async def video_generation_docs() -> str:
    """Return help text for the AI video-generation command.

    Prefers the "docs" entry from the KV config; falls back to a built-in line.
    """
    config = await get_cf_kv(AI.VIDEO_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
    default_docs = f"{EMOJI_VIDEO_BOT}**AI视频**: `{PREFIX.AI_VIDEO_GENERATION}` + 提示词"
    return config.get("docs", default_docs)
45
46
47def literal_eval(string: str | dict) -> dict:
48 if isinstance(string, dict):
49 return string
50 with contextlib.suppress(Exception):
51 string = re.sub(r"\btrue\b", "True", string)
52 string = re.sub(r"\bfalse\b", "False", string)
53 string = re.sub(r"\bnull\b", "None", string)
54 return ast.literal_eval(string)
55 return {}
56
57
def trim_none(obj: dict) -> dict:
    """Recursively drop ``None`` entries from nested dicts and lists.

    Non-container values are returned unchanged.
    """
    if isinstance(obj, dict):
        cleaned = {}
        for key, value in obj.items():
            if value is not None:
                cleaned[key] = trim_none(value)
        return cleaned
    if isinstance(obj, list):
        return [trim_none(element) for element in obj if element is not None]  # ty:ignore[invalid-return-type]
    return obj
64
65
def prettify(data: dict) -> str:
    """Render *data* as pretty-printed JSON (None entries removed).

    Falls back to ``str(data)`` when trimming or serialization fails.
    """
    with contextlib.suppress(Exception):
        data = trim_none(data)
        return json.dumps(data, ensure_ascii=False, indent=2)
    # Anything non-serializable: best-effort plain repr.
    return str(data)
71
72
def clean_cmd_prefix(text: str) -> str:
    """Strip known AI command prefixes and a leading @bot mention from *text*."""
    command_prefixes = (PREFIX.AI_TEXT_GENERATION, PREFIX.AI_IMG_GENERATION, PREFIX.AI_VIDEO_GENERATION)
    for command in command_prefixes:
        text = text.removeprefix(command).lstrip()
    # Drop a leading "@username" mention (letters, digits, _, -, .) and the
    # whitespace right after it.
    return re.sub(r"^@([a-zA-Z0-9_\-\.]+)(\s+)?", "", text, flags=re.DOTALL).strip()
77
78
def clean_bot_tips(text: str) -> str:
    """Remove a leading bot reply block (🤖 ... reply-to-continue tip) from *text*."""
    tips_pattern = rf"^{EMOJI_TEXT_BOT}(.*?){BOT_TIPS}"
    return re.sub(tips_pattern, "", text, flags=re.DOTALL).strip()
81
82
def clean_reasoning(text: str) -> str:
    """Strip the reasoning segment (🤔 ... 💡) and expandable-blockquote markers."""
    reasoning_pattern = rf"{EMOJI_REASONING_BEGIN}(.*?){EMOJI_REASONING_END}"
    without_reasoning = re.sub(reasoning_pattern, "", text.strip(), flags=re.DOTALL).strip()
    return without_reasoning.replace(BLOCKQUOTE_EXPANDABLE_DELIM, "").strip()
86
87
def clean_context(text: str) -> str:
    """Remove bot prefix and reasoning content."""
    if not text:
        return ""
    # Strip the "sent from user" banner in both markdown and HTML flavors.
    text = re.sub(r"^👤@.*?\/\/", "", text)  # remove markdown send_from_user
    text = re.sub(r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", text)  # remove html send_from_user
    # Then peel command prefix, bot tips, and reasoning, in that order.
    return clean_reasoning(clean_bot_tips(clean_cmd_prefix(text)))
97
98
def clean_source_marks(text: str) -> str:
    """Remove [username], [message], ... marks.

    Should align with the tags in `contexts.py`
    """
    if not text:
        return text
    tag_prefixes = ("[username]:", "[filename]:", "[fileowner]:")
    bare_tags = ("[message]:", "[file content]:")
    kept_lines = []
    for line in text.split("\n"):
        stripped = line.strip()
        if stripped.startswith(tag_prefixes) or stripped in bare_tags:
            continue
        kept_lines.append(line)
    # Joining reproduces the original text minus the tag lines, with no
    # trailing newline added.
    return "\n".join(kept_lines)
114
115
def split_reasoning(text: str) -> tuple[str, str]:
    """Split reasoning from text.

    Args:
        text: LLM response
    Returns:
        (reasoning, content)
    """
    text = clean_bot_tips(clean_cmd_prefix(text))
    content = clean_reasoning(text)
    # Keep the reasoning segment (delimiters included) if one is present.
    match = re.search(rf"{EMOJI_REASONING_BEGIN}(.*?){EMOJI_REASONING_END}", text, flags=re.DOTALL)
    reasoning = "" if match is None else EMOJI_REASONING_BEGIN + match.group(1) + EMOJI_REASONING_END
    return reasoning.strip(), content.strip()
131
132
def beautify_llm_response(text: str, newline_level: int = 3) -> str:
    """Beautify LLM response.

    Args:
        text: LLM response
        newline_level: max consecutive newlines to keep (passed through to
            remove_consecutive_newlines)
    Returns:
        beautified LLM response
    """
    if not text:
        return text
    cleaned = clean_source_marks(text)
    # Apply the formatting passes in order: strip pound headers, strip dash
    # rules, normalize to simplified Chinese conventions.
    for transform in (remove_pound, remove_dash, zhcn):
        cleaned = transform(cleaned)
    return remove_consecutive_newlines(cleaned, newline_level)
148
149
150def replace_placeholder(data: dict | list | str, pairs: dict[str, str]) -> dict | list | str:
151 """Replace placeholder in data.
152
153 Args:
154 data: nested dict with placeholder.
155 pairs: dict of placeholder and value. e.g. {"%PROMPT%": "prompt"}
156
157 Returns:
158 dict with replaced placeholder
159 """
160 if isinstance(data, dict):
161 return {key: replace_placeholder(value, pairs) for key, value in data.items()}
162 if isinstance(data, list):
163 return [replace_placeholder(item, pairs) for item in data]
164 if isinstance(data, str):
165 replaced_str = data
166 for placeholder, value in pairs.items():
167 replaced_str = replaced_str.replace(placeholder, value)
168 return replaced_str
169 return data
170
171
async def clean_gemini_files():
    """Clean Gemini files.

    Gemini allows only 20 GB of data.
    """
    # NOTE(review): 48h cutoff presumably matches Gemini's own file
    # expiration window, so longer TTLs need no manual cleanup — confirm.
    if AI.GEMINI_FILES_TTL >= 48 * 3600:
        return
    now = nowdt()
    for api_key in strings_list(AI.GEMINI_API_KEYS):
        client = genai.Client(api_key=api_key, http_options=HttpOptions(async_client_args={"proxy": PROXY.GOOGLE}))
        for remote_file in await client.aio.files.list():
            if not (isinstance(remote_file.update_time, datetime) and isinstance(remote_file.name, str)):
                continue
            age = now - remote_file.update_time
            if age.total_seconds() > AI.GEMINI_FILES_TTL:
                logger.debug(f"Delete Gemini file: {remote_file.name}")
                await client.aio.files.delete(name=remote_file.name)
188
189
@cache.memoize(ttl=300)
async def load_skills(skill_name: str) -> str:
    """Assemble a skill prompt from the KV store.

    The "SKILL.md" entry (if present) leads the prompt; every other entry is
    appended as a named reference section, in filename order.

    Args:
        skill_name: KV key holding a mapping of filename -> file content.

    Returns:
        The concatenated skill prompt ("" when the mapping is empty).
    """
    skills = await get_cf_kv(skill_name)
    # Read non-destructively: the original popped "SKILL.md", mutating the
    # dict returned by get_cf_kv, which may be shared with its cache and
    # would then be missing the entry on later lookups of the same key.
    skill_str = skills.get("SKILL.md", "")
    for fname, content in sorted(skills.items()):
        if fname == "SKILL.md":
            continue
        skill_str += f"\n\nReference: {fname}\n{content}"
    return skill_str
199
200
async def clean_anthropic_files():
    """Clean Anthropic files.

    Total storage: 100 GB per organization.
    """
    for api_key in strings_list(AI.ANTHROPIC_API_KEYS):
        client = AsyncAnthropic(
            api_key=api_key,
            base_url=AI.ANTHROPIC_BASE_URL,
            http_client=DefaultAioHttpClient(proxy=PROXY.ANTHROPIC),
        )
        listing = await client.beta.files.list()
        # glom guards against a missing/odd "data" attribute on the page.
        for item in glom(listing, "data", default=[]):
            if not isinstance(item, FileMetadata):
                continue
            age_seconds = (nowdt("UTC") - item.created_at).total_seconds()
            if age_seconds > AI.ANTHROPIC_FILES_TTL:
                logger.debug(f"Delete Anthropic file: {item.filename}")
                await client.beta.files.delete(file_id=item.id)
220
221
def deep_merge(base_dict: dict, *update_dicts: dict) -> dict:
    """Deep merge multiple dicts into a new dict.

    Args:
        base_dict: The base dictionary to merge into
        *update_dicts: Dictionaries to merge into the base

    Returns:
        A new dictionary with all values merged
    """
    # Deep-copy so neither base_dict nor its nested values are mutated.
    merged = deepcopy(base_dict)
    for overlay in update_dicts:
        for key, value in overlay.items():
            current = merged.get(key)
            # Recurse only when both sides are mappings; otherwise the
            # overlay value wins outright.
            if isinstance(value, Mapping) and isinstance(current, Mapping):
                merged[key] = deep_merge(current, value)
            else:
                merged[key] = value
    return merged
239 return result