Commit 9c17938
Changed files (2)
src
ai
src/ai/texts/contexts.py
@@ -4,6 +4,7 @@ import asyncio
import base64
import contextlib
import hashlib
+import time
from pathlib import Path
from typing import TYPE_CHECKING
@@ -11,13 +12,14 @@ from glom import glom
from google import genai
from google.genai.types import FileState, Part, UploadFileConfig
from loguru import logger
+from openai import AsyncOpenAI, DefaultAsyncHttpxClient
from pyrogram.client import Client
from pyrogram.types import Message
from ai.utils import BOT_TIPS, clean_context
from asr.utils import GEMINI_AUDIO_EXT, downsampe_audio
from config import AI, DOWNLOAD_DIR
-from database.r2 import head_cf_r2
+from database.r2 import head_cf_r2, set_cf_r2
from messages.parser import get_thread_id, parse_msg
from utils import convert_md, read_text
@@ -94,7 +96,7 @@ async def single_openai_chat_context(client: Client, message: Message) -> dict:
return {"role": role, "content": contexts} if contexts else {}
-async def get_openai_response_contexts(client: Client, message: Message, api_key: str, model_id: str, cache_day: int) -> tuple[str, list[dict]]:
+async def get_openai_response_contexts(client: Client, message: Message, openai_params: dict) -> tuple[str, list[dict]]:
"""Generate OpenAI response contexts.
Returns:
@@ -107,6 +109,9 @@ async def get_openai_response_contexts(client: Client, message: Message, api_key
Returns:
previous_response_id: str
"""
+ api_key = openai_params["api_key"]
+ model_id = openai_params["model_id"]
+ cache_day = openai_params["cache_day"]
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
tid = get_thread_id(msg)
resp = await head_cf_r2(f"TTL/{cache_day}d/OpenAI/{model_id}/{key_hash}/{msg.chat.id}/{msg.id}{'/' + str(tid) if tid else ''}")
@@ -121,10 +126,10 @@ async def get_openai_response_contexts(client: Client, message: Message, api_key
break
messages.append(message)
messages.reverse() # old to new
- return previous_response_id, [ctx for msg in messages if (ctx := await single_openai_response_context(client, msg))]
+ return previous_response_id, [ctx for msg in messages if (ctx := await single_openai_response_context(client, msg, openai_params))]
-async def single_openai_response_context(client: Client, message: Message) -> dict:
+async def single_openai_response_context(client: Client, message: Message, openai_params: dict) -> dict:
"""Generate OpenAI response contexts for a single message.
Returns:
@@ -148,14 +153,30 @@ async def single_openai_response_context(client: Client, message: Message) -> di
info = parse_msg(msg, silent=True)
sender = info["fwd_full_name"] or info["full_name"]
media_path = DOWNLOAD_DIR + "/" + info["file_name"]
+ media_send_as = openai_params.get("openai_media_send_as", "file_id")
try:
if info["mtype"] == "photo":
- res = await base64_media(client, msg)
- contexts.append({"type": "input_image", "image_url": f"data:image/{res['ext']};base64,{res['base64']}", "detail": "high"})
- elif info["mtype"] == "document":
- if info["mime_type"] == "application/pdf" or Path(info["file_name"]).suffix == ".pdf":
+ if media_send_as == "file_id" and (file_id := await get_openai_file_id(client, msg, openai_params, info["mtype"])):
+ contexts.append({"type": "input_image", "file_id": file_id, "detail": "high"})
+ elif media_send_as == "base64":
+ res = await base64_media(client, msg)
+ contexts.append({"type": "input_image", "image_url": f"data:image/{res['ext']};base64,{res['base64']}", "detail": "high"})
+
+ elif info["mtype"] == "video":
+ if media_send_as == "file_id" and (file_id := await get_openai_file_id(client, msg, openai_params, info["mtype"])):
+ contexts.append({"type": "input_video", "file_id": file_id})
+ elif media_send_as == "base64":
res = await base64_media(client, msg)
- contexts.append({"type": "input_file", "file_data": f"data:application/pdf;base64,{res['base64']}", "filename": info["file_name"]})
+ contexts.append({"type": "input_video", "image_url": f"data:video/{res['ext']};base64,{res['base64']}"})
+
+ elif info["mtype"] == "document":
+ if info["mime_type"] == "application/pdf":
+ if media_send_as == "file_id" and (file_id := await get_openai_file_id(client, msg, openai_params, info["mtype"])):
+ contexts.append({"type": "input_file", "file_id": file_id})
+ elif media_send_as == "base64":
+ res = await base64_media(client, msg)
+ contexts.append({"type": "input_file", "file_data": f"data:application/pdf;base64,{res['base64']}", "filename": info["file_name"]})
+
elif info["mime_type"].startswith("text/") or Path(info["file_name"]).suffix in extra_txt_extensions:
fpath: str = await client.download_media(msg, media_path) # type: ignore
contexts.append(
@@ -186,6 +207,71 @@ async def single_openai_response_context(client: Client, message: Message) -> di
return {"role": role, "content": contexts} if contexts else {}
+async def get_openai_file_id(client: Client, message: Message, openai_params: dict, mtype: str) -> str:
+ def get_real_baseurl() -> str:
+ base_url = str(openai_params["base_url"]) or ""
+ default_headers = openai_params.get("default_headers", {})
+ default_headers = {k.lower(): v for k, v in default_headers.items()}
+ if base_url.startswith("https://gateway.helicone.ai"):
+ helicone_target_url = default_headers.get("helicone-target-url") or ""
+ return base_url.replace("https://gateway.helicone.ai", helicone_target_url.rstrip("/"))
+ return base_url
+
+ if mtype not in ["photo", "video", "document"]:
+ return ""
+ if not openai_params["allow_image"] and mtype == "photo":
+ return ""
+ if not openai_params["allow_video"] and mtype == "video":
+ return ""
+ if not openai_params["allow_file"] and mtype == "document":
+ return ""
+
+ cache_day = 30
+ api_key = openai_params["api_key"]
+ model_id = openai_params["model_id"]
+ key_hash = hashlib.sha256(api_key.encode()).hexdigest()
+ tid = get_thread_id(message)
+ r2_key = f"TTL/{cache_day}d/OpenAI/{model_id}/{key_hash}/{message.chat.id}/{message.id}{'/' + str(tid) if tid else ''}-file_id"
+ r2 = await head_cf_r2(r2_key)
+ if file_id := glom(r2, "Metadata.file_id", default=""):
+ return file_id
+
+ openai = AsyncOpenAI(
+ base_url=get_real_baseurl(),
+ api_key=api_key,
+ http_client=DefaultAsyncHttpxClient(proxy=openai_params["proxy"]) if openai_params.get("proxy") else None,
+ )
+ fpath: str = await client.download_media(message) # type: ignore
+ extra_body = {"expire_at": int(time.time()) + 3600 * 24 * cache_day}
+
+ preprocess_configs = {}
+ if message.video:
+ duration = glom(message, "video.duration", default=1e8)
+ ratio = int(duration // 300)
+ fps = ratio * 0.5
+ if fps < 0.5:
+ fps = 0.5
+ elif fps > 5.0:
+ fps = 5.0
+ preprocess_configs = {"video": {"fps": fps, "model": openai_params["model_id"]}}
+ if preprocess_configs:
+ extra_body["preprocess_configs"] = preprocess_configs
+ try:
+ resp = await openai.files.create(file=Path(fpath), purpose="user_data", extra_body=extra_body)
+ while resp.status == "processing":
+ logger.trace(f"Upload media to OpenAI processing: {resp.model_dump()}")
+ await asyncio.sleep(3)
+ resp = await openai.files.retrieve(file_id=resp.id)
+ if resp.status == "active":
+ Path(fpath).unlink(missing_ok=True)
+ await set_cf_r2(r2_key, data=resp.model_dump(), metadata={"file_id": resp.id})
+ return resp.id
+ logger.error(f"Upload media to OpenAI failed: {resp.model_dump()}")
+ except Exception as e:
+ logger.error(f"Upload media to OpenAI failed: {e}")
+ return ""
+
+
async def get_gemini_contexts(client: Client, message: Message, gemini: genai.Client) -> list[dict]:
"""Generate Gemini contexts from old to new.
src/ai/texts/openai_response.py
@@ -3,6 +3,7 @@
import contextlib
import hashlib
import time
+from typing import Literal
from glom import Coalesce, glom
from loguru import logger
@@ -35,6 +36,10 @@ async def openai_responses_api(
openai_responses_config: str | dict = "",
openai_proxy: str | None = PROXY.OPENAI,
cache_response_ttl: int = 0,
+ openai_allow_image: bool = False, # whether to allow image in input modalities
+ openai_allow_video: bool = False, # whether to allow video in input modalities
+ openai_allow_file: bool = False, # whether to allow file in input modalities
+ openai_media_send_as: Literal["base64", "file_id"] = "file_id",
silent: bool = False,
max_retries: int = 3,
**kwargs,
@@ -71,7 +76,20 @@ async def openai_responses_api(
openai_client |= {"base_url": openai_base_url, "api_key": api_key}
logger.trace(f"AsyncOpenAI(**{openai_client})")
openai = AsyncOpenAI(**openai_client)
- previous_response_id, contexts = await get_openai_response_contexts(client, message, api_key, model_id, cache_day)
+ previous_response_id, contexts = await get_openai_response_contexts(
+ client,
+ message,
+ openai_params=openai_client
+ | {
+ "proxy": openai_proxy,
+ "model_id": model_id,
+ "cache_day": cache_day,
+ "allow_image": openai_allow_image,
+ "allow_video": openai_allow_video,
+ "allow_file": openai_allow_file,
+ "openai_media_send_as": openai_media_send_as,
+ },
+ )
params = {}
params |= {"model": model_id, "stream": True, "input": contexts}
if literal_eval(openai_responses_config):