Commit 9af933f
Changed files (2)
src/llm/response_stream.py
@@ -13,33 +13,48 @@ from pyrogram.types import Message, ReplyParameters
+import contextlib
from config import GPT, TEXT_LENGTH
from llm.hooks import pre_hooks
-from llm.utils import BOT_TIPS, REASONING_BEGIN, REASONING_END, add_search_results_to_response, beautify_llm_response, raw_reasoning, split_reasoning
+from llm.utils import BOT_TIPS, REASONING_BEGIN, REASONING_END, add_search_results_to_response, beautify_llm_response, split_reasoning
from messages.progress import modify_progress
from messages.utils import blockquote, count_without_entities, smart_split
async def send_to_gpt_stream(
client: Client,
- status: Message,
+ status_msg: Message | None,
config: dict,
+ prefix: str | None = None,
*,
retry: int = 0,
+ silent: bool = False,
+ remove_thinking: bool = True,
+ single_thinking_msg: bool = True,
system_prompt: str | None = None,
**kwargs,
) -> dict:
"""Get GPT response in stream mode.
+
+ Args:
+ single_thinking_msg (bool, optional): Use a single Telegram message to display the model's thinking.
+ remove_thinking (bool, optional): Remove the thinking part from the reply once reasoning finishes.
+
Returns:
dict: {"texts": str, "thoughts": str, "prefix": str, "model_name": str, "sent_messages": list[Message]}
"""
- prefix = f"🤖**{config['friendly_name']}**:{BOT_TIPS}\n"
- final = {"prefix": prefix, "model_name": config["friendly_name"], "sent_messages": [status]}
+ if prefix is None:
+ prefix = f"🤖**{config['friendly_name']}**:{BOT_TIPS}\n"
+
+ answers = "" # accumulated model responses
+ thoughts = "" # accumulated model thoughts
+ runtime_texts = "" # text shown in a single Telegram message
+ sent_messages = [] # created before the try block so the error handler can always reference it
+ if silent:
+ status_msg = None
+ status_cid = status_msg.chat.id if isinstance(status_msg, Message) else 0
+ status_mid = status_msg.id if isinstance(status_msg, Message) else 0
try:
pre_hooks(config["client"], config["completions"], message_info=kwargs.get("message_info"), system_prompt=system_prompt)
openai = AsyncOpenAI(**config["client"])
logger.trace(config)
- answers = prefix
- all_answers = ""
is_reasoning = False
is_reasoning_conversation = None # indicates whether this is a reasoning conversation
gen = await openai.chat.completions.create(**config["completions"], stream=True)
@@ -48,84 +63,124 @@ async def send_to_gpt_stream(
logger.trace(resp)
error = await parse_error(resp, retry, **kwargs)
if error["retry"]:
- return await send_to_gpt_stream(client, status, config, retry=retry + 1, **kwargs)
+ return await send_to_gpt_stream(
+ client,
+ status_msg,
+ config,
+ prefix=prefix,
+ retry=retry + 1,
+ silent=silent,
+ remove_thinking=remove_thinking,
+ single_thinking_msg=single_thinking_msg,
+ system_prompt=system_prompt,
+ **kwargs,
+ )
if error["error"]:
- await modify_progress(message=status, text=error["error"], force_update=True, **kwargs)
+ await modify_progress(message=status_msg, text=error["error"], force_update=True, **kwargs)
return {}
answer = glom(resp, "choices.0.delta.content", default="") or ""
- reasoning_content = glom(resp, "choices.0.delta.reasoning_content", default="") or ""
- if is_reasoning_conversation is None and reasoning_content:
+ thinking = glom(resp, "choices.0.delta.reasoning_content", default="") or ""
+ if is_reasoning_conversation is None and thinking:
is_reasoning_conversation = True
- if reasoning_content and not is_reasoning: # 首次收到推理内容
+ if thinking and not is_reasoning: # first chunk of reasoning content received
is_reasoning = True
- answers += f"{BLOCKQUOTE_EXPANDABLE_DELIM}{REASONING_BEGIN}{reasoning_content.lstrip()}"
- elif reasoning_content and is_reasoning: # 收到推理内容且正在思考
- answers += reasoning_content
+ runtime_texts += f"{BLOCKQUOTE_EXPANDABLE_DELIM}{REASONING_BEGIN}{thinking.lstrip()}"
+ elif thinking and is_reasoning: # more reasoning content while already thinking
+ runtime_texts += thinking
elif is_reasoning_conversation is True and is_reasoning: # answer received, clear the reasoning flag
is_reasoning = False
- answers = f"{answers.rstrip()}{REASONING_END}\n{BLOCKQUOTE_EXPANDABLE_END_DELIM}\n" + answer.lstrip()
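+ # with remove_thinking, drop the finished thinking block from the display text and keep only the answer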
+ runtime_texts = answer.lstrip() if remove_thinking else f"{runtime_texts.rstrip()}{REASONING_END}\n{BLOCKQUOTE_EXPANDABLE_END_DELIM}\n" + answer.lstrip()
else:
- answers += answer
+ runtime_texts += answer
# Sometimes the reasoning content is included in the content field.
# handle "<think>...</think>\n\n"
- if answers.removeprefix(prefix).lstrip().startswith("<think>"):
+ if runtime_texts.lstrip().startswith("<think>"): # prefix is no longer embedded in runtime_texts
is_reasoning = True
- answers = answers.replace("<think>", f"{BLOCKQUOTE_EXPANDABLE_DELIM}{REASONING_BEGIN}")
- if "</think>" in answers:
+ runtime_texts = runtime_texts.replace("<think>", f"{BLOCKQUOTE_EXPANDABLE_DELIM}{REASONING_BEGIN}")
+ if "</think>" in runtime_texts:
is_reasoning = False
- answers = re.sub(r"</think>\s*", f"{REASONING_END}\n{BLOCKQUOTE_EXPANDABLE_END_DELIM}", answers, count=1)
+ runtime_texts = re.sub(r"</think>\s*", f"{REASONING_END}\n{BLOCKQUOTE_EXPANDABLE_END_DELIM}", runtime_texts, count=1)
- answers = beautify_llm_response(answers)
- if await count_without_entities(answers) <= TEXT_LENGTH - 10: # leave some flexibility
- if len(answers.removeprefix(prefix)) > 10: # start response if answer is not empty
- await modify_progress(message=status, text=answers, detail_progress=True)
+ thoughts += thinking
+ answers += answer
+ runtime_texts = beautify_llm_response(runtime_texts)
+ length = await count_without_entities(prefix + runtime_texts)
+ if length <= TEXT_LENGTH - 10: # leave some flexibility
+ if len(runtime_texts) > 10: # start updating the message once the answer is non-trivial
+ await modify_progress(message=status_msg, text=prefix + runtime_texts, detail_progress=True)
else: # answers is too long, split it into multiple messages
- parts = await smart_split(answers)
+ parts = await smart_split(prefix + runtime_texts)
if len(parts) == 1:
continue
- if parts[0].startswith(prefix): # remove prefix from the first part
- reasoning_resp, response = split_reasoning(parts[0])
- content = reasoning_resp + "\n\n" + response
- await modify_progress(message=status, text=f"{prefix}{blockquote(content.strip())}", force_update=True)
+ if is_reasoning and single_thinking_msg:
+ runtime_texts = f"{BLOCKQUOTE_EXPANDABLE_DELIM}{REASONING_BEGIN}{parts[-1].lstrip()}" # remove previous thinking
+ await modify_progress(message=status_msg, text=parts[0], force_update=True) # force send the first part
else:
- await modify_progress(message=status, text=blockquote(parts[0]), force_update=True)
- all_answers += parts[0]
- answers = parts[-1] # keep the last part
- if is_reasoning:
- answers = f"{BLOCKQUOTE_EXPANDABLE_DELIM}{answers.lstrip()}"
- status = await client.send_message(status.chat.id, text=prefix + answers, reply_parameters=ReplyParameters(message_id=status.id))
- final["sent_messages"].append(status)
+ await modify_progress(message=status_msg, text=blockquote(parts[0]), force_update=True) # force send the first part
+ runtime_texts = parts[-1] # keep the last part
+ if is_reasoning:
+ runtime_texts = f"{BLOCKQUOTE_EXPANDABLE_DELIM}{REASONING_BEGIN}{runtime_texts.lstrip()}"
+ if not silent:
+ status_msg = await client.send_message(status_cid, text=prefix + runtime_texts, reply_parameters=ReplyParameters(message_id=status_mid)) # the new message
+ sent_messages.append(status_msg)
+ status_mid = status_msg.id
+
# all chunks are processed
- all_answers += answers
-
- all_reasoning, all_texts = split_reasoning(answers)
- final |= {"thoughts": raw_reasoning(all_reasoning), "texts": all_texts}
-
- all_answers = add_search_results_to_response(config.get("search_results", []), all_answers)
- length = await count_without_entities(answers)
-
- answers = (await smart_split(all_answers))[-1] # the last part (or the only one part)
- reasoning_resp, response = split_reasoning(answers)
- if answers.startswith(prefix):
- if length > GPT.COLLAPSE_LENGTH:
- content = reasoning_resp + "\n\n" + response
- await modify_progress(message=status, text=f"{prefix}{blockquote(content.strip())}", force_update=True)
- elif reasoning_resp:
- await modify_progress(message=status, text=f"{prefix}{blockquote(reasoning_resp)}\n{response}", force_update=True)
+ if not answers.strip() and not thoughts.strip(): # empty response, retry
+ if retry >= GPT.MAX_RETRY: # stop here rather than recursing forever on empty responses
+ return {}
+ return await send_to_gpt_stream(
+ client,
+ status_msg,
+ config,
+ prefix=prefix,
+ retry=retry + 1,
+ silent=silent,
+ remove_thinking=remove_thinking,
+ single_thinking_msg=single_thinking_msg,
+ system_prompt=system_prompt,
+ **kwargs,
+ )
+
+ if not thoughts: # no structured thinking in response
+ thoughts, answers = split_reasoning(answers)
+
+ answers = add_search_results_to_response(config.get("search_results", []), answers)
+ final_thoughts = "" if remove_thinking else thoughts
+ if await count_without_entities(prefix + final_thoughts + answers) <= TEXT_LENGTH - 10: # short answer in single msg
+ if length > GPT.COLLAPSE_LENGTH: # collapse the response if the answer is too long
+ quoted = REASONING_BEGIN + final_thoughts.strip() + REASONING_END + "\n\n" + answers.strip() if final_thoughts.strip() else answers.strip()
+ await modify_progress(message=status_msg, text=f"{prefix}{blockquote(quoted)}", force_update=True)
else:
- await modify_progress(message=status, text=f"{prefix}{response}", force_update=True)
+ quoted = blockquote(REASONING_BEGIN + final_thoughts.strip() + REASONING_END) + "\n" if final_thoughts.strip() else ""
+ await modify_progress(message=status_msg, text=f"{prefix}{quoted}{answers}", force_update=True)
+ # total length is too long, the answer was split into multiple messages
elif length > GPT.COLLAPSE_LENGTH:
- await modify_progress(message=status, text=prefix + blockquote(response), force_update=True)
+ await modify_progress(message=status_msg, text=prefix + blockquote(runtime_texts), force_update=True)
else:
- await modify_progress(message=status, text=prefix + response, force_update=True)
+ await modify_progress(message=status_msg, text=prefix + runtime_texts, force_update=True)
+
except Exception as e:
error = f"🤖{config['friendly_name']} request failed, retries: {retry + 1}/{GPT.MAX_RETRY + 1}\n{e}"
+ if "resp" in locals():
+ error += f"\n{resp}"
logger.error(error)
- await modify_progress(text=error, force_update=True, **kwargs)
+ with contextlib.suppress(Exception):
+ await modify_progress(text=error, force_update=True, **kwargs)
+ for msg in sent_messages: # clean up partial messages from the failed attempt
+ await modify_progress(msg, del_status=True)
if retry < GPT.MAX_RETRY:
- return await send_to_gpt_stream(client, status, config, retry=retry + 1, **kwargs)
- return final
+ return await send_to_gpt_stream(
+ client,
+ status_msg,
+ config,
+ prefix=prefix,
+ retry=retry + 1,
+ silent=silent,
+ remove_thinking=remove_thinking,
+ single_thinking_msg=single_thinking_msg,
+ system_prompt=system_prompt,
+ **kwargs,
+ )
+ return {"texts": answers, "thoughts": thoughts, "prefix": prefix, "model_name": config["friendly_name"], "sent_messages": sent_messages}
async def parse_error(resp: dict, retry: int, **kwargs) -> dict:
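Note on the new call signature: the status message is now passed as status_msg and may be None, silent=True suppresses all Telegram edits and sends, and the default prefix can be overridden. A minimal usage sketch under those assumptions — client, status, and cfg are hypothetical placeholder names, not part of this commit:

    result = await send_to_gpt_stream(
        client,  # pyrogram Client
        status,  # status Message to edit, or None
        cfg,  # dict providing "client", "completions", and "friendly_name"
        prefix=None,  # None falls back to the default "🤖**<friendly_name>**:" header
        silent=False,  # True suppresses all message edits and sends
        remove_thinking=True,  # drop the finished thinking block from the reply
        single_thinking_msg=True,  # reuse one message for the thinking display
    )
    if result:  # an empty dict means the request ultimately failed
        texts, thoughts = result["texts"], result["thoughts"]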
src/llm/utils.py
@@ -192,7 +192,7 @@ def clean_bot_tips(text: str) -> str:
def clean_reasoning(text: str) -> str:
if not text:
return ""
- text = re.sub(rf"^{REASONING_BEGIN}(.*?){REASONING_END}", "", text.strip(), flags=re.DOTALL).strip()
+ text = re.sub(rf"{REASONING_BEGIN}(.*?){REASONING_END}", "", text.strip(), flags=re.DOTALL).strip()
text = text.removeprefix(BLOCKQUOTE_EXPANDABLE_DELIM).lstrip()
return text.removeprefix(BLOCKQUOTE_EXPANDABLE_END_DELIM).lstrip()
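Note on the clean_reasoning change: with the ^ anchor removed, a REASONING_BEGIN…REASONING_END block is now stripped wherever it occurs, not only at the start of the text. A small illustration, assuming REASONING_BEGIN and REASONING_END are the literal marker strings defined in llm/utils.py:

    text = f"partial answer {REASONING_BEGIN}late thinking{REASONING_END} more answer"
    clean_reasoning(text)  # previously returned the text unchanged; now the block is stripped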