Commit ece66f2
Changed files (9)
src
ai
texts
danmu
history
others
preview
src/ai/texts/contexts.py
@@ -23,7 +23,7 @@ from asr.utils import GEMINI_AUDIO_EXT, downsampe_audio
from config import AI, DOWNLOAD_DIR, PROXY, TID
from database.r2 import head_cf_r2, set_cf_r2
from messages.parser import parse_msg
-from utils import convert_md, read_text
+from utils import convert2md, read_text
if TYPE_CHECKING:
from io import BytesIO
@@ -90,7 +90,7 @@ async def get_openai_completion_contexts(client: Client, message: Message, *, ad
)
elif Path(info["file_name"]).suffix in MARKDOWN_EXT:
fpath: str = await client.download_media(message, media_path) # type: ignore
- text = convert_md(fpath)
+ text = convert2md(path=fpath)
Path(fpath).unlink(missing_ok=True)
context["content"].append(
{
@@ -218,7 +218,7 @@ async def single_openai_response_context(client: Client, message: Message, param
)
elif Path(info["file_name"]).suffix in MARKDOWN_EXT:
fpath: str = await client.download_media(message, media_path) # type: ignore
- text = convert_md(fpath)
+ text = convert2md(path=fpath)
Path(fpath).unlink(missing_ok=True)
context["content"].append(
{
@@ -319,7 +319,7 @@ async def get_gemini_contexts(client: Client, message: Message, gemini: genai.Cl
parts.append(Part.from_text(text=f"[filename]: {info['file_name']}\n[file content]:\n{read_text(fpath).strip()}"))
if Path(info["file_name"]).suffix in MARKDOWN_EXT:
fpath: str = await client.download_media(msg, media_path) # type: ignore
- text = convert_md(fpath)
+ text = convert2md(path=fpath)
Path(fpath).unlink(missing_ok=True)
parts.append(Part.from_text(text=f"[filename]: {info['file_name']}\n[file content]:\n{text.strip()}"))
texts = info["html"] or info["text"] if role == "user" and info["entity_urls"] else info["text"]
@@ -431,7 +431,7 @@ async def get_anthropic_contexts(
elif Path(info["file_name"]).suffix in MARKDOWN_EXT:
fpath: str = await client.download_media(msg, media_path) # type: ignore
- text = convert_md(fpath)
+ text = convert2md(path=fpath)
Path(fpath).unlink(missing_ok=True)
context["content"].append({"type": "text", "text": f"[filename]: {info['file_name']}\n[file content]:\n{text.strip()}"})
# user message has entity urls, use full html
src/danmu/entrypoint.py
@@ -20,7 +20,7 @@ from messages.sender import send2tg
from messages.utils import blockquote, delete_message, equal_prefix, smart_split, startswith_prefix
from others.emoji import CURRENCY
from publish import publish_telegraph
-from utils import convert_html, nowdt, number, strings_list
+from utils import convert2html, nowdt, number, strings_list
HELP = f"""📖**查询直播合订本**
`{PREFIX.DANMU}` 使用说明:
@@ -120,7 +120,7 @@ async def query_danmu(client: Client, message: Message, **kwargs):
caption += f"\n#️⃣{qtype}数: {count}"
caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
- html = convert_html(texts)
+ html = convert2html(texts)
if telegraph_url := await publish_telegraph(title=f"【{qtype}】{user}{match_time} {keyword}", html=html, author=user, ttl="1d"):
caption += f"\n⚡️[即时预览]({telegraph_url})"
caption += blockquote(username_history)
src/history/query.py
@@ -21,7 +21,7 @@ from messages.sender import send2tg
from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
from others.emoji import MTYPE_EMOJI
from publish import publish_telegraph
-from utils import convert_html, myself, nowstr, slim_cid, strings_list, to_int
+from utils import convert2html, myself, nowstr, slim_cid, strings_list, to_int
HELP = f"""🗣**查询当前对话聊天记录**
`/hist` 使用说明:
@@ -119,7 +119,7 @@ async def query_chat_history(client: Client, message: Message, **kwargs):
if len(texts) < 1000000 and (
telegraph_url := await publish_telegraph(
title=f"【{chat_title}】{user}{match_time} {keyword}",
- html=convert_html(texts),
+ html=convert2html(texts),
author=user or chat_title,
ttl="1d",
)
src/others/download_external.py
@@ -14,7 +14,7 @@ from messages.utils import equal_prefix, get_reply_to, startswith_prefix
from multimedia import is_valid_video_or_audio, validate_img
from networking import download_file
from publish import publish_telegraph
-from utils import convert_html, convert_md, find_url, guess_mime, readable_size, to_int
+from utils import convert2html, convert2md, find_url, guess_mime, readable_size, to_int
HELP = f"""
⏬**下载文件**
@@ -80,7 +80,7 @@ async def download_url_in_message(client: Client, message: Message, extra_prefix
elif path.stat().st_size < MAX_FILE_BYTES:
await modify_progress(text=f"💾文件下载成功: {readable_size(path=path)}", force_update=True, **kwargs)
if suffix == ".html":
- html = convert_html(convert_md(path))
+ html = convert2html(convert2md(path=path))
if telegraph_url := await publish_telegraph(title="全文内容", html=html, author=info["full_name"], url=url):
caption += f"\n⚡️[即时预览]({telegraph_url})"
success = await client.send_document(target_chat, path.as_posix(), caption=caption, reply_parameters=reply_parameters)
src/podcast/asr.py
@@ -15,7 +15,7 @@ from podcast.utils import get_pubdate
from preview.bilibili import get_bilibili_vinfo
from preview.youtube import get_youtube_vinfo
from subtitles.base import fetch_subtitle
-from utils import convert_md, rand_string, readable_time, remove_consecutive_newlines, strings_list
+from utils import convert2md, rand_string, readable_time, remove_consecutive_newlines, strings_list
async def get_transcripts(
@@ -29,7 +29,7 @@ async def get_transcripts(
If the link of this entry has embedded subtitles (YouTube, Bilibili links), use it directly.
Otherwise, generate the transcript via ASR.
"""
- desc = convert_md(html=glom(entry, Coalesce("content.0.value", "summary"), default=""))
+ desc = convert2md(html=glom(entry, Coalesce("content.0.value", "summary"), default=""))
desc, _ = remove_img_tag(desc)
desc = remove_consecutive_newlines(desc, newline_level=2)
reference = f"本次转录稿为播客栏目《{feed_title}》的一期节目。\n该期节目标题: [{entry['title']}]({entry['link']})\n播出日期: {get_pubdate(entry):%Y-%m-%d}\n节目简介: {desc}"
src/podcast/xml.py
@@ -19,7 +19,7 @@ from database.r2 import set_cf_r2
from networking import hx_req
from podcast.utils import HEADERS, clean_feed_url, get_pubdate
from preview.youtube import get_youtube_channel_thumb
-from utils import bare_url, convert_html, https_url, nowdt
+from utils import bare_url, convert2html, https_url, nowdt
@cache.memoize(ttl=600)
@@ -111,7 +111,7 @@ async def update_xml_desc(feed_url: str, processed_xml: dict, entry: dict, summa
summary (str): AI summary
"""
original_desc = glom(entry, Coalesce("content.0.value", "summary"), default="")
- description = convert_html(summary) + "<p>----------------------------------</p>" + original_desc
+ description = convert2html(summary) + "<p>----------------------------------</p>" + original_desc
# try to find the item in feed_xml
feed_xml = await parse_feed(feed_url, raw_xml=True)
new_item = entry
src/preview/twitter.py
@@ -18,7 +18,7 @@ from messages.progress import modify_progress
from messages.sender import send2tg
from messages.utils import blockquote, remove_img_tag, summay_media
from networking import download_file, download_media, flatten_rediercts, hx_req
-from utils import convert_html, readable_count, remove_consecutive_newlines, remove_none_values, split_parts, true
+from utils import convert2html, readable_count, remove_consecutive_newlines, remove_none_values, split_parts, true
class APIError(Exception):
@@ -591,6 +591,6 @@ def parse_article(article: dict) -> dict:
"markdown": remove_consecutive_newlines(markdown).strip(),
"text": remove_consecutive_newlines(markdown_no_img).strip(),
"image_urls": image_urls,
- "html": convert_html(markdown),
+ "html": convert2html(markdown),
"media": {"all": [{"url": url, "type": "photo"} for url in image_urls]},
}
src/publish.py
@@ -21,7 +21,7 @@ from config import DB, DOWNLOAD_DIR, TOKEN, TZ
from database.r2 import set_cf_r2
from networking import download_file
from schema import AIPage, Section
-from utils import convert_html, convert_md, digest, nowdt, rand_string, remove_consecutive_newlines
+from utils import convert2html, convert2md, digest, nowdt, rand_string, remove_consecutive_newlines
def adjust_tags(s: str | None) -> str:
@@ -172,8 +172,8 @@ async def telegraph_aipage(page: AIPage, ttl: str | None = None) -> str:
# Description
if page.description:
- desc = convert_md(html=page.description)
- desc_html = convert_html(remove_consecutive_newlines(desc, newline_level=2))
+ desc = convert2md(html=page.description)
+ desc_html = convert2html(remove_consecutive_newlines(desc, newline_level=2))
desc_nodes = html_to_nodes(desc_html)
nodes.append({"tag": "h4", "children": ["📖原始简介"]})
nodes.extend(desc_nodes)
@@ -212,7 +212,7 @@ async def r2_aipage(page: AIPage, ttl: str | None = None, *, expand_transcript:
author_tag = f'<div class="header-author"><span class="header-author">{page.author}</span><span class="header-date"> | {tz_date:%Y-%m-%d %H:%M:%S}</span></div>' if page.author else ""
overview = glom(page, "summary.overview", default="")
if overview:
- overview = f'<div class="card summary"><div class="card-label" id="summary" >🤖AI导读</div>{convert_html(overview)}</div>'
+ overview = f'<div class="card summary"><div class="card-label" id="summary" >🤖AI导读</div>{convert2html(overview)}</div>'
sections: list[Section] = glom(page, "summary.sections", default=[])
@@ -224,7 +224,7 @@ async def r2_aipage(page: AIPage, ttl: str | None = None, *, expand_transcript:
desc_head = ""
if page.description:
sidebars += """<li><a href="#description" onclick="navClick(event)"><span class="sidebar-icon">📖</span><span class="sidebar-label">原始简介</span></a></li>"""
- desc_html = page.description if page.description.startswith("<") else convert_html(page.description)
+ desc_html = page.description if page.description.startswith("<") else convert2html(page.description)
desc_tag = f'<div class="card description"><div class="card-label" id="description">📖原始简介</div>{desc_html}</div>'
if page.description and overview:
desc_head = f"""<meta property="og:description" content="{glom(page, "summary.overview", default="")}">"""
@@ -239,7 +239,7 @@ async def r2_aipage(page: AIPage, ttl: str | None = None, *, expand_transcript:
sidebars += f'<span class="sidebar-time">{start}</span>'
sections_tag += f'<span class="section-time">{start}</span>'
sidebars += "</a></li>"
- sections_tag += f"</div>{convert_html(section.content)}</section>"
+ sections_tag += f"</div>{convert2html(section.content)}</section>"
transcripts = page.transcripts or []
if isinstance(transcripts, str):
src/utils.py
@@ -613,8 +613,8 @@ def cleanup_old_files(root: Path | str | None = None, duration: int = CLEAN_OLD_
path.unlink(missing_ok=True)
-def convert_md(path: str | Path | None = None, html: str | None = None) -> str:
- """Convert to markdown format."""
+def convert2md(*, html: str | None = None, path: str | Path | None = None) -> str:
+ """Convert html or local file to markdown format."""
md = MarkItDown()
if path is not None:
path = Path(path).expanduser().resolve()
@@ -631,13 +631,8 @@ def convert_md(path: str | Path | None = None, html: str | None = None) -> str:
return ""
-def convert_html(texts: str = "", path: str | Path | None = None) -> str:
- """Convert to markdown format."""
- if path is not None:
- path = Path(path).expanduser().resolve()
- if not path.is_file():
- return ""
- texts = read_text(path)
+def convert2html(texts: str = "") -> str:
+ """Convert texts to html format."""
texts = markdown.markdown(texts)
return texts.replace("\n", "<br>")