main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import ast
4import contextlib
5import json
6import re
7from collections.abc import Mapping
8from copy import deepcopy
9from datetime import datetime
10
11from anthropic import AsyncAnthropic, DefaultAioHttpClient
12from anthropic.types.beta.file_metadata import FileMetadata
13from glom import glom
14from google import genai
15from google.genai.types import HttpOptions
16from loguru import logger
17from pyrogram.parser.markdown import BLOCKQUOTE_DELIM, BLOCKQUOTE_EXPANDABLE_DELIM, SPOILER_DELIM
18
19from config import AI, PREFIX, PROXY, cache
20from database.kv import get_cf_kv
21from utils import nowdt, remove_consecutive_newlines, remove_dash, remove_pound, strings_list, ts_to_dt, zhcn
22
23# ruff: noqa: RUF001
# Emoji used to label the text / image / video AI features (e.g. in the help strings below).
EMOJI_TEXT_BOT = "🤖"
EMOJI_IMG_BOT = "🌠"
EMOJI_VIDEO_BOT = "📽"
# Model reasoning is wrapped between these two emoji so it can be told apart from the answer.
EMOJI_REASONING_BEGIN = "🤔"  # use emoji to separate model reasoning and content
EMOJI_REASONING_END = "💡"
# Marker matched by `clean_bot_tips`; the Chinese text means "(reply to continue)".
BOT_TIPS = "(回复以继续)"
30
31
async def text_generation_docs() -> str:
    """Return the help text for the AI text-generation command.

    The text-model config is fetched with ``get_cf_kv`` under ``AI.TEXT_MODEL_CONFIG_KEY``.
    Its ``"docs"`` entry is returned when present; otherwise a built-in one-line usage
    hint is returned.
    """
    config = await get_cf_kv(AI.TEXT_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
    fallback = f"{EMOJI_TEXT_BOT}**AI对话**: `{PREFIX.AI_TEXT_GENERATION}` + 提示词"
    return config.get("docs", fallback)
35
36
async def img_generation_docs() -> str:
    """Return the help text for the AI image-generation command.

    The image-model config is fetched with ``get_cf_kv`` under ``AI.IMG_MODEL_CONFIG_KEY``.
    Its ``"docs"`` entry is returned when present; otherwise a built-in one-line usage
    hint is returned.
    """
    config = await get_cf_kv(AI.IMG_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
    fallback = f"{EMOJI_IMG_BOT}**AI生图**: `{PREFIX.AI_IMG_GENERATION}` + 提示词"
    return config.get("docs", fallback)
40
41
async def video_generation_docs() -> str:
    """Return the help text for the AI video-generation command.

    The video-model config is fetched with ``get_cf_kv`` under ``AI.VIDEO_MODEL_CONFIG_KEY``.
    Its ``"docs"`` entry is returned when present; otherwise a built-in one-line usage
    hint is returned.
    """
    config = await get_cf_kv(AI.VIDEO_MODEL_CONFIG_KEY, cache_ttl=600, silent=True)
    fallback = f"{EMOJI_VIDEO_BOT}**AI视频**: `{PREFIX.AI_VIDEO_GENERATION}` + 提示词"
    return config.get("docs", fallback)
45
46
47def literal_eval(string: str | dict) -> dict:
48 if isinstance(string, dict):
49 return string
50 with contextlib.suppress(Exception):
51 string = re.sub(r"\btrue\b", "True", string)
52 string = re.sub(r"\bfalse\b", "False", string)
53 string = re.sub(r"\bnull\b", "None", string)
54 return ast.literal_eval(string)
55 return {}
56
57
def trim_none(obj: dict) -> dict:
    """Return a copy of ``obj`` with ``None`` entries removed recursively.

    Dict values and list items that are ``None`` are dropped; the remaining values are
    processed the same way. Anything that is neither a dict nor a list is returned as is.
    """
    if isinstance(obj, dict):
        cleaned = {}
        for key, value in obj.items():
            if value is None:
                continue
            cleaned[key] = trim_none(value)
        return cleaned
    if isinstance(obj, list):
        kept = (item for item in obj if item is not None)
        return [trim_none(item) for item in kept]  # ty:ignore[invalid-return-type]
    return obj
64
65
def prettify(data: dict) -> str:
    """Render ``data`` as indented JSON for display.

    ``None`` entries are trimmed first, and integer ``created`` / ``created_at`` values
    are converted with ``ts_to_dt`` and formatted as ``YYYY-MM-DD HH:MM:SS``. If any step
    fails, ``str(data)`` is returned instead.
    """
    try:
        data = trim_none(data)
        for field in ("created", "created_at"):
            if isinstance(data.get(field), int):
                data[field] = ts_to_dt(data[field]).strftime("%Y-%m-%d %H:%M:%S")  # ty:ignore[unresolved-attribute]
        return json.dumps(data, ensure_ascii=False, indent=2)
    except Exception:
        # Best-effort formatting: fall back to the plain string form.
        return str(data)
75
76
def clean_cmd_prefix(text: str) -> str:
    """Strip the AI command prefixes and a leading ``@name`` mention from ``text``.

    Each of the text, image and video command prefixes is removed in turn (followed by
    leading whitespace), then one leading ``@name`` token and the whitespace after it.
    """
    command_prefixes = (PREFIX.AI_TEXT_GENERATION, PREFIX.AI_IMG_GENERATION, PREFIX.AI_VIDEO_GENERATION)
    for command_prefix in command_prefixes:
        text = text.removeprefix(command_prefix)
        text = text.lstrip()
    without_mention = re.sub(r"^@([a-zA-Z0-9_\-\.]+)(\s+)?", "", text, flags=re.DOTALL)
    return without_mention.strip()
81
82
def clean_bot_tips(text: str) -> str:
    """Remove the leading bot header that runs from ``EMOJI_TEXT_BOT`` through ``BOT_TIPS``.

    Args:
        text: Message text that may start with the bot header.

    Returns:
        The text with the header removed and surrounding whitespace stripped. Text
        without a matching header is only stripped.
    """
    # Both markers are literal text. BOT_TIPS contains parentheses, which would act as a
    # regex group if left unescaped and leave the closing ")" behind in the result.
    pattern = rf"^{re.escape(EMOJI_TEXT_BOT)}(.*?){re.escape(BOT_TIPS)}"
    return re.sub(pattern, "", text, flags=re.DOTALL).strip()
85
86
def clean_reasoning(s: str) -> str:
    """Drop reasoning sections and quote/spoiler markers from ``s``.

    Everything between ``EMOJI_REASONING_BEGIN`` and ``EMOJI_REASONING_END`` is removed.
    Each remaining line then loses a leading expandable-blockquote delimiter, a leading
    blockquote delimiter and a trailing spoiler delimiter.
    """
    reasoning_pattern = rf"{EMOJI_REASONING_BEGIN}(.*?){EMOJI_REASONING_END}"
    remainder = re.sub(reasoning_pattern, "", s.strip(), flags=re.DOTALL).strip()
    cleaned_lines = []
    for raw_line in remainder.splitlines():
        unquoted = raw_line.removeprefix(BLOCKQUOTE_EXPANDABLE_DELIM)
        unquoted = unquoted.removeprefix(BLOCKQUOTE_DELIM)
        cleaned_lines.append(unquoted.removesuffix(SPOILER_DELIM))
    return "\n".join(cleaned_lines).strip()
93
94
def clean_context(text: str) -> str:
    """Remove the sender prefix, command prefix, bot header and reasoning from ``text``.

    Falsy input yields an empty string.
    """
    if not text:
        return ""
    sender_patterns = (
        r"^👤@.*?\/\/",  # markdown send_from_user
        r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//",  # html send_from_user
    )
    for sender_pattern in sender_patterns:
        text = re.sub(sender_pattern, "", text)
    return clean_reasoning(clean_bot_tips(clean_cmd_prefix(text)))
104
105
def clean_source_marks(text: str) -> str:
    """Remove [username], [message], ... marks.

    Lines whose stripped form starts with ``[username]:``, ``[filename]:`` or
    ``[fileowner]:`` are dropped, as are lines that are exactly ``[message]:`` or
    ``[file content]:``. All other lines are kept unchanged.

    Should align with the tags in `contexts.py`
    """
    if not text:
        return text
    line_prefixes = ("[username]:", "[filename]:", "[fileowner]:")
    marker_lines = ("[message]:", "[file content]:")
    kept = []
    for line in text.split("\n"):
        stripped = line.strip()
        if stripped.startswith(line_prefixes) or stripped in marker_lines:
            continue
        kept.append(line)
    return "\n".join(kept)
121
122
def split_reasoning(text: str) -> tuple[str, str]:
    """Separate the reasoning section of an LLM response from its content.

    Args:
        text: LLM response
    Returns:
        (reasoning, content). ``reasoning`` keeps its begin/end emoji and is empty when
        the response has no reasoning section.
    """
    text = clean_bot_tips(clean_cmd_prefix(text))
    content = clean_reasoning(text)
    matched = re.search(rf"{EMOJI_REASONING_BEGIN}(.*?){EMOJI_REASONING_END}", text, flags=re.DOTALL)
    reasoning = EMOJI_REASONING_BEGIN + matched.group(1) + EMOJI_REASONING_END if matched else ""
    return reasoning.strip(), content.strip()
138
139
def beautify_llm_response(text: str, newline_level: int = 3) -> str:
    """Tidy an LLM response for display.

    Source marks are removed, then the text is passed through ``remove_pound``,
    ``remove_dash`` and ``zhcn`` in that order, and finally through
    ``remove_consecutive_newlines``.

    Args:
        text: LLM response
        newline_level: forwarded to ``remove_consecutive_newlines``
    Returns:
        beautified LLM response (falsy input is returned unchanged)
    """
    if not text:
        return text
    beautified = zhcn(remove_dash(remove_pound(clean_source_marks(text))))
    return remove_consecutive_newlines(beautified, newline_level)
155
156
157def replace_placeholder(data: dict | list | str, pairs: dict[str, str]) -> dict | list | str:
158 """Replace placeholder in data.
159
160 Args:
161 data: nested dict with placeholder.
162 pairs: dict of placeholder and value. e.g. {"%PROMPT%": "prompt"}
163
164 Returns:
165 dict with replaced placeholder
166 """
167 if isinstance(data, dict):
168 return {key: replace_placeholder(value, pairs) for key, value in data.items()}
169 if isinstance(data, list):
170 return [replace_placeholder(item, pairs) for item in data]
171 if isinstance(data, str):
172 replaced_str = data
173 for placeholder, value in pairs.items():
174 replaced_str = replaced_str.replace(placeholder, value)
175 return replaced_str
176 return data
177
178
async def clean_gemini_files():
    """Delete Gemini files older than ``AI.GEMINI_FILES_TTL`` seconds for every API key.

    Gemini allows only 20 GB of data.

    Nothing is done when the configured TTL is 48 hours or more.
    NOTE(review): presumably because Gemini itself expires uploads after 48 hours — confirm.
    """
    if AI.GEMINI_FILES_TTL >= 48 * 3600:
        return
    now = nowdt()
    for api_key in strings_list(AI.GEMINI_API_KEYS):
        app = genai.Client(api_key=api_key, http_options=HttpOptions(async_client_args={"proxy": PROXY.GOOGLE}))
        # The awaited result is an async pager. ``async for`` walks every page, whereas a
        # plain ``for`` only indexes into the first page that was fetched.
        async for f in await app.aio.files.list():
            # Skip entries without a usable timestamp or name.
            if not (isinstance(f.update_time, datetime) and isinstance(f.name, str)):
                continue
            delta = now - f.update_time
            if delta.total_seconds() > AI.GEMINI_FILES_TTL:
                logger.debug(f"Delete Gemini file: {f.name}")
                await app.aio.files.delete(name=f.name)
195
196
@cache.memoize(ttl=300)
async def load_skills(skill_name: str) -> str:
    """Build the prompt text for a skill stored under the KV key ``skill_name``.

    The ``SKILL.md`` entry (if any) comes first, followed by every other entry in
    sorted name order, each introduced by a ``Reference: <name>`` line.

    Args:
        skill_name: KV key passed to ``get_cf_kv``.

    Returns:
        The concatenated skill text; empty when the mapping has no entries.
    """
    skills = await get_cf_kv(skill_name)
    # Read instead of pop(): the mapping belongs to get_cf_kv and may be shared or cached
    # there, so removing SKILL.md from it could affect later readers.
    skill_str = skills["SKILL.md"] if "SKILL.md" in skills else ""
    references = sorted((fname, content) for fname, content in skills.items() if fname != "SKILL.md")
    for fname, content in references:
        skill_str += f"\n\nReference: {fname}\n{content}"
    return skill_str
206
207
async def clean_anthropic_files():
    """Delete Anthropic files older than ``AI.ANTHROPIC_FILES_TTL`` seconds for every API key.

    Total storage: 100 GB per organization.

    NOTE(review): only the entries returned by the single ``files.list()`` call are
    examined — confirm whether further pages need to be fetched.
    """
    for api_key in strings_list(AI.ANTHROPIC_API_KEYS):
        # Use the client as an async context manager so its HTTP session is closed after
        # each key instead of being left open.
        async with AsyncAnthropic(
            api_key=api_key,
            base_url=AI.ANTHROPIC_BASE_URL,
            http_client=DefaultAioHttpClient(proxy=PROXY.ANTHROPIC),
        ) as anthropic:
            files = await anthropic.beta.files.list()
            for f in glom(files, "data", default=[]):
                if not isinstance(f, FileMetadata):
                    continue
                delta = nowdt("UTC") - f.created_at
                if delta.total_seconds() > AI.ANTHROPIC_FILES_TTL:
                    logger.debug(f"Delete Anthropic file: {f.filename}")
                    await anthropic.beta.files.delete(file_id=f.id)
227
228
def deep_merge(base_dict: dict, *update_dicts: dict) -> dict:
    """Deep merge multiple dicts into a new dict.

    Mappings found under the same key are merged recursively; any other value from a
    later dict replaces the earlier one. The inputs are never modified and the result
    shares no mutable objects with them.

    Args:
        base_dict: The base dictionary to merge into
        *update_dicts: Dictionaries to merge into the base, applied left to right

    Returns:
        A new dictionary with all values merged
    """
    result = deepcopy(base_dict)
    for update_dict in update_dicts:
        for k, v in update_dict.items():
            if isinstance(v, Mapping) and isinstance(result.get(k), Mapping):
                result[k] = deep_merge(result[k], v)
            else:
                # Copy so that later changes to the result cannot leak into update_dict.
                result[k] = deepcopy(v)
    return result
246 return result