Commit 7d7af9f
Changed files (13)
src
others
podcast
preview
subtitles
src/ai/summary.py
@@ -1,10 +1,10 @@
#!/venv/bin/python
# -*- coding: utf-8 -*-
import base64
-import hashlib
import json
import re
import zlib
+from datetime import datetime
from pathlib import Path
from loguru import logger
@@ -24,74 +24,9 @@ from messages.help import social_media_help
from messages.sender import send2tg
from messages.utils import equal_prefix, set_reaction, startswith_prefix
from networking import download_file, shorten_url
-from utils import count_subtitles, rand_number
-
-MERMAID_TEMPLATE = """
-graph LR
- A[核心主题] --> B[子标题1]
- A --> C[子标题2]
- A --> D[子标题3]
- A --> E[子标题4]
-
-
- B --> B1[二级标题1-1]
- B --> B2[二级标题1-2]
- B1 --> B11[核心观点1-1-1]
- B1 --> B12[核心观点1-1-2]
- B2 --> B21[争议点1-2-1]
- B2 --> B22[争议点1-2-2]
-
-
- C --> C1[关键数据2-1]
- C --> C2[主要结论2-1]
- C --> C3[补充结论2-2]
-
- D --> D1[核心问题3-1]
- D --> D2[潜在风险3-2]
- D --> D3[影响因素3-3]
-
- E --> E1[发展趋势4-1]
- E --> E2[行动建议4-2]
- E --> E3[未来结论4-3]
-""".strip()
-
-JSON_SCHEMA = {
- "title": "Content Extraction",
- "description": "精准提炼资料的核心主题、关键观点、主要结论及各片段核心内容,确保输出内容全面覆盖资料的关键信息,用户仅通过总结即可掌握信息全貌。",
- "type": "object",
- "properties": {
- "overview": {
- "title": "全文总结",
- "description": "需涵盖资料核心主题、关键观点和主要结论,采用连贯语言表述,若内容复杂可分段,但需逻辑清晰。禁止过于简略(如仅用一句话概括长文档),确保信息密度足够支撑用户理解。",
- "type": "string",
- },
- "sections": {
- "description": "需将文档划分为逻辑连贯的片段(如按章节、主题、时间线划分);每个片段需拟定**简洁准确**的标题(体现片段核心)、匹配1个相关emoji;并说明该片段的核心内容。",
- "title": "分片内容",
- "type": "array",
- "items": {
- "type": "object",
- "properties": {
- "title": {"type": "string", "description": "该片段的标题"},
- "emoji": {"type": "string", "description": "匹配该片段的emoji,例如💡、💰、⚠️等"},
- "content": {"type": "string", "description": "详细说明该片段的核心事件、具体观点或结论,禁止仅用1-2句话泛泛概括,需传递足够细节。"},
- "start": {
- "type": ["string", "null"],
- "description": "如果资料为含时间戳的文字稿(如播客/视频/音频的转录稿),需补充start字段HH:MM:SS或MM:SS;无时间戳则无需输出start字段。",
- },
- },
- },
- },
- "mermaid": {
- "title": "思维导图",
- "type": "string",
- "pattern": "^graph LR",
- "description": f"以Mermaid graph格式表示的全文思维导图,以'graph LR'开头。需清晰呈现文档的逻辑结构(如核心主题→子主题→关键观点/结论),节点层级明确,便于用户快速梳理文档框架。一个示例Mermaid代码如下:\n{MERMAID_TEMPLATE}",
- },
- },
- "required": ["overview", "sections", "mermaid"],
- "additionalProperties": False,
-}
+from publish import telegraph_aipage
+from schema import AIPage, ContentExtraction, get_schema
+from utils import count_subtitles, digest, rand_number, to_dt
async def ai_summary(client: Client, message: Message, summary_model_id: str = AI.AI_SUMMARY_MODEL_ALIAS, **kwargs):
@@ -120,31 +55,62 @@ async def ai_summary(client: Client, message: Message, summary_model_id: str = A
res = await openai_responses_api(client, message, **params)
if not res.get("texts"):
continue
- texts, _, mermaid_path = await parse_summary(res["texts"])
- media = [{"photo": mermaid_path}] if Path(mermaid_path).is_file() else []
- await send2tg(client, message, texts=texts, media=media, **kwargs)
+ texts, _, _ = await parse_summary(res["texts"])
+ await send2tg(client, message, texts="**🤖AI导读**\n" + texts, **kwargs)
await set_reaction(client, this_msg, "")
return
async def summarize(
    article: str | None = None,
    transcripts: str | None = None,
    reference: str | None = None,
    model: str = "gemini",
    title: str | None = None,
    author: str | None = None,
    url: str | None = None,
    date: str | datetime | None = None,
    description: str | None = None,
    ttl: str | None = None,
) -> dict:
    """Summarize an article or a transcript and publish the result as an AI page.

    Exactly one of ``article`` / ``transcripts`` must be given.

    Args:
        article: Plain article text to summarize.
        transcripts: Transcript text to summarize; also stored on the published page.
        reference: Extra context appended to the system prompt via ``summary_params``.
        model: Model alias placed after ``@`` in the generated prompt message.
        title: Page title; falls back to "AI导读" when empty.
        author: Page author, forwarded to ``AIPage``.
        url: Source URL, forwarded to ``AIPage``.
        date: Publication date; normalized with ``to_dt`` before being stored.
        description: Source description, forwarded to ``AIPage``.
        ttl: Forwarded unchanged to ``telegraph_aipage``.

    Returns:
        ``{}`` when the source is shorter than 200 (per ``count_subtitles``) or the
        model returned no text. Otherwise the generation result dict with
        ``texts`` rewritten to the readable summary and ``telegraph_url`` set to
        the published page URL, or ``None`` when nothing was published.

    Raises:
        ValueError: If neither or both of ``article`` and ``transcripts`` are given.
    """
    if article is None and transcripts is None:
        raise ValueError("必须传入 article 或 transcripts 其中一个参数")
    if article is not None and transcripts is not None:
        raise ValueError("不能同时传入 article 和 transcripts 参数")
    title = title or "AI导读"
    source = article or transcripts or ""
    if count_subtitles(source) < 200:  # skip short article
        return {}
    res = await ai_text_generation(
        "fake-client",  # type: ignore
        message=Message(
            id=rand_number(),
            chat=Chat(id=rand_number()),
            text=Str(f"{PREFIX.AI_TEXT_GENERATION} @{model} {source.strip()}"),
        ),
        **summary_params(reference),
    )
    if not res.get("texts", ""):
        return {}

    raw = res["texts"]
    texts, mermaid_img_url, mermaid_pako_url = await parse_summary(raw)
    # Default to the plain (unlinked) result; upgraded below once a page is published.
    res["telegraph_url"] = None
    res["texts"] = "**🤖AI导读**\n" + texts

    # parse_summary() tolerates malformed model output and hands back the raw text.
    # Validating again here must be equally tolerant, otherwise one bad response
    # raises into every caller instead of degrading to the plain-text summary.
    try:
        summary = ContentExtraction.model_validate_json(raw)
    except ValueError as e:  # NOTE(review): assumes pydantic's ValidationError (a ValueError subclass) — confirm
        logger.warning(f"Summary is not valid ContentExtraction JSON, skip publishing: {e}")
        return res

    page = AIPage(
        title=title,
        author=author,
        url=url,
        date=to_dt(date),
        description=description,
        summary=summary,
        transcripts=transcripts,
        mermaid_img=mermaid_img_url,
        mermaid_url=mermaid_pako_url,
    )
    if telegraph_url := await telegraph_aipage(page, ttl=ttl):
        res["telegraph_url"] = telegraph_url
        res["texts"] = f"**🤖[AI导读]({telegraph_url})**\n" + texts
    return res
@@ -152,31 +118,28 @@ async def parse_summary(texts: str) -> tuple[str, str, str]:
"""Parse the summary JSON string.
Returns:
- (summary_texts, mermaid_url, mermaid_path)
+ (summary_texts, mermaid_img_url, mermaid_pako_url)
"""
try:
- summary = json.loads(texts)
- mermaid = beautify_mermaid(summary["mermaid"])
- img_url, pako_url, mermaid_path = await publish_mermaid(mermaid)
- parsed = f"{summary['overview'].strip()}"
- if img_url:
- logger.success(f"Mermaid: {pako_url}")
- parsed += f"\n🧠**[思维导图]({pako_url})**\n"
- parsed += "\n⚡️**章节速览**"
- for section in summary["sections"]:
- parsed += f"\n{section['emoji']}**{section['title']}**"
- if section.get("start"):
- parsed += f" [{section['start']}]"
- parsed += f"\n{section['content']}"
+ summary = ContentExtraction.model_validate_json(texts)
+ mermaid = beautify_mermaid(summary.mermaid)
+ img_url, pako_url = await publish_mermaid(mermaid)
+ parsed = f"{summary.overview}\n⚡️**章节速览**"
+ for section in summary.sections:
+ parsed += f"\n{section.emoji}**{section.title}**"
+ if section.start:
+ start = section.start.removeprefix("00:") if len(section.start) > 5 else section.start
+ parsed += f" [{start}]"
+ parsed += f"\n{section.content}"
logger.success(parsed)
except Exception as e:
logger.error(f"Error parsing summary: {e}")
return texts, "", ""
- return parsed, img_url, mermaid_path
+ return parsed, img_url, pako_url
def system_prompt(reference: str | None = None) -> str:
- prompt = f"你是一位专业的内容提炼大师,任务是基于用户提供的资料,生成用户无需阅读完整原文档就能清晰理解主要事件、观点、结论的内容,生成符合指定JSON格式的全文总结、分片内容和思维导图。思维导图Mermaid语法说明文档:{mermaid_syntax()}"
+ prompt = "你是一位专业的内容提炼大师,任务是基于用户提供的资料,生成用户无需阅读完整原文档就能清晰理解主要事件、观点、结论的内容,生成符合指定JSON格式的全文总结、分片内容和思维导图。"
if reference:
prompt += f"\n{reference}"
return prompt.strip()
@@ -202,38 +165,35 @@ def beautify_mermaid(mermaid: str) -> str:
return f"---\nconfig:\n theme: neo\n look: neo\n---\n{mermaid.strip()}"
async def publish_mermaid(mermaid: str) -> tuple[str, str]:
    """Render a Mermaid diagram to JPEG, store it in R2 and build a mermaid.live link.

    The image is fetched from mermaid.ink into ``DOWNLOAD_DIR``, uploaded with
    ``set_cf_r2`` and then removed locally. The diagram source, annotated with
    the image URL as a ``%%`` comment, is zlib-compressed into a ``pako:`` view URL.

    Returns:
        ``(image_url, pako_url)``, or ``("", "")`` when no image file exists
        after the download attempt.
    """
    image_file = Path(DOWNLOAD_DIR) / f"{digest(mermaid)}.jpg"
    object_key = f"TTL/365d/{image_file.name}"
    image_link = f"{DB.CF_R2_PUBLIC_URL}/{object_key}"

    encoded_diagram = base64.urlsafe_b64encode(mermaid.encode("utf-8")).decode("ascii")
    render_url = f"https://mermaid.ink/img/{encoded_diagram}?type=jpeg&theme=forest&width=2160"
    downloaded = await download_file(render_url, path=image_file, suffix=".jpg")
    if downloaded:
        # Only shorten the public URL once the image was actually fetched.
        image_link = await shorten_url(image_link, alias=digest(mermaid, 16))

    # Embed the image URL as a Mermaid comment right above the graph declaration.
    annotated = mermaid.replace("\ngraph LR", f"\n%% {image_link}\ngraph LR")

    # mermaid.live expects url-safe base64 of the zlib-compressed JSON state, without padding.
    payload = json.dumps({"code": annotated.strip()}, separators=(",", ":"))
    packed = zlib.compress(payload.encode("utf-8"), level=9)
    pako_token = base64.urlsafe_b64encode(packed).decode("utf-8").rstrip("=")
    live_link = await shorten_url(f"https://mermaid.live/view#pako:{pako_token}", alias=digest(pako_token, 16))

    if not image_file.is_file():
        return "", ""
    await set_cf_r2(object_key, data=image_file.read_bytes(), mime_type="image/jpeg", silent=True)
    image_file.unlink(missing_ok=True)
    return image_link, live_link
def summary_params(reference: str | None = None) -> dict:
return {
- "gemini_generate_content_config": {
- "system_instruction": system_prompt(reference),
- "responseMimeType": "application/json",
- "responseJsonSchema": JSON_SCHEMA,
- },
+ "gemini_generate_content_config": {"system_instruction": system_prompt(reference), "responseMimeType": "application/json", "responseJsonSchema": get_schema("content_extraction")},
"openai_responses_config": {
"instructions": system_prompt(reference),
"text": {
@@ -242,7 +202,7 @@ def summary_params(reference: str | None = None) -> dict:
"name": "ContentExtraction",
"strict": True,
"description": "精准提炼资料的核心主题、关键观点、主要结论及各片段核心内容,确保输出内容全面覆盖资料的关键信息,用户仅通过总结即可掌握信息全貌。",
- "schema": JSON_SCHEMA,
+ "schema": get_schema("content_extraction"),
}
},
},
@@ -251,102 +211,3 @@ def summary_params(reference: str | None = None) -> dict:
"openai_append_tool_results": False,
"silent": True,
}
-
-
-def mermaid_syntax() -> str:
- return """
-# Mermaid Graph - Basic Syntax
-
-Graph is composed of **nodes** (geometric shapes) and **edges** (arrows or lines). The Mermaid code defines how nodes and edges are made and accommodates different arrow types, multi-directional arrows, and any linking to and from subgraphs.
-
-## A node (default)
-
-```mermaid
-graph LR
- id
-```
-
-```note
-The id is what is displayed in the box.
-```
-
-### A node with text
-
-It is also possible to set text in the box that differs from the id. If this is done several times, it is the last text
-found for the node that will be used. Also if you define edges for the node later on, you can omit text definitions. The
-one previously defined will be used when rendering the box.
-
-```mermaid
-graph LR
- id1[This is the text in the box]
-```
-
-## Node shapes
-
-### A node with round edges
-
-```mermaid
-graph LR
- id1(This is the text in the box)
-```
-
-## Links between nodes
-
-Nodes can be connected with links/edges. It is possible to have different types of links or attach a text string to a link.
-
-### A link with arrow head
-
-```mermaid
-graph LR
- A-->B
-```
-
-### An open link
-
-```mermaid
-graph LR
- A --- B
-```
-
-### Text on links
-
-```mermaid
-graph LR
- A---|This is the text|B
-```
-
-### A link with arrow head and text
-
-```mermaid
-graph LR
- A-->|text|B
-```
-
-### Dotted link
-
-```mermaid
-graph LR
- A-.->B;
-```
-
-### Dotted link with text
-
-```mermaid
-graph LR
- A-. text .-> B
-```
-
-### Thick link
-
-```mermaid
-graph LR
- A ==> B
-```
-
-### Thick link with text
-
-```mermaid
-graph LR
- A == text ==> B
-```
-"""
src/asr/utils.py
@@ -17,6 +17,7 @@ from soundfile import LibsndfileError
from config import AI, ASR
from multimedia import convert_to_audio
+from schema import Sentence
from utils import strings_list
GEMINI_AUDIO_EXT = [".aac", ".aiff", ".flac", ".mp3", ".oga", ".ogg", ".opus", ".wav"]
@@ -153,3 +154,30 @@ async def audio_chunk_to_path(chunk: ndarray, samplerate: int, path: str | Path,
out_path = Path(path).expanduser().resolve()
out_path.parent.mkdir(exist_ok=True, parents=True)
await asyncio.to_thread(sf.write, out_path.as_posix(), chunk, samplerate, format=fmt, subtype=subtype)
+
+
def split_transcripts(text: str | None) -> list[Sentence]:
    """Split timestamped transcript text into a list of ``Sentence`` items.

    A segment starts at a line beginning with ``[MM:SS]`` or ``[HH:MM:SS]`` and
    runs until the next such line. Any text before the first timestamp is
    dropped.

    Args:
        text: Transcript text, or ``None`` / empty.

    Returns:
        One ``Sentence(start=..., content=...)`` per timestamp, in order; an
        empty list when ``text`` is empty or contains no line-leading timestamp.
    """
    if not text:
        return []
    # ^ anchors at every line start (re.MULTILINE); the capture group keeps the
    # bare timestamp (without brackets) in the split result.
    pattern = r"^\[((?:\d{2}:)?\d{2}:\d{2})\]"
    parts = re.split(pattern, text.strip(), flags=re.MULTILINE)
    # With one capture group, parts is [preamble, ts1, body1, ts2, body2, ...]:
    # odd indexes are timestamps, the following even indexes are their text.
    return [Sentence(start=start, content=content.strip()) for start, content in zip(parts[1::2], parts[2::2])]
src/asr/voice_recognition.py
@@ -6,7 +6,6 @@ from pathlib import Path
from glom import glom
from loguru import logger
from pyrogram.client import Client
-from pyrogram.enums import ParseMode
from pyrogram.types import Message
from asr.ali import ali_asr
@@ -101,7 +100,6 @@ async def voice_to_text(
asr_engine: str = ASR.DEFAULT_ENGINE,
*,
asr_need_prefix: bool = True,
- to_telegraph: bool = True,
**kwargs,
) -> None:
"""Voice, audio, video message to text.
@@ -165,20 +163,12 @@ async def voice_to_text(
elif length < TEXT_LENGTH: # middle
await client.send_message(to_int(target_chat), final, reply_parameters=reply_parameters)
else: # long
- caption = asr_msg_info["html"]
- if to_telegraph:
- html = "\n".join([f"<p>{s}</p>" for s in texts.split("\n")])
- if telegraph_url := await publish_telegraph(title=asr_msg_info["text"] or "语音识别结果", html=html, author=asr_msg_info["full_name"], url=asr_msg_info["message_url"], **kwargs):
- caption += f"\n<a href={telegraph_url}>⚡️即时预览</a>"
+ caption = ""
+ html = "\n".join([f"<p>{s}</p>" for s in texts.split("\n")])
+ if telegraph_url := await publish_telegraph(title=asr_msg_info["text"], html=html, author=asr_msg_info["full_name"], url=asr_msg_info["message_url"]):
+ caption = f"[⚡️即时预览]({telegraph_url})"
with io.BytesIO(texts.encode("utf-8")) as f:
- await client.send_document(
- to_int(target_chat),
- f,
- parse_mode=ParseMode.HTML,
- file_name="语音识别结果.txt",
- caption=caption.strip(),
- reply_parameters=reply_parameters,
- )
+ await client.send_document(to_int(target_chat), f, file_name="语音识别结果.txt", caption=caption, reply_parameters=reply_parameters)
await modify_progress(del_status=True, **kwargs)
[await delete_message(msg) for msg in res.get("sent_messages", [])]
src/others/download_external.py
@@ -14,7 +14,7 @@ from messages.utils import equal_prefix, get_reply_to, startswith_prefix
from multimedia import is_valid_video_or_audio, validate_img
from networking import download_file
from publish import publish_telegraph
-from utils import convert_md, find_url, guess_mime, readable_size, to_int
+from utils import convert_html, convert_md, find_url, guess_mime, readable_size, to_int
HELP = f"""
⏬**下载文件**
@@ -80,10 +80,8 @@ async def download_url_in_message(client: Client, message: Message, extra_prefix
elif path.stat().st_size < MAX_FILE_BYTES:
await modify_progress(text=f"💾文件下载成功: {readable_size(path=path)}", force_update=True, **kwargs)
if suffix == ".html":
- markdown = convert_md(path)
- markdown_path = path.with_suffix(".md")
- markdown_path.write_text(markdown)
- if telegraph_url := await publish_telegraph(title="全文内容", texts=markdown, author=info["full_name"], url=url, **kwargs):
+ html = convert_html(convert_md(path))
+ if telegraph_url := await publish_telegraph(title="全文内容", html=html, author=info["full_name"], url=url):
caption += f"\n⚡️[即时预览]({telegraph_url})"
success = await client.send_document(target_chat, path.as_posix(), caption=caption, reply_parameters=reply_parameters)
else:
src/podcast/main.py
@@ -41,8 +41,7 @@ from podcast.utils import HEADERS, clean_feed_url, feed_saved_target, get_pubdat
from podcast.xml import get_feed_title, parse_feed, save_xml, update_xml_desc
from preview.bilibili import get_bilibili_vinfo
from preview.youtube import get_youtube_vinfo
-from publish import publish_telegraph
-from utils import bare_url, convert_html, convert_md, count_subtitles, https_url, nowdt, rand_number, remove_consecutive_newlines, seconds_to_hms, strings_list
+from utils import bare_url, count_subtitles, https_url, nowdt, rand_number, seconds_to_hms, strings_list
from ytdlp.download import ytdlp_download
@@ -75,35 +74,28 @@ async def summary_pods(client: Client):
dt = get_pubdate(entry)
pubdate = f"{dt:%Y-%m-%d %H:%M:%S}"
caption = f"🎧[{feed_title}]({homepage})\n📝[{entry['title']}]({entry['link']})\n🕒{pubdate}\n⏳{duration} #️⃣字数: {count_subtitles(transcripts)}"
- markdown_desc = convert_md(html=glom(entry, Coalesce("content.0.value", "summary"), default=""))
- markdown_desc = remove_consecutive_newlines(markdown_desc, newline_level=2)
- prompt = f"该转录稿对应于播客栏目《{feed_title}》的一期节目,节目详情如下:\n节目标题: {entry['title']}\n节目播出日期: {pubdate}"
- prompt += f"\n节目时长: {duration}\n节目简介: {markdown_desc}"
- ai_res = await summarize(transcripts, reference=prompt, model=AI.PODCAST_SUMMARY_MODEL_ALIAS)
- telegraph_content = ""
- if ai_res.get("texts"):
- telegraph_content += f"\n🤖**{ai_res['model_name']}总结**:\n{ai_res['texts']}"
- telegraph_content += f"\n📖**节目简介**:\n {markdown_desc}" if markdown_desc else ""
- telegraph_content += f"\n🔤**转录字幕**:\n{transcripts}"
-
- if telegraph_url := await publish_telegraph(title=entry["title"], html=convert_html(telegraph_content), author=feed_title, url=entry["link"]):
- caption += f"\n[🤖总结 & 🔤字幕]({telegraph_url})"
-
+ desc = glom(entry, Coalesce("content.0.value", "summary"), default="")
+ summary = await summarize(
+ transcripts=transcripts,
+ reference=f"该转录稿对应于播客栏目《{feed_title}》的一期节目,节目详情:\n标题: {entry['title']}\n日期: {pubdate}\n时长: {duration}\n节目简介: {desc}",
+ model=AI.PODCAST_SUMMARY_MODEL_ALIAS,
+ title=entry["title"],
+ author=feed_title,
+ url=entry["link"],
+ date=dt,
+ description=desc,
+ ttl="forever",
+ )
+ if telegraph_url := summary.get("telegraph_url"):
+ caption += f"\n[🤖AI导读]({telegraph_url})"
media = (
- [
- {
- "audio": info["asr_path"],
- "title": entry["title"],
- "performer": feed_title,
- "thumb": info["thumb"],
- }
- ]
+ [{"audio": info["asr_path"], "title": entry["title"], "performer": feed_title, "thumb": info["thumb"]}]
if Path(info["path"]).suffix in [".aac", ".amr", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".wma"]
else [{"video": info["path"], "thumb": info["thumb"]}]
)
messages = await send2tg(client, message, texts=caption, media=media, reply_msg_id=-1)
- processed_xml = await update_xml_desc(feed_url, processed_xml, entry, summary=ai_res.get("texts", ""), audio_path=info["asr_path"])
+ processed_xml = await update_xml_desc(feed_url, processed_xml, entry, summary=summary.get("texts", ""), audio_path=info["asr_path"])
if isinstance(messages[0], Message):
await set_cf_r2(entry["db_key"], data={"title": entry["title"], "url": entry["link"]})
has_update = True
src/preview/wechat.py
@@ -50,7 +50,7 @@ async def preview_wechat(client: Client, message: Message, url: str = "", db_key
sent_messages.extend(await send2tg(client, message, texts=texts, **kwargs))
else: # 无图片长文
texts = f"{post_info['header']}"
- telegraph_url = await publish_telegraph(title=post_info["title"], html=post_info["html"], author=post_info["author"], url=url, **kwargs)
+ telegraph_url = await publish_telegraph(title=post_info["title"], html=post_info["html"], author=post_info["author"], url=url)
if telegraph_url:
texts += f"\n⚡️[即时预览]({telegraph_url})"
sent_messages.extend(await send2tg(client, message, texts=texts, media=[{"document": post_info["html_path"]}], **kwargs))
@@ -59,7 +59,7 @@ async def preview_wechat(client: Client, message: Message, url: str = "", db_key
sent_messages.extend(await send2tg(client, message, texts=texts, media=post_info["media"], **kwargs))
else: # 有图片长文
texts = f"{post_info['header']}"
- telegraph_url = await publish_telegraph(title=post_info["title"], html=post_info["html"], author=post_info["author"], url=url, **kwargs)
+ telegraph_url = await publish_telegraph(title=post_info["title"], html=post_info["html"], author=post_info["author"], url=url)
if telegraph_url:
texts += f"\n⚡️[即时预览]({telegraph_url})"
sent_messages.extend(await send2tg(client, message, texts=texts, media=[{"document": post_info["path"]}], **kwargs))
src/subtitles/subtitle.py
@@ -1,14 +1,11 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import contextlib
-import re
from io import BytesIO
-from typing import Literal
from glom import Coalesce, glom
from loguru import logger
from pyrogram.client import Client
-from pyrogram.types import Message
+from pyrogram.types import InputMediaDocument, LinkPreviewOptions, Message
from ai.summary import summarize
from asr.voice_recognition import asr_file
@@ -16,13 +13,12 @@ from config import AI, ASR, DOWNLOAD_DIR, PREFIX, READING_SPEED, TEXT_LENGTH, ca
from messages.parser import parse_msg
from messages.progress import modify_progress
from messages.sender import send2tg
-from messages.utils import blockquote, delete_message, equal_prefix
+from messages.utils import blockquote, count_without_entities, equal_prefix
from networking import match_social_media_link
from preview.bilibili import get_bilibili_vinfo
from preview.youtube import get_youtube_vinfo
-from publish import publish_telegraph
from subtitles.base import fetch_subtitle, match_url
-from utils import count_subtitles, readable_time, to_int
+from utils import count_subtitles, readable_time
from ytdlp.download import ytdlp_download
HELP = f"""📃**提取字幕**
@@ -43,15 +39,12 @@ async def get_subtitle(
client: Client,
message: Message,
*,
- to_telegraph: bool = True,
ai_summary: bool = True,
summary_model_id: str = AI.SUBTITLE_SUMMARY_MODEL_ALIAS,
- send_subtitle_as: Literal["file", "str", "none"] = "file",
- enable_corrector: bool = True,
+ enable_corrector: bool = False,
**kwargs,
):
"""Get YouTube Subtitle."""
- target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
# send docs if message == "/subtitle", without reply
if equal_prefix(message.text, prefix=[PREFIX.SUBTITLE]) and not message.reply_to_message:
await send2tg(client, message, texts=HELP, **kwargs)
@@ -77,9 +70,8 @@ async def get_subtitle(
description = glom(vinfo, Coalesce("description", "desc"), default="")
caption = f"{vinfo['emoji']}[{vinfo['author']}]({vinfo['channel']})\n🕒{vinfo['pubdate']}\n📝[{vinfo['title']}]({url})"
msg = f"🔍**正在获取字幕:**\n{caption}"[:TEXT_LENGTH]
- if kwargs.get("show_progress"):
- status_msg = (await send2tg(client, message, texts=msg, **kwargs))[0]
- kwargs["progress"] = status_msg
+ status_msg: Message = (await send2tg(client, message, texts=msg, **kwargs))[0] # ty:ignore[invalid-assignment]
+ kwargs["progress"] = status_msg
this_info = parse_msg(message, silent=True)
reply_info = parse_msg(message.reply_to_message, silent=True) if message.reply_to_message else {}
@@ -117,34 +109,43 @@ async def get_subtitle(
res |= {"subtitles": res["texts"], "num_chars": count_subtitles(res["texts"]), "reading_minutes": count_subtitles(res["texts"]) / READING_SPEED}
# Send subtitle
- subtitles = glom(res, Coalesce("full", "subtitles", "summary"), default="")
+ subtitles = res.get("subtitles", "")
if not subtitles:
await modify_progress(del_status=True, **kwargs)
return
+
logger.success(subtitles)
caption = f"{vinfo['emoji']}[{vinfo['author']}]({vinfo['channel']})\n🕒{vinfo['pubdate']}\n"
caption += f"📝[{vinfo['title']}]({url})\n#️⃣字符数: {res['num_chars']}\n⏳阅读时长: {readable_time(60 * res['reading_minutes'])}"
- if send_subtitle_as == "file":
- if to_telegraph:
- html = "\n".join([f"<p>{s}</p>" for s in subtitles.split("\n")])
- if telegraph_url := await publish_telegraph(title=vinfo["title"], html=html, author=vinfo["author"], url=url, **kwargs):
- caption += f"\n⚡️[即时预览]({telegraph_url})"
- with BytesIO(subtitles.encode("utf-8")) as f:
- subtitle_msg = await client.send_document(to_int(target_chat), f, file_name=f"{vinfo['title']}.txt", caption=caption)
- elif send_subtitle_as == "str":
- subtitle_msg = (await send2tg(client, message, texts=f"{caption}\n{subtitles}", **kwargs))[0]
- else:
- subtitle_msg = message
- if ai_summary and isinstance(subtitle_msg, Message):
+ full = glom(res, Coalesce("full", "subtitles", "summary"), default="")
+ # Send subtitle txt
+ with BytesIO(full.encode("utf-8")) as f:
+ status_msg = await status_msg.edit_media(file_name=f"{vinfo['title']}.txt", media=InputMediaDocument(f, caption=caption))
+
+ if ai_summary and isinstance(status_msg, Message):
# use real subtitle (without AI summary by Bilibili)
- subtitles = re.sub(r"(.*?)AI总结(B站版):", "", subtitles, flags=re.DOTALL).strip() # noqa: RUF001
- prompt = f"该转录稿对应于{matched['platform'].title()}视频作者【{vinfo['author']}】的一期节目,节目详情如下:\n"
- prompt += f"节目标题: {vinfo['title']}\n发布日期: {vinfo['pubdate']}\n"
+ prompt = f"该转录稿对应于{matched['platform'].title()}视频作者【{vinfo['author']}】的一期节目,节目详情:\n标题: {vinfo['title']}\n日期: {vinfo['pubdate']}\n"
+ prompt += f"标题: {vinfo['title']}\n日期: {vinfo['pubdate']}\n"
if description.strip():
prompt += f"节目简介: {description}"
- res = await summarize(subtitles, reference=prompt, model=summary_model_id)
- if res.get("texts"):
- await send2tg(client, subtitle_msg, texts=res["prefix"] + blockquote(res["texts"]), **kwargs)
- with contextlib.suppress(Exception):
- [await delete_message(msg) for msg in res.get("sent_messages", [])]
- await delete_message(kwargs.get("progress"))
+ summary = await summarize(
+ transcripts=subtitles,
+ reference=prompt,
+ model=summary_model_id,
+ title=vinfo["title"],
+ author=vinfo["author"],
+ url=url,
+ date=vinfo["pubdate"],
+ description=description,
+ )
+
+ if not summary.get("texts"):
+ return
+ telegraph_url = summary.get("telegraph_url") or ""
+ link_preview = LinkPreviewOptions(is_disabled=False, url=telegraph_url) if telegraph_url else LinkPreviewOptions(is_disabled=True)
+ if await count_without_entities(summary["texts"]) <= TEXT_LENGTH:
+ await status_msg.reply_text(blockquote(summary["texts"]), quote=True, link_preview_options=link_preview)
+ elif telegraph_url:
+ await status_msg.reply_text(telegraph_url, link_preview_options=link_preview, quote=True)
+ else:
+ await send2tg(client, status_msg, texts=summary["texts"])
src/ytdlp/main.py
@@ -1,18 +1,17 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import io
import warnings
from pathlib import Path
from typing import Literal
-import markdown
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
+from glom import Coalesce, glom
from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Message
from ai.summary import summarize
-from config import AI, ASR, CAPTION_LENGTH, DB, MAX_FILE_BYTES, READING_SPEED, YTDLP_RE_ENCODING_MAX_FILE_BYTES
+from config import AI, ASR, CAPTION_LENGTH, DB, MAX_FILE_BYTES, YTDLP_RE_ENCODING_MAX_FILE_BYTES
from database.database import get_db
from messages.database import copy_messages_from_db, save_messages
from messages.preprocess import preprocess_media
@@ -23,9 +22,9 @@ from multimedia import convert_to_h264
from preview.bilibili import get_bilibili_comments, get_bilibili_vinfo, make_bvid_clickable
from preview.youtube import get_youtube_comments, get_youtube_vinfo
from publish import publish_telegraph
-from utils import count_subtitles, readable_size, readable_time, soup_to_text, to_int, true, ts_to_dt, unicode_to_ascii
+from utils import readable_size, soup_to_text, to_int, true, ts_to_dt, unicode_to_ascii
from ytdlp.download import ytdlp_download
-from ytdlp.utils import append_subtitle, cleanup_ytdlp, generate_prompt, get_subtitles, platform_emoji
+from ytdlp.utils import append_tag, cleanup_ytdlp, generate_prompt, get_subtitles, platform_emoji
async def preview_ytdlp(
@@ -46,11 +45,9 @@ async def preview_ytdlp(
proxy: str | None = None,
ytdlp_video_target: str | int | None = None,
ytdlp_audio_target: str | int | None = None,
- ytdlp_subtitle_target: str | int | None = None,
ytdlp_send_subtitle: bool = False,
ytdlp_send_summary: bool = False,
summary_model_id: str = AI.SUBTITLE_SUMMARY_MODEL_ALIAS,
- to_telegraph: bool = True,
enable_corrector: bool = True,
show_author: bool = True,
show_title: bool = True,
@@ -143,31 +140,29 @@ async def preview_ytdlp(
if true(ytdlp_send_subtitle) or true(ytdlp_send_summary):
fpath = info["audio_path"] if info["audio_path"].is_file() else info["video_path"]
asr_engine = kwargs.get("asr_engine", "uncensored") if platform == "youtube" else ASR.DEFAULT_ENGINE
- if sub := await get_subtitles(fpath, url, asr_engine, info, enable_corrector=enable_corrector):
- subtitles = f"🔤<b>字幕:</b>\n{sub}"
+ subtitles = await get_subtitles(fpath, url, asr_engine, info, enable_corrector=enable_corrector)
# get ai summary
- summary = ""
+ telegraph_ai = ""
if subtitles and true(ytdlp_send_summary):
- aires = await summarize(sub, reference=generate_prompt(info), model=summary_model_id)
- if aires.get("texts"):
- summary = f"🤖<b>{aires['model_name']}总结:</b>\n{markdown.markdown(aires['texts'])}\n"
+ summary = await summarize(
+ transcripts=subtitles,
+ reference=generate_prompt(info),
+ model=summary_model_id,
+ title=info.get("title"),
+ description=info.get("description"),
+ author=info.get("author"),
+ url=url,
+ date=glom(info, Coalesce("pubdate", "upload_date"), default=""),
+ )
+ telegraph_ai = summary.get("telegraph_url", "")
- if summary_with_subtitle := f"{summary}{subtitles}":
- telegraph_name = "🤖总结 & 🔤字幕" if summary and subtitles else "🔤字幕" if subtitles else "🤖AI总结"
- caption = f"{captions['caption_without_comments']}\n"
- caption += f"#️⃣字符数: {count_subtitles(summary_with_subtitle)}\n"
- caption += f"⏳阅读时长: {readable_time(60 * count_subtitles(summary_with_subtitle) / READING_SPEED)}"
- html = "\n".join([f"<p>{s}</p>" for s in summary_with_subtitle.split("\n")]).replace("<p></p>", "")
- if true(to_telegraph) and (telegraph_url := await publish_telegraph(title=info["title"], html=html, author=info["author"], url=url, **kwargs)):
- caption += f"\n⚡️[即时预览]({telegraph_url})"
- sent_messages = await append_subtitle(f'<a href="{telegraph_url}">{telegraph_name}</a>', sent_messages)
- else:
- subtitle_target = ytdlp_subtitle_target or kwargs.get("target_chat") or message.chat.id
- with io.BytesIO(subtitles.encode("utf-8")) as f:
- subtitle_msg = await client.send_document(to_int(subtitle_target), f, file_name=f"{info['title']}.txt", caption=caption)
- if isinstance(subtitle_msg, Message):
- sent_messages["caption"] = subtitle_msg
+ if telegraph_ai: # ai summary with subtitles
+ sent_messages = await append_tag(f'<a href="{telegraph_ai}">🤖AI导读</a>', sent_messages)
+ elif subtitles: # subtitles only
+ html = "\n".join([f"<p>{s}</p>" for s in subtitles.split("\n")]).replace("<p></p>", "")
+ if telegraph_url := await publish_telegraph(title=info["title"], html=html, author=info["author"], url=url):
+ sent_messages = await append_tag(f'<a href="{telegraph_url}">🔤字幕</a>', sent_messages)
# save messages when video is uploaded
messages = [msg for msgs in sent_messages.values() for msg in (msgs if isinstance(msgs, list) else [msgs])]
src/ytdlp/utils.py
@@ -202,12 +202,12 @@ async def get_subtitles(audio_path: str | Path, url: str, asr_engine: str, vinfo
if not subtitles:
res = await asr_file(audio_path, asr_engine, corrector_reference=reference, enable_corrector=enable_corrector, silent=True)
subtitles = res.get("texts", "")
- if count_subtitles(subtitles) < 20:
+ if count_subtitles(subtitles) < 200:
subtitles = "" # ignore too short transcription
return subtitles
-async def append_subtitle(name: str, sent_messages: dict) -> dict:
+async def append_tag(name: str, sent_messages: dict) -> dict:
"""Add subtitle to sent messages.
sent_message:
src/networking.py
@@ -552,7 +552,7 @@ async def flatten_rediercts(
return texts.replace(url, rediercted_url)
-async def shorten_url(url: str, services: list[str] | None = None) -> str:
+async def shorten_url(url: str, alias: str | None = None, services: list[str] | None = None) -> str:
"""Shorten URL."""
if not url:
return url
@@ -568,7 +568,11 @@ async def shorten_url(url: str, services: list[str] | None = None) -> str:
if TOKEN.SPOOME:
headers |= {"Authorization": f"Bearer {TOKEN.SPOOME}"}
payload |= {"private_stats": False}
- resp = await hx_req("https://spoo.me/api/v1/shorten", "POST", headers=headers, json_data=payload, check_kv={"status": "ACTIVE"})
+ if alias:
+ payload |= {"alias": alias}
+ resp = await hx_req("https://spoo.me/api/v1/shorten", "POST", headers=headers, json_data=payload, check_kv={"status": "ACTIVE"}, max_retry=0)
+ if glom(resp, "hx_raw.code", default="") == "conflict":
+ return f"https://spoo.me/{alias}"
if short_url := glom(resp, "short_url", default=""):
return short_url
if service == "cleanuri.com":
@@ -581,7 +585,7 @@ async def shorten_url(url: str, services: list[str] | None = None) -> str:
if __name__ == "__main__":
import asyncio
- asyncio.run(shorten_url("https://www.google.com"))
+ asyncio.run(shorten_url("https://www.google.com", alias="test"))
check_data(json.dumps({"foo": "bar", "baz": {"qux": "quux"}, "lst": ["1", "2", "3"]}), check_keys=["baz.qux"], check_kv={"foo": "bar", "baz.qux": "quux", "lst": ["1", "2", "3"]})
# asyncio.run(match_social_media_link("https://b23.tv/3MSgT4q/", flatten_first=True))
# print(asyncio.run(match_social_media_link("https://mp.weixin.qq.com/s/bd_giuPEyPBu9LTOtC2VHw", flatten_first=True)))
src/publish.py
@@ -1,81 +1,92 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import contextlib
import io
+import re
import tempfile
+from datetime import UTC
from pathlib import Path
+from typing import Literal
from urllib.parse import quote_plus
+from zoneinfo import ZoneInfo
import anyio
-import markdown
-from glom import Coalesce, glom
+from glom import glom
from httpx import AsyncClient
from loguru import logger
from telegraph.aio import Telegraph
+from telegraph.utils import html_to_nodes
-from config import DB, TOKEN, TZ
+from asr.utils import split_transcripts
+from config import DB, DOWNLOAD_DIR, TOKEN, TZ
from database.r2 import set_cf_r2
-from utils import nowdt, rand_string
+from networking import download_file
+from schema import AIPage, Section
+from utils import convert_html, convert_md, digest, nowdt, rand_string, remove_consecutive_newlines
+
+
+def adjust_tags(s: str | None) -> str:
+ # Revise Telegraph Tags
+ s = str(s).replace("<h1>", "<h3>").replace("</h1>", "</h3>")
+ return s.replace("<h2>", "<h3>").replace("</h2>", "</h3>")
async def publish_telegraph(
- title: str,
- texts: str | None = None,
- html: str = "",
+ title: str | None = None,
+ html: str | None = None,
+ nodes: list[dict] | None = None,
author: str | None = None,
url: str | None = None,
- ttl: str = "forever", # 12h, 7d, 1M, ...
- **kwargs, # noqa: ARG001
+ aipage: AIPage | None = None,
+ ttl: str | None = None, # 12h, 7d, 1M, ...
+ *,
+ fallback_r2: bool = True,
) -> str:
- """Publish to Telegraph."""
+ """Publish to Telegraph.
- def clean_html(s: str | None) -> str:
- # Revise Telegraph Tags
- s = str(s).replace("<h1>", "<h3>").replace("</h1>", "</h3>")
- return s.replace("<h2>", "<h3>").replace("</h2>", "</h3>")
+ Available tags:
+ a, aside, b, blockquote, br, code, em, figcaption, figure, h3, h4, hr, i, iframe, img, li, ol, p, pre, s, strong, u, ul, video.
- if not (texts or html):
- return ""
- if not TOKEN.TELEGRAPH:
- return await publish_cf_r2(title, texts=texts, html=html, author=author, url=url)
- if texts and not html:
- html = markdown.markdown(texts)
- telegraph = Telegraph(access_token=TOKEN.TELEGRAPH)
- if not (author and url):
- with contextlib.suppress(Exception):
- account_info = await telegraph.get_account_info()
- if not author:
- author = glom(account_info, Coalesce("result.short_name", "result.author_name"), default=None)
- if not url:
- url = glom(account_info, "result.author_url", default=None)
- # sanitize
+ """
+ ttl = ttl or "365d"
+ title = title or "Telegraph"
+ if aipage is not None:
+ return await telegraph_aipage(aipage, ttl=ttl)
+
+ # limit title, author, url length
title = title[:256]
- if isinstance(author, str):
+ if author:
author = author[:128]
- if isinstance(url, str):
+ if url:
url = url[:512]
try:
- page = await telegraph.create_page(title=title[:256], author_name=author, author_url=url, html_content=clean_html(html))
+ telegraph = Telegraph(access_token=TOKEN.TELEGRAPH)
+ page = await telegraph.create_page(title=title, author_name=author, author_url=url, content=nodes, html_content=adjust_tags(html))
logger.info(f"⚡️Telegraph: {page['url']}")
return page["url"]
except Exception as e:
logger.error(f"Telegraph publish error: {e}")
- return await publish_cf_r2(title, texts=texts, html=html, author=author, url=url, ttl=ttl)
+ if fallback_r2:
+ return await publish_cf_r2(title, html=html, author=author, url=url, ttl=ttl)
+ return ""
async def publish_cf_r2(
title: str,
- texts: str | None = None,
- html: str = "",
+ html: str | None = None,
author: str | None = None,
url: str | None = None,
- ttl: str = "forever",
+ aipage: AIPage | None = None,
+ ttl: str = "365d",
) -> str:
"""Publish to CF R2."""
- if not (texts or html):
+ if html is None and aipage is None:
+ logger.error("`html` or `aipage` parameter is required")
+ return ""
+ if html is not None and aipage is not None:
+ logger.error("`html` and `aipage` parameter cannot be both provided")
+ return ""
+ if not html:
return ""
- if texts and not html:
- html = markdown.markdown(texts)
now = nowdt(TZ)
today = f"{now:%Y-%m-%d}"
key = f"InstantView/{today}-{rand_string(8)}.html" if ttl == "forever" else f"TTL/{ttl}/{today}-{rand_string(8)}.html"
@@ -90,17 +101,15 @@ async def publish_cf_r2(
pub_url = f"{DB.CF_R2_PUBLIC_URL.rstrip('/')}/{key}"
logger.info(f"⚡️CF R2: {pub_url}")
return f"https://t.me/iv?url={quote_plus(pub_url)}&rhash={TOKEN.R2_IV_HASH}"
- return await publish_neocities(title, texts=texts, html=html, author=author, url=url)
+ return await publish_neocities(title, html=html, author=author, url=url)
-async def publish_neocities(title: str, texts: str | None = None, html: str = "", author: str | None = None, url: str | None = None) -> str:
+async def publish_neocities(title: str, html: str | None = None, author: str | None = None, url: str | None = None) -> str:
"""Publish to neocities.org ."""
if not TOKEN.NEOCITIES:
return ""
- if not (texts or html):
+ if not html:
return ""
- if texts and not html:
- html = markdown.markdown(texts)
base_url = "https://neocities.org/api/upload"
username, password = TOKEN.NEOCITIES.split(",")
now = nowdt(TZ)
@@ -133,3 +142,190 @@ async def publish_neocities(title: str, texts: str | None = None, html: str = ""
return ""
return f"https://t.me/iv?url={quote_plus(pub_url)}&rhash={TOKEN.NEOCITIES_IV_HASH}" if TOKEN.NEOCITIES_IV_HASH else pub_url
+
+
async def telegraph_aipage(page: AIPage, ttl: str | None = None) -> str:
    """Build Telegraph content nodes for an ``AIPage`` and publish them.

    Node order: table of contents, AI overview, original description,
    summary sections, full transcript. Parts whose data is missing are omitted.

    Args:
        page: The page to render.
        ttl: Forwarded unchanged to ``publish_telegraph`` and ``r2_aipage``.

    Returns:
        The URL returned by ``publish_telegraph``; when that is empty, the
        result of ``r2_aipage(page, ttl=ttl, rformat="url")``. An empty string
        when there is nothing to publish.
    """

    def toc_entry(href: str, label: str) -> dict:
        # One ``li > a`` entry of the table of contents.
        return {"tag": "li", "children": [{"attrs": {"href": href}, "children": [label], "tag": "a"}]}

    summary_text = glom(page, "summary.overview", default="")
    sentences = page.transcripts or []
    if isinstance(sentences, str):
        sentences = split_transcripts(sentences)
    sections: list[Section] = glom(page, "summary.sections", default=[])

    # Table of contents: one entry per section, then transcript and mind map links.
    toc_entries = []
    for sec in sections:
        heading = sec.emoji + sec.title
        # In-page anchors use the heading text with spaces turned into dashes.
        toc_entries.append(toc_entry("#" + heading.replace(" ", "-"), heading))
    if sentences:
        toc_entries.append(toc_entry("#🔤完整字幕", "🔤完整字幕"))
    if page.mermaid_url:
        toc_entries.append(toc_entry(page.mermaid_url, "🧠思维导图"))

    nodes: list[dict] = []
    if toc_entries:
        nodes.append({"tag": "ul", "children": toc_entries})

    if summary_text:
        nodes.append({"tag": "h3", "children": ["🤖AI导读"]})
        nodes.append({"tag": "p", "children": [summary_text]})

    if page.description:
        # Round-trip the description through markdown before converting it to nodes.
        as_markdown = convert_md(html=page.description)
        compact_html = convert_html(remove_consecutive_newlines(as_markdown, newline_level=2))
        description_nodes = html_to_nodes(compact_html)
        nodes.append({"tag": "h4", "children": ["📖原始简介"]})
        nodes.extend(description_nodes)

    for sec in sections:
        nodes.append({"tag": "h4", "children": [sec.emoji + sec.title]})
        nodes.append({"tag": "p", "children": [sec.content]})

    if sentences:
        nodes.append({"tag": "h4", "children": ["🔤完整字幕"]})
        for sentence in sentences:
            nodes.append({"children": [f"[{sentence.start}] {sentence.content}"], "tag": "p"})

    if not nodes:
        logger.warning("No Telegraph nodes to publish")
        return ""

    published = await publish_telegraph(title=page.title, nodes=nodes, author=page.author, url=page.url, ttl=ttl, fallback_r2=False)
    # publish_telegraph was told not to fall back itself, so do the AIPage-aware fallback here.
    return published or await r2_aipage(page, ttl=ttl, rformat="url")
+
+
async def r2_aipage(page: AIPage, ttl: str | None = None, *, expand_transcript: bool = False, rformat: Literal["url", "html"] = "url") -> str:
    """Render an ``AIPage`` as a standalone HTML document and publish it to CF R2.

    Plain-text fields (title, author, section titles/emoji, timestamps,
    transcript sentences, URLs placed in attributes) are HTML-escaped before
    interpolation. The overview, section bodies and description are passed
    through ``convert_html`` (or used verbatim when the description already
    starts with ``<``) and are therefore inserted as markup.

    Args:
        page: The page to render.
        ttl: Lifetime bucket used in the object key; defaults to ``"365d"``.
            ``"forever"`` stores under ``InstantView/`` instead of ``TTL/<ttl>/``.
        expand_transcript: Render the transcript panel already opened.
        rformat: ``"url"`` uploads the page and returns its public URL;
            ``"html"`` returns the HTML with the stylesheet and script inlined
            when they can be downloaded.

    Returns:
        For ``rformat="url"``: the public URL (wrapped in a ``t.me/iv`` link when
        ``TOKEN.R2_IV_HASH`` is set) if ``set_cf_r2`` reports success, else ``""``.
        For ``rformat="html"``: the HTML string. ``""`` when the page has no content.
    """
    # Function-scope import: other functions in this module take a parameter named ``html``.
    from html import escape

    def esc(value: object) -> str:
        # Escape text for use in HTML bodies and double-quoted attribute values.
        return escape(str(value), quote=True)

    ttl = ttl or "365d"
    # Tolerate a trailing slash in the configured public URL.
    base_url = DB.CF_R2_PUBLIC_URL.rstrip("/")
    today = f"{nowdt(TZ):%Y-%m-%d}"
    r2_prefix = f"InstantView/{today}" if ttl == "forever" else f"TTL/{ttl}/{today}"
    r2_key = f"{r2_prefix}-{digest(page)}.html"
    r2_url = f"{base_url}/{r2_key}"
    if TOKEN.R2_IV_HASH:
        r2_url = f"https://t.me/iv?url={quote_plus(r2_url)}&rhash={TOKEN.R2_IV_HASH}"

    date = page.date or nowdt("UTC")
    utc_date = date.astimezone(UTC)
    tz_date = utc_date.astimezone(ZoneInfo(TZ))
    title = esc(page.title)
    url = esc(page.url or "https://instantview.telegram.org")
    author_tag = ""
    if page.author:
        author_tag = f'<div class="header-author"><span class="header-author">{esc(page.author)}</span><span class="header-date"> | {tz_date:%Y-%m-%d %H:%M:%S}</span></div>'

    overview_text = glom(page, "summary.overview", default="")
    overview = ""
    if overview_text:
        overview = f'<div class="card summary"><div class="card-label" id="summary" >🤖AI导读</div>{convert_html(overview_text)}</div>'

    sections: list[Section] = glom(page, "summary.sections", default=[])

    # Sidebar entries are collected in display order and joined once at the end.
    nav_items: list[str] = []
    if overview:
        nav_items.append('<li><a href="#summary" onclick="navClick(event)"><span class="sidebar-icon">🤖</span><span class="sidebar-label">AI导读</span></a></li>')

    desc_tag = ""
    desc_head = ""
    if page.description:
        nav_items.append('<li><a href="#description" onclick="navClick(event)"><span class="sidebar-icon">📖</span><span class="sidebar-label">原始简介</span></a></li>')
        # A description that already looks like HTML is inserted verbatim.
        desc_html = page.description if page.description.startswith("<") else convert_html(page.description)
        desc_tag = f'<div class="card description"><div class="card-label" id="description">📖原始简介</div>{desc_html}</div>'
        if overview_text:
            # NOTE(review): og:description carries the AI overview but is only emitted
            # when an original description exists — confirm this gating is intended.
            desc_head = f'<meta property="og:description" content="{esc(overview_text)}">'

    section_parts: list[str] = []
    for idx, section in enumerate(sections, start=1):
        emoji = esc(section.emoji)
        section_title = esc(section.title)
        nav_time = ""
        header_time = ""
        if section.start:
            # Drop a leading zero hour ("00:12:34" -> "12:34").
            start = section.start.removeprefix("00:") if len(section.start) > 5 else section.start
            nav_time = f'<span class="sidebar-time">{esc(start)}</span>'
            header_time = f'<span class="section-time">{esc(start)}</span>'
        nav_items.append(f'<li><a href="#s{idx}" onclick="navClick(event)"><span class="sidebar-icon">{emoji}</span><span class="sidebar-label">{section_title}</span>{nav_time}</a></li>')
        section_parts.append(
            f'<section class="section" id="s{idx}"><div class="section-header"><span class="section-icon">{emoji}</span><h2 class="section-title">{section_title}</h2>{header_time}</div>{convert_html(section.content)}</section>'
        )
    sections_tag = "".join(section_parts)

    transcripts = page.transcripts or []
    if isinstance(transcripts, str):
        transcripts = split_transcripts(transcripts)

    transcriptions = ""
    if transcripts:
        nav_items.append('<li><a href="#transcript" onclick="navClick(event)"><span class="sidebar-icon">🔤</span><span class="sidebar-label">完整字幕</span></a></li>')
        # Only the transcript toggle reflects ``expand_transcript``; the mind-map toggle stays collapsed.
        aria_expanded = "true" if expand_transcript else "false"
        content_class = "transcript-content open" if expand_transcript else "transcript-content"
        toggle_label = "收起字幕" if expand_transcript else "展开字幕"
        rows = "".join(f'<p><span class="ts">{esc(sentence.start)}</span>{esc(sentence.content)}</p>' for sentence in transcripts)
        transcriptions = (
            f'<div class="card" id="transcript" style="margin-top: 24px;"><button class="transcript-toggle" aria-expanded="{aria_expanded}" onclick="toggleTranscript(this)">{toggle_label} <span class="arrow">▾</span></button>'
            f'<div class="{content_class}" id="transcriptions">{rows}</div></div>'
        )

    sidebars = ""
    sidebar_icon = ""
    if nav_items:
        sidebars = '<nav class="sidebar" id="sidebar"><ul>' + "".join(nav_items) + "</ul></nav>"
        sidebar_icon = '<button class="icon-toc" id="icon-toc" onclick="toggleSidebar()" aria-label="菜单"><svg class="icon-open" viewBox="0 0 24 24"><line x1="3" y1="6" x2="21" y2="6" /><line x1="3" y1="12" x2="21" y2="12" /><line x1="3" y1="18" x2="21" y2="18" /></svg><svg class="icon-close" viewBox="0 0 24 24" style="display:none"><polyline points="13,4 5,12 13,20" /><polyline points="20,4 12,12 20,20" /></svg></button>'

    if not any([overview, page.mermaid_img, sections_tag, desc_tag, transcriptions]):
        logger.warning("No AIPage contents to publish")
        return ""

    theme_icon = '<button class="icon-theme" id="icon-theme" onclick="toggleTheme()" aria-label="切换主题"><svg class="icon-sun" viewBox="0 0 24 24"><circle cx="12" cy="12" r="5" /><path d="M12 1v2M12 21v2M4.22 4.22l1.42 1.42M18.36 18.36l1.42 1.42M1 12h2M21 12h2M4.22 19.78l1.42-1.42M18.36 5.64l1.42-1.42" /></svg><svg class="icon-moon" viewBox="0 0 24 24" style="display:none"><path d="M21 12.79A9 9 0 1 1 11.21 3 7 7 0 0 0 21 12.79z" /></svg></button>'

    mermaid_icon = ""
    mermaid_desktop = ""
    mermaid_mobile = ""
    if page.mermaid_img:
        mermaid_img = esc(page.mermaid_img)
        # Without a source URL there is nothing to link to; omit the link instead of rendering href="None".
        mermaid_link = f'<a href="{esc(page.mermaid_url)}" target="_blank" class="mindmap-link">查看完整思维导图</a>' if page.mermaid_url else ""
        mermaid_icon = '<button class="icon-mindmap" id="icon-mindmap" onclick="toggleMindmapPanel()" aria-label="思维导图"><svg viewBox="0 0 24 24"><circle cx="4" cy="12" r="2" /><path d="M6 12h6M12 12l8-8M12 12h8M12 12l8 8" /></svg></button>'
        mermaid_desktop = f'<div class="mindmap-panel" id="mindmap-panel"><div class="mindmap-panel-content"><img src="{mermaid_img}" alt="思维导图">{mermaid_link}</div></div>'
        mermaid_mobile = f'<div class="card mindmap-card mindmap-mobile" id="mindmap-mobile"><button class="transcript-toggle" aria-expanded="false" onclick="toggleMindmap(this)">展开思维导图 <span class="arrow">▾</span></button><div class="mindmap-body" id="mindmap-body"><img src="{mermaid_img}" alt="思维导图">{mermaid_link}</div></div>'

    stylesheet_tag = f'<link rel="stylesheet" href="{base_url}/telegraph.css">'
    script_tag = f'<script src="{base_url}/telegraph.js" defer></script>'
    head = (
        '<!DOCTYPE html><html lang="zh-CN"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0">'
        f'<meta property="article:published_time" content="{utc_date:%Y-%m-%dT%H:%M:%SZ}"><meta property="og:title" content="{title}">{desc_head}'
        f"<title>{title}</title>{stylesheet_tag}{script_tag}</head><body>"
    )
    html_str = "\n".join(
        [
            head,
            sidebar_icon,
            mermaid_icon,
            theme_icon,
            sidebars,
            '<div class="container">',
            f'<header class="header"><h1 class="header-title"><a href="{url}" target="_blank">{title}</a></h1>{author_tag}</header>',
            overview,
            mermaid_mobile,
            desc_tag,
            sections_tag,
            transcriptions,
            "</div>",
            mermaid_desktop,
            "</body>",
            "</html>",
        ]
    )
    # Strip HTML comments (including any inside converted content) and collapse whitespace.
    html_str = re.sub(r"<!--[\s\S]*?-->", "", html_str)
    html_str = re.sub(r"\s+", " ", html_str).strip()

    if rformat == "url":
        uploaded = await set_cf_r2(key=r2_key, data=html_str, metadata={"title": page.title, "author": page.author or "BennyBot", "url": page.url}, mime_type="text/html")
        return r2_url if uploaded else ""

    # rformat == "html": inline the shared stylesheet and script so the document is self-contained.
    css_path = Path(DOWNLOAD_DIR) / "HTML/telegraph.css"
    js_path = Path(DOWNLOAD_DIR) / "HTML/telegraph.js"
    await download_file(f"{base_url}/telegraph.css", css_path, skip_exist=True)
    await download_file(f"{base_url}/telegraph.js", js_path, skip_exist=True)
    if css_path.exists():
        css = css_path.read_text(encoding="utf-8")
        html_str = html_str.replace(stylesheet_tag, f"<style>\n{css}\n</style>")
    if js_path.exists():
        js = js_path.read_text(encoding="utf-8")
        html_str = html_str.replace(script_tag, f"<script defer>\n{js}\n</script>")
    return html_str
src/schema.py
@@ -0,0 +1,163 @@
+from datetime import UTC, datetime
+from typing import Literal
+
+import jsonref
+from pydantic import BaseModel, ConfigDict, Field
+
+
+def mermaid_syntax() -> str:
+ """Return a Markdown cheat-sheet of basic Mermaid ``graph`` syntax (nodes, shapes, link styles).
+
+ The text is appended to the ``mermaid`` field description of ``ContentExtraction``,
+ so it is runtime prompt text: edit its wording with care.
+ """
+ return """
+# Mermaid Graph - Basic Syntax
+
+Graph is composed of **nodes** (geometric shapes) and **edges** (arrows or lines). The Mermaid code defines how nodes and edges are made and accommodates different arrow types, multi-directional arrows, and any linking to and from subgraphs.
+
+## A node (default)
+
+```mermaid
+graph LR
+ id
+```
+
+```note
+The id is what is displayed in the box.
+```
+
+### A node with text
+
+It is also possible to set text in the box that differs from the id. If this is done several times, it is the last text
+found for the node that will be used. Also if you define edges for the node later on, you can omit text definitions. The
+one previously defined will be used when rendering the box.
+
+```mermaid
+graph LR
+ id1[This is the text in the box]
+```
+
+## Node shapes
+
+### A node with round edges
+
+```mermaid
+graph LR
+ id1(This is the text in the box)
+```
+
+## Links between nodes
+
+Nodes can be connected with links/edges. It is possible to have different types of links or attach a text string to a link.
+
+### A link with arrow head
+
+```mermaid
+graph LR
+ A-->B
+```
+
+### An open link
+
+```mermaid
+graph LR
+ A --- B
+```
+
+### Text on links
+
+```mermaid
+graph LR
+ A---|This is the text|B
+```
+
+### A link with arrow head and text
+
+```mermaid
+graph LR
+ A-->|text|B
+```
+
+### Dotted link
+
+```mermaid
+graph LR
+ A-.->B;
+```
+
+### Dotted link with text
+
+```mermaid
+graph LR
+ A-. text .-> B
+```
+
+### Thick link
+
+```mermaid
+graph LR
+ A ==> B
+```
+
+### Thick link with text
+
+```mermaid
+graph LR
+ A == text ==> B
+```
+""".strip()
+
+
+class Section(BaseModel):
+ """分片内容详情."""
+ # NOTE: the docstring above is deliberately left untranslated. pydantic copies a model's
+ # docstring into the "description" of its generated JSON schema, so it is runtime text.
+ # One logical segment of the summarized material; used as the item type of
+ # ``ContentExtraction.sections``.
+
+ # Strip leading/trailing whitespace from string fields during validation.
+ model_config = ConfigDict(str_strip_whitespace=True)
+ # Heading of the segment.
+ title: str = Field(description="该片段的标题")
+ # Emoji shown next to the heading.
+ emoji: str = Field(description="匹配该片段的emoji,例如💡、💰、⚠️等")
+ # Body text of the segment.
+ content: str = Field(description="详细说明该片段的核心事件、具体观点或结论,禁止仅用1-2句话泛泛概括,需传递足够细节。")
+ # Optional start timestamp (HH:MM:SS or MM:SS) for timestamped transcripts; None otherwise.
+ start: str | None = Field(default=None, description="如果资料为含时间戳的文字稿(如播客/视频/音频的转录稿),需补充start字段HH:MM:SS或MM:SS;无时间戳则无需输出start字段。")
+
+
+class ContentExtraction(BaseModel):
+ # Structured AI summary of a document: overall overview, per-segment sections and a Mermaid mind map.
+ # ``get_schema`` exports this model's JSON schema; the ``title``/``description`` strings below are
+ # therefore part of that schema and must not be edited as if they were comments.
+ # Whitespace is stripped from string fields and unknown fields are rejected.
+ model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
+ # Whole-document summary text.
+ overview: str = Field(
+ title="全文总结",
+ description="需涵盖资料核心主题、关键观点和主要结论,采用连贯语言表述,若内容复杂可分段,但需逻辑清晰。禁止过于简略(如仅用一句话概括长文档),确保信息密度足够支撑用户理解。",
+ )
+ # Ordered list of logical segments.
+ sections: list[Section] = Field(
+ title="分片内容",
+ description="需将文档划分为逻辑连贯的片段(如按章节、主题、时间线划分);每个片段需拟定**简洁准确**的标题(体现片段核心)、匹配1个相关emoji;并说明该片段的核心内容。",
+ )
+ # Mermaid source of the mind map; must start with "graph LR" (enforced by ``pattern``).
+ # The description embeds the cheat-sheet returned by ``mermaid_syntax()``.
+ mermaid: str = Field(
+ title="思维导图",
+ pattern=r"^graph LR",
+ description=f"以Mermaid graph格式表示的全文思维导图,以'graph LR'开头。需清晰呈现文档的逻辑结构(如核心主题→子主题→关键观点/结论),节点层级明确,便于用户快速梳理文档框架。\n{mermaid_syntax()}",
+ examples=[
+ "graph LR\n A[核心主题] --> B[子标题1]\n A --> C[子标题2]\n A --> D[子标题3]\n A --> E[子标题4]\n\n\n B --> B1[二级标题1-1]\n B --> B2[二级标题1-2]\n B1 --> B11[核心观点1-1-1]\n B1 --> B12[核心观点1-1-2]\n B2 --> B21[争议点1-2-1]\n B2 --> B22[争议点1-2-2]\n\n\n C --> C1[关键数据2-1]\n C --> C2[主要结论2-1]\n C --> C3[补充结论2-2]\n\n D --> D1[核心问题3-1]\n D --> D2[潜在风险3-2]\n D --> D3[影响因素3-3]\n\n E --> E1[发展趋势4-1]\n E --> E2[行动建议4-2]\n E --> E3[未来结论4-3]",
+ ],
+ )
+
+
+class Sentence(BaseModel):
+ # One transcript sentence with its start timestamp; element type of ``AIPage.transcripts``.
+ # Whitespace is stripped from string fields and unknown fields are rejected.
+ model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
+ # Text of the sentence.
+ content: str = Field(description="句子内容")
+ # Start time of the sentence, formatted HH:MM:SS or MM:SS.
+ start: str = Field(description="句子开始时间,格式为HH:MM:SS或MM:SS")
+
+
+class AIPage(BaseModel):
+ # Everything needed to render one "AI digest" page: source metadata, the AI summary,
+ # the transcript and the mind-map links.
+ # Whitespace is stripped from string fields and unknown fields are rejected.
+ model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
+ # Page title.
+ title: str = Field(default="AI导读", description="标题")
+ # Link to the original source, if any.
+ url: str | None = Field(default=None, description="原始链接")
+ # Author name, if any.
+ author: str | None = Field(default=None, description="作者")
+ # Original description of the source, if any.
+ description: str | None = Field(default=None, description="原始描述")
+ # Publication date; defaults to the current UTC time when the model is created.
+ date: datetime | None = Field(default_factory=lambda: datetime.now(UTC), description="发布日期")
+ # Structured AI summary, if one was produced.
+ summary: ContentExtraction | None = Field(default=None, description="AI总结")
+ # Transcript, either as one raw string or as already-split sentences.
+ transcripts: str | list[Sentence] | None = Field(default=None, description="转录稿")
+ # URL of the rendered mind-map image.
+ mermaid_img: str | None = Field(default=None, description="思维导图图片URL")
+ # URL of the mind-map source code.
+ mermaid_url: str | None = Field(default=None, description="思维导图代码URL")
+
+
def get_schema(name: Literal["content_extraction"] = "content_extraction") -> dict:
    """Return the JSON schema for the named model with all ``$ref`` pointers inlined.

    Args:
        name: Schema selector; only ``"content_extraction"`` is known.

    Returns:
        The schema of ``ContentExtraction`` with references resolved in place and
        the top-level ``$defs`` table removed, or ``{}`` for any other name.
    """
    if name != "content_extraction":
        return {}
    raw_schema = ContentExtraction.model_json_schema()
    # proxies=False makes jsonref substitute real objects rather than lazy proxies.
    flattened = jsonref.replace_refs(raw_schema, proxies=False)
    # The definitions table is redundant once every reference has been inlined.
    flattened.pop("$defs", None)
    return flattened
src/utils.py
@@ -1,6 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import base64
import contextlib
+import hashlib
import json
import os
import random
@@ -119,6 +121,13 @@ def true(value: Any) -> bool:
return True
def digest(s: Any, length: int = 32) -> str:
    """Return a deterministic alphanumeric fingerprint of ``str(s)``.

    The value is hashed with SHAKE-256, base64-encoded, reduced to letters and
    digits, and cut to at most ``length`` characters.

    Args:
        s: Any object; its ``str()`` form is what gets hashed.
        length: Maximum length of the returned string.

    Returns:
        Up to ``length`` characters drawn from ``[A-Za-z0-9]``.
    """
    payload = str(s).encode()
    # Request twice as many bytes as characters needed so enough survive the filtering below.
    hashed = hashlib.shake_256(payload).digest(length * 2)
    encoded = base64.urlsafe_b64encode(hashed).decode("ascii")
    # Drop padding and the two non-alphanumeric symbols of the URL-safe alphabet.
    kept = [ch for ch in encoded if ch not in "=-_"]
    return "".join(kept)[:length]
+
+
def remove_none_values(d: dict | list) -> dict:
"""Recursively removes keys with None values from a nested dictionary.
@@ -243,6 +252,29 @@ def seconds_to_time(seconds: float) -> str:
return f"{m:02d}:{s:02d}"
+def to_dt(t: float | str | datetime | None, tz="UTC") -> datetime:
+ """Convert float, str, datetime to datetime."""
+ if isinstance(t, datetime):
+ return t
+ if isinstance(t, float):
+ ts = round(t)
+ ts = ts / 10**6 if ts > 10**14 else ts
+ ts = ts / 10**3 if ts > 10**11 else ts
+ return datetime.fromtimestamp(ts, tz=UTC).astimezone(ZoneInfo(tz))
+ if not isinstance(t, str):
+ return nowdt(tz)
+ t = str(t).strip()
+ if len(t) == 4: # 2026
+ return datetime.strptime(t, "%Y").astimezone(ZoneInfo(tz))
+ if len(t) == 7: # 2026-02
+ return datetime.strptime(t, "%Y-%m").astimezone(ZoneInfo(tz))
+ if len(t) == 10: # 2026-02-01
+ return datetime.strptime(t, "%Y-%m-%d").astimezone(ZoneInfo(tz))
+ if len(t) == 19: # 2026-02-01 12:46:40
+ return datetime.strptime(t, "%Y-%m-%d %H:%M:%S").astimezone(ZoneInfo(tz))
+ return nowdt(tz)
+
+
def readable_time(seconds: str | float) -> str:
"""Human readable time duration.
@@ -636,6 +668,7 @@ def zhcn(text: str) -> str:
if __name__ == "__main__":
+ print(digest("1"))
print(rand_string())
print(rand_number())
# print(cleanup_old_files())