Commit cec3a0f

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-02-09 11:01:34
feat: add benny customization
1 parent fa4093b
.github/workflows/autobump.yml
@@ -1,31 +0,0 @@
----
-name: Audo bump dependencies
-
-on:
-  workflow_dispatch:
-  schedule:
-    - cron: 20 20 * * *
-
-jobs:
-  job:
-    if: ${{ github.ref == 'refs/heads/main' }}
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@main
-      - uses: astral-sh/setup-uv@main
-      - name: Bump
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-        run: |-
-          set -e
-          git config --global user.email 49699333+dependabot[bot]@users.noreply.github.com
-          git config --global user.name "dependabot[bot]"
-          git config pull.rebase false
-          uv lock -U
-          git add uv.lock
-          git commit -m 'chore(deps): auto bump dependencies' || export NO_UPDATES=true
-          if [ "$NO_UPDATES" != "true" ] ; then
-          echo "push to main"
-          git push origin main
-          curl -X POST -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" https://api.github.com/repos/${{ github.repository }}/actions/workflows/baseimg.yml/dispatches -d '{"ref": "main"}'
-          fi
.github/workflows/docker.yml
@@ -31,6 +31,9 @@ jobs:
         with:
           fetch-depth: 0
 
+      # - name: Dump GitHub context
+      #   run: echo "${{ toJson(github) }}" || true
+
       - name: Docker meta
         id: meta
         uses: docker/metadata-action@master
@@ -65,7 +68,7 @@ jobs:
           build-args: |
             IMAGE_NAME=ghcr.io/${{ github.repository }}
             COMMIT_SHA=${{ github.sha }}
-            COMMIT_DATE=${{ github.event.head_commit.timestamp }}
+            COMMIT_DATE=${{ github.event.workflow_run.head_commit.timestamp || github.event.head_commit.timestamp }}
           platforms: linux/amd64
           push: true
           tags: ${{ steps.meta.outputs.tags }}
.github/dependabot.yml
@@ -1,26 +0,0 @@
----
-version: 2
-enable-beta-ecosystems: true
-
-updates:
-  - package-ecosystem: uv
-    directory: /
-    schedule:
-      interval: weekly
-    groups:
-      python-deps:
-        patterns:
-          - "*"
-
-  - package-ecosystem: docker
-    directory: /docker
-    schedule:
-      interval: monthly
-    groups:
-      docker-deps:
-        patterns:
-          - "*"
-    ignore:
-      - dependency-name: python
-        versions:
-          - ">= 3.14"
docker/cont-init.d/crontab
@@ -1,11 +1,21 @@
-#!/bin/bash
-mkdir -p /var/spool/cron/crontabs
#!/command/with-contenv bash
# s6-overlay cont-init script: optionally install a root crontab that restarts
# the bot service on a schedule, then print the active crontab to the log.

if [[ "$ENABLE_AUTO_REBOOT" == "1" ]]; then
    mkdir -p /var/spool/cron/crontabs

    # restart bot service
    REBOOT="${REBOOT_CRON:-"30 22 * * *"}"  # cron expression; default is 22:30 daily
    echo "Set auto reboot via crontab: $REBOOT"
    echo "$REBOOT /command/s6-svc -r /run/service/bot" > /var/spool/cron/crontabs/root
    chmod 600 /var/spool/cron/crontabs/root
    chown root:crontab /var/spool/cron/crontabs/root
    chown root:crontab /var/spool/cron/crontabs

    # start cron
    /usr/sbin/cron
fi


# Show the installed crontab (if any) for debugging, even when auto-reboot is off.
if [ -f "/var/spool/cron/crontabs/root" ]; then
    cat /var/spool/cron/crontabs/root
fi
src/custom/ai_news.py
@@ -0,0 +1,415 @@
+#!/venv/bin/python
+# -*- coding: utf-8 -*-
+import json
+import tempfile
+from collections import defaultdict
+from contextlib import suppress
+from datetime import datetime, timedelta
+from pathlib import Path
+from zoneinfo import ZoneInfo
+
+import feedparser
+from feedgen.feed import FeedGenerator
+from glom import glom
+from loguru import logger
+from pyrogram.types import Chat, Message
+from pyrogram.types.messages_and_media.message import Str
+
+from ai.main import ai_text_generation
+from config import DB, PREFIX, TZ
+from database.d1 import query_d1
+from database.r2 import get_cf_r2, head_cf_r2, set_cf_r2
+from utils import convert_html, nowdt, rand_number
+
+DAILY_KEY = "AI-NEWS/daily.xml"
+WEEKLY_KEY = "AI-NEWS/weekly.xml"
+
+
# Structured-output schema handed to both AI backends (OpenAI Responses API and
# Gemini) so the daily summary comes back as a machine-parseable JSON array of
# news objects. Keys mirror what convert_news_summary_to_html() consumes.
JSON_SCHEMA = {
    "title": "List of News Summary",
    "type": "array",
    "items": {
        "type": "object",
        "title": "News",
        "properties": {
            "category": {"description": "News category", "enum": ["Model", "Tech", "Finance", "Policy", "Application", "Industry", "Others"], "title": "Category", "type": "string"},
            "abstract": {"description": "A concise summary (50-100 words) in Chinese. Do NOT include URL strings here. Focus on facts and impact.", "title": "Abstract", "type": "string"},
            "title": {"description": "A concise, objective Chinese title (max 20 chars). Remove clickbait words.", "title": "Title", "type": "string"},
            "urls": {"description": "Select 1-2 most authoritative source URLs.", "items": {"type": "string"}, "title": "Urls", "type": "array"},
        },
        "required": ["category", "title", "abstract", "urls"],
        "additionalProperties": False,
    },
}
+
+
async def daily_ainews(num_days: int = 7):
    """Backfill, store and publish daily AI summaries for the last ``num_days`` days.

    For every day not already present in the daily RSS feed: ask the AI for a
    summary, persist JSON + rendered HTML to R2, then rebuild and re-upload
    the whole feed (old entries merged back in).

    Args:
        num_days (int): how many past days (ending today) to backfill.
    """
    parsed = await parse_finished(DAILY_KEY)  # days already published in the feed
    if not parsed:
        logger.warning("No Daily AI news parsed.")
        return
    now = nowdt(TZ)
    begin = now - timedelta(days=num_days)
    items = {}  # newly generated entries, keyed by "YYYY-MM-DD"
    for i in range(num_days):
        day = begin + timedelta(days=i)
        day_str = day.strftime("%Y-%m-%d")
        if day_str in parsed:
            continue  # already published, skip
        news_summary = await get_summary(day_str, day_str)
        if not news_summary:
            continue
        await set_cf_r2(key=f"AI-NEWS/daily/{day_str}-summary.json", data=news_summary)  # save news summary
        # publish the summary
        title = f"AI日报 | {day_str}"
        html = convert_news_summary_to_html(news_summary)
        full_html = f'<!DOCTYPE html><html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>{title}</title><link rel="stylesheet" href="https://cdn.jsdmirror.cn/npm/water.css@2/out/water.css" integrity="sha512-GW7j11RXZmdio87hQsKNjomKEy/DwDjh6J2Z1JnI5Z4FNP791QfZP7Iut25vA+L+YJLZipI2BZhEpkvBkfr8cw==" crossorigin="anonymous"></head><body><h1>{title}</h1>{html}</body></html>'
        html_key = f"AI-NEWS/daily/{day_str}.html"
        await set_cf_r2(key=html_key, data=full_html, mime_type="text/html")
        items[day_str] = {"title": title, "summary": html, "url": f"{DB.CF_R2_PUBLIC_URL}/{html_key}"}
    if not items:
        return  # nothing new: leave the published feed untouched
    items |= parsed  # merge previously-published days (keys never collide: those days were skipped above)
    fg = FeedGenerator()
    fg.link(href=f"{DB.CF_R2_PUBLIC_URL}/{DAILY_KEY}", rel="self", type="application/rss+xml")
    fg.title("AI News")
    fg.language("zh-CN")
    fg.category(term="AI")
    fg.copyright("DNKT AI News")
    fg.description("DNKT AI News")
    fg.ttl(60)
    for day_str, item in sorted(items.items()):  # oldest first
        entry = fg.add_entry()
        entry.title(item["title"])
        entry.link(href=item["url"])
        entry.guid(item["url"], permalink=True)
        # Publish timestamp pinned to end-of-day in the configured timezone.
        pub_dt = datetime.strptime(day_str, "%Y-%m-%d").replace(hour=23, minute=59, second=59, tzinfo=ZoneInfo(TZ))
        entry.published(pub_dt)
        entry.content(item["summary"], type="CDATA")

    with tempfile.NamedTemporaryFile("w", suffix=".xml", delete=False) as tempf:
        fg.rss_file(tempf.name, pretty=True)
        # add rss beauty: inject an xml-stylesheet directive after the XML declaration
        xml_str = Path(tempf.name).read_text()
        xml_str = xml_str.replace("<?xml version='1.0' encoding='UTF-8'?>", "<?xml version='1.0' encoding='UTF-8'?>\n<?xml-stylesheet type='text/xsl' href='/rss.xsl'?>")
        await set_cf_r2(DAILY_KEY, xml_str, mime_type="application/xml", silent=True)
+
+
async def get_summary(start: str, end: str) -> list[dict]:
    """Ask the AI to summarize readhub items between two dates (inclusive).

    Args:
        start (str): The start time in YYYY-MM-DD.
        end (str): The end time in YYYY-MM-DD.

    Returns:
        list[dict]: list of news summary objects conforming to JSON_SCHEMA;
        empty when there are no source items or the AI returned nothing.

    Raises:
        json.JSONDecodeError: if the AI response is not valid JSON.
    """
    # Expand the date strings to a full-day [00:00:00, 23:59:59] window in TZ.
    start_dt = datetime.strptime(start, "%Y-%m-%d").replace(hour=0, minute=0, second=0, tzinfo=ZoneInfo(TZ))
    end_dt = datetime.strptime(end, "%Y-%m-%d").replace(hour=23, minute=59, second=59, tzinfo=ZoneInfo(TZ))
    start_ts = int(start_dt.timestamp())
    end_ts = int(end_dt.timestamp())
    d1 = await query_d1(sql=f"SELECT timestamp,title,url,summary FROM readhub WHERE timestamp >= {start_ts} AND timestamp <= {end_ts} ORDER BY timestamp ASC", db_name="dnkt", silent=True)
    items = glom(d1, "result.0.results", default=[])  # From old to new
    if not items:
        return []
    prompt = generate_ai_prompt(items)
    # save sources to R2
    if start == end:  # daily
        await set_cf_r2(f"AI-NEWS/daily/{start}.json", items, silent=True)
    ai_msg = Message(  # Construct a message for AI
        id=rand_number(),
        chat=Chat(id=rand_number()),
        text=Str(f"{PREFIX.AI_TEXT_GENERATION} @ai-news {prompt}"),
    )

    # The same structured-output schema is supplied to both backends so either
    # OpenAI (Responses API) or Gemini returns JSON conforming to JSON_SCHEMA.
    ai_res = await ai_text_generation(
        "fake-client",  # type: ignore
        ai_msg,
        openai_responses_config={
            "instructions": daily_system_prompt(start),
            "text": {
                "format": {
                    "type": "json_schema",
                    "name": "NewsSummary",
                    "strict": True,
                    "description": "A list of news summary",
                    "schema": JSON_SCHEMA,
                }
            },
        },
        gemini_generate_content_config={
            "system_instruction": daily_system_prompt(start),
            "responseMimeType": "application/json",
            "responseJsonSchema": JSON_SCHEMA,
        },
        silent=True,
    )
    if not ai_res.get("texts", ""):
        return []
    return json.loads(ai_res.get("texts", ""))
+
+
def convert_news_summary_to_html(news_summary: list[dict]) -> str:
    """Render a list of news-summary dicts as HTML, grouped by category.

    Categories are emitted in a fixed editorial order and items with an
    unrecognized category are dropped. Single-source items link the title
    directly; multi-source items append numbered [link-N] anchors.
    """
    emoji = {"Model": "🤖", "Tech": "🛠️", "Finance": "💰", "Policy": "📜", "Application": "💡", "Industry": "🏢", "Others": "✨"}
    category_order = ["Model", "Tech", "Application", "Industry", "Finance", "Policy", "Others"]
    category_map = {
        "Model": "模型 (Models)",
        "Tech": "技术 (Technologies)",
        "Application": "应用 (Applications)",
        "Industry": "行业动态 (Industry)",
        "Finance": "投融资 (Finance)",
        "Policy": "政策 (Policy)",
        "Others": "其他 (Others)",
    }

    # Bucket items by known category; anything outside the fixed set is skipped.
    buckets: dict[str, list[dict]] = {name: [] for name in category_order}
    for entry in news_summary:
        bucket = buckets.get(entry.get("category", "Others"))
        if bucket is not None:
            bucket.append(entry)

    parts: list[str] = []
    for name in category_order:
        entries = buckets[name]
        if not entries:
            continue  # omit empty sections entirely
        parts.append(f"<h2>{emoji[name]} {category_map[name]}</h2>")
        for pos, entry in enumerate(entries, start=1):
            headline = entry["title"]
            body = entry["abstract"]
            links = entry["urls"]
            if len(links) == 1:
                parts.append(f'<p><strong>{pos}. <a href="{links[0]}">{headline}</a></strong><br>{body}</p>')
            else:
                anchors = ",".join(f'<a href="{link}">[link-{n}]</a>' for n, link in enumerate(links, start=1))
                parts.append(f"<p><strong>{pos}. {headline}</strong> {anchors}<br>{body}</p>")
    return "".join(parts)
+
+
async def parse_finished(key: str = DAILY_KEY) -> dict:
    """Fetch the published RSS feed from R2 and index its entries by date.

    Args:
        key (str): R2 object key of the feed (daily or weekly).

    Returns:
        dict: ``{"YYYY-MM-DD": {"title", "summary", "url"}}``; empty when the
        feed is missing or unparsable (all errors are swallowed on purpose —
        a missing feed just means nothing has been published yet).
    """
    parsed = {}
    with suppress(Exception):
        feed_url = f"{DB.CF_R2_PUBLIC_URL.rstrip('/')}/{key}"
        feed = feedparser.parse(feed_url)
        for item in glom(feed, "entries", default=[]):
            title = item["title"]
            summary = item["summary"]
            url = item["link"]
            date = title[-10:]  # YYYY-MM-DD — entry titles end with the date, e.g. "AI日报 | 2025-02-09"
            parsed[date] = {"title": title, "summary": summary, "url": url}
    return parsed
+
+
def generate_ai_prompt(items: list[dict]) -> str:
    """Serialize readhub rows into the JSON payload fed to the LLM.

    Args:
        items: rows with at least ``title``, ``summary`` and ``url`` keys.

    Returns:
        str: a JSON array string (non-ASCII preserved) of
        ``{"title", "summary", "url"}`` objects.
    """
    # readhub feed links carry a "?f=rss" tracking query at the END of the URL.
    # BUG FIX: the original used removeprefix("?f=rss"), which is a no-op since
    # every URL starts with "http"; removesuffix() strips it as intended.
    prompt = [
        {"title": item["title"], "summary": item["summary"], "url": item["url"].removesuffix("?f=rss")}
        for item in items
    ]
    return json.dumps(prompt, ensure_ascii=False)
+
+
def daily_system_prompt(day: str) -> str:
    """Build the (Chinese) system prompt for the daily news-summarization AI call.

    Args:
        day (str): the date being summarized, YYYY-MM-DD; interpolated into the
            prompt so the model knows "today".

    Returns:
        str: the prompt with surrounding whitespace stripped.
    """
    prompt = f"""# Role
你是一位资深的人工智能行业分析师和主编。你具备敏锐的新闻嗅觉,擅长从海量、碎片化的源数据中识别关键信息,并将其转化为结构化、高价值的行业简报。

# Context
当前日期:{day}
输入数据:一份包含本日 AI 相关新闻的 JSON 列表(包含标题 title、摘要 summary、链接 url)。
输出目标:符合 JSON Schema 定义的清洗后的新闻列表。

# Workflow & Guidelines (必须严格执行)

## 1. 深度清洗与去重 (核心任务)
   - **语义聚合**:不要仅仅比对标题。如果多条新闻报道了同一个核心事件(例如 "OpenAI 发布 Sora" 和 "Sora 正式官宣"),必须将其合并为一个条目。
   - **去除噪音**:剔除毫无实质内容的标题党、重复的营销软文、极其边缘的小道消息。
   - **选取最佳来源**:在合并时,保留 1-2 个最具信息量的 URL 放入 `urls` 字段。

## 2. 智能分类 (Category)
   请根据新闻核心内容,严格按以下标准分类:
   - **Model**: 大模型发布、权重开源、算法更新 (如 GPT-5, Llama 3)。
   - **Tech**: 具体的 AI 技术突破、学术论文、RAG/Agent 等技术架构 (非具体模型)。
   - **Application**: AI 产品落地、新功能上线、具体场景应用 (如 Copilot 更新)。
   - **Finance**: 融资、收购、上市、财报、股价大幅波动。
   - **Industry**: 行业大事件、人事变动、监管与法律诉讼、算力芯片 (Nvidia/AMD)。
   - **Policy**: 政府法规、AI 安全倡议、白皮书。
   - **Others**: 无法归类的内容。

## 3. 摘要撰写规范 (Abstract)
   - **禁止包含链接**:摘要文本中**绝对不要**包含 http 链接,链接仅需放入 JSON 的 `urls` 列表即可。
   - **客观陈述**:使用第三方新闻语调。例如:"OpenAI 宣布推出..." 而不是 "我们很高兴地宣布..."。
   - **信息密度**:50-100 字。涵盖 "Who (谁) + What (做了什么) + Impact (有什么关键影响/参数)"。
   - **中文输出**:即使输入是英文,摘要和标题也必须是流畅的中文。

## 4. 标题重写 (Title)
   - 必须简练(20字以内)。
   - 去除 "突发"、"重磅" 等修饰词,直接陈述事实。

请处理输入数据,并生成符合 Schema 的 JSON 输出。
"""  # noqa: RUF001
    return prompt.strip()
+
+
def get_last_week() -> tuple[datetime, datetime]:
    """Return last week's boundaries: Monday 00:00:00 and Sunday 23:59:59 (in TZ)."""
    now = nowdt(TZ)
    monday_this_week = now - timedelta(days=now.weekday())
    # Last week spans [this Monday - 7 days, this Monday - 1 day].
    week_start = (monday_this_week - timedelta(days=7)).replace(hour=0, minute=0, second=0, tzinfo=ZoneInfo(TZ))
    week_end = (monday_this_week - timedelta(days=1)).replace(hour=23, minute=59, second=59, tzinfo=ZoneInfo(TZ))
    return week_start, week_end
+
+
async def weekly_ainews():
    """Build and publish the weekly AI report.

    Aggregates last week's per-day summaries from R2, asks the AI for a weekly
    synthesis, stores the rendered HTML back to R2 and regenerates the weekly
    RSS feed. Returns silently when this week's report already exists or the
    AI produced no text.
    """
    last_monday, last_sunday = get_last_week()
    html_key = f"AI-NEWS/weekly/{last_monday:%Y-%m-%d}~{last_sunday:%Y-%m-%d}.html"
    if await head_cf_r2(html_key):  # already published for this week
        return

    parsed = await parse_finished(WEEKLY_KEY)  # previously published weekly entries

    # Gather last week's daily summaries; a day may be missing (no news / not generated).
    news = []
    for i in range(7):
        day = last_monday + timedelta(days=i)
        list_news: list[dict] = await get_cf_r2(key=f"AI-NEWS/daily/{day:%Y-%m-%d}-summary.json", silent=True)  # ty:ignore[invalid-assignment]
        # BUG FIX: guard against a falsy return for absent keys — extend(None) raises.
        news.extend(list_news or [])
    for x in news:
        x.pop("category", None)  # category is a daily-report detail; drop it from the weekly prompt

    ai_msg = Message(  # Construct a message for AI
        id=rand_number(),
        chat=Chat(id=rand_number()),
        text=Str(f"{PREFIX.AI_TEXT_GENERATION} @doubao {json.dumps(news, ensure_ascii=False)}"),
    )
    ai_res = await ai_text_generation(
        "fake-client",  # type: ignore
        ai_msg,
        openai_responses_config={"instructions": weekly_system_prompt()},
        gemini_generate_content_config={"system_instruction": weekly_system_prompt()},
        silent=True,
    )
    weekly_report = ai_res.get("texts", "")
    if not weekly_report:
        return
    title = f"AI周报 | {last_monday:%Y-%m-%d}~{last_sunday:%Y-%m-%d}"
    html = convert_html(weekly_report)
    full_html = f'<!DOCTYPE html><html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>{title}</title><link rel="stylesheet" href="https://cdn.jsdmirror.cn/npm/water.css@2/out/water.css" integrity="sha512-GW7j11RXZmdio87hQsKNjomKEy/DwDjh6J2Z1JnI5Z4FNP791QfZP7Iut25vA+L+YJLZipI2BZhEpkvBkfr8cw==" crossorigin="anonymous"></head><body><h1>{title}</h1>{html}</body></html>'
    await set_cf_r2(key=html_key, data=full_html, mime_type="text/html")

    parsed[f"{last_sunday:%Y-%m-%d}"] = {"title": title, "summary": html, "url": f"{DB.CF_R2_PUBLIC_URL}/{html_key}"}
    fg = FeedGenerator()
    # BUG FIX: the self link must point at the WEEKLY feed (was DAILY_KEY).
    fg.link(href=f"{DB.CF_R2_PUBLIC_URL}/{WEEKLY_KEY}", rel="self", type="application/rss+xml")
    fg.title("AI周报")
    fg.language("zh-CN")
    fg.category(term="AI")
    fg.copyright("DNKT AI News")
    fg.description("DNKT AI News 周报")
    fg.ttl(480)
    for sunday, item in sorted(parsed.items()):  # oldest week first
        entry = fg.add_entry()
        entry.title(item["title"])
        entry.link(href=item["url"])
        entry.guid(item["url"], permalink=True)
        # Publish timestamp pinned to end of the week's Sunday in TZ.
        pub_dt = datetime.strptime(sunday, "%Y-%m-%d").replace(hour=23, minute=59, second=59, tzinfo=ZoneInfo(TZ))
        entry.published(pub_dt)
        entry.content(item["summary"], type="CDATA")

    with tempfile.NamedTemporaryFile("w", suffix=".xml", delete=False) as tempf:
        fg.rss_file(tempf.name, pretty=True)
        # add rss beauty: inject an xml-stylesheet directive after the XML declaration
        xml_str = Path(tempf.name).read_text()
        xml_str = xml_str.replace("<?xml version='1.0' encoding='UTF-8'?>", "<?xml version='1.0' encoding='UTF-8'?>\n<?xml-stylesheet type='text/xsl' href='/rss.xsl'?>")
        await set_cf_r2(WEEKLY_KEY, xml_str, mime_type="application/xml", silent=True)
+
+
def weekly_system_prompt() -> str:
    """Build the (Chinese) system prompt for the weekly AI-report synthesis call.

    Returns:
        str: the prompt with surrounding whitespace stripped. Unlike the daily
        prompt this expects free-form Markdown output, not schema-bound JSON.
    """
    prompt = """# Role Definition
你是一名为顶级汽车软件公司服务的**首席AI技术战略官**,精通 **QNX、Linux (AGL)、Android** 混合架构,并对**智能座舱 HMI(仪表/中控)**开发有深刻理解。你的核心能力结合了对 AI 前沿技术(AGI、LLM、Multimodal)的敏锐嗅觉。

# Context & Goal
用户是服务于丰田系汽车的软件开发工程师,主要负责**仪表盘 (Instrument Cluster)** 和 **中控屏 (IVI)** 的软件开发。
你需要根据用户提供的 JSON 数据(包含 title, summary, urls),撰写一份具有深度的《AI行业周报》。
你的目标不是简单的“新闻搬运”,而是“情报分析”。你需要帮助用户回答两个核心问题:
1. 本周 AI 行业发生了什么可能会改变未来的大事?
2. 这些技术如何具体落地到我们的汽车软件代码、架构或用户体验中?

# Capabilities & Constraints
由于你具备**联网搜索能力**,请严格遵守以下工作流:

## Phase 1: Data Ingestion & Enrichment (数据摄入与增强)
1. **去重与聚类**:阅读输入的 JSON 数据。识别报道同一事件的多条新闻(例如多家媒体报道同一个模型发布),将其聚合为一个条目。
2. **主动搜索 (关键步骤)**:
   - 如果 JSON 中的 `summary` 信息量不足,或者该新闻通过标题判断极其重要(Tier 1级别),**请务必使用搜索工具搜索该新闻的最新深度解读或技术白皮书**。
   - 不要仅依赖提供的 JSON,要确保你的分析是基于最准确、最全面的技术细节(例如:模型参数量、推理延迟数据、上下文长度等)。

## Phase 2: Strategic Filtering (战略筛选)
请将新闻分为以下三类,并按此优先级排序:

- **Tier 1: 行业范式转移 (Paradigm Shifts)**
  - *标准*:基座模型的重大迭代(如 GPT-5, Gemini 1.5 Ultra)、颠覆性的新架构(如 SSM, Mamba 变体)、关键开源项目、或可能重塑行业的政策/商业动态。
  - *处理*:即使与汽车无关,也**必须收录**。这是周报的核心价值。

- **Tier 2: 车载软件强相关 (Automotive Relevant)**
  - *标准*:直接涉及智能座舱、语音交互、端侧推理 (On-device AI)、RAG 在边缘端的应用、AI 辅助编程 (Copilot for Devs)、实时渲染等。
  - *处理*:重点分析落地可行性。

- **Tier 3: 噪音过滤 (Drop)**
  - *标准*:单纯的股价波动、无技术细节的营销通稿、重复的低质量内容。
  - *处理*:直接丢弃。

## Phase 3: Analysis & Writing (分析与撰写)
在撰写每条新闻时,必须包含 **[深度技术解析]** 和 **[车机软件启示]**。

**关于 [车机软件启示] 的特殊指令:**
- **禁止**说空话(如“这将提升用户体验”)。
- **必须**联系具体的软件开发场景。请思考以下维度:
  - **性能与架构**:是否涉及 NPU 算力占用?模型量化后是否能在高通/英伟达车规芯片上跑通?
  - **交互创新**:是否支持多模态(手势+语音)?能否用于生成动态 UI?
  - **开发效率**:该工具能否帮助车企软件团队自动生成单元测试或重构代码?
  - **隐私与安全**:端侧处理对用户隐私意味着什么?

# Output Format (Markdown)

## 🚨 行业头条 (Headline Events)
*(针对 Tier 1 新闻,深入剖析)*

### [1. 新闻标题]
> **核心事实**:[融合了 JSON 信息和你联网搜索补充的关键技术参数]
> **行业震级**:⭐⭐⭐⭐⭐ (简述为什么这是行业拐点)
> **对车机软件的降维打击**:[发散思维,例如:*“虽然这是云端大模型,但其蒸馏版本可能在明年进入车机,改变现有的语音助手架构...”*]

---

## 🚘 智能座舱与工程应用 (Cockpit & Engineering)
*(针对 Tier 2 新闻,侧重落地)*

### [2. 新闻标题]
- **技术摘要**:[简述]
- **落地分析 (Actionable Insight)**:
  - *场景*:[具体场景,如:导航搜索、儿童模式生成、DMS疲劳监测]
  - *挑战*:[提及延迟、功耗或内存限制]

---

## 🔍 速览 (Quick Bites)
*(值得注意但篇幅较短的更新)*
- **[标题]**:一句话核心观点。🔗[Link]

---

## 📝 本周总结 (Weekly Synthesis)
[用一段话总结本周趋势。直接对作为 AI 工程师的用户喊话,给出下周关注的技术栈建议。]
"""  # noqa: RUF001
    return prompt.strip()
src/custom/config.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
+
# Which named deployment this process runs as.
ACCOUNT_NAME = os.getenv("ACCOUNT_NAME", "benny")

# Telegram chat ids (negative ids are supergroups/channels). Some are
# overridable via env so dev/prod deployments can point elsewhere.
GROUP_DEV = int(os.getenv("GROUP_DEV", "-1002434113592"))
GROUP_67373 = -1001744444199
GROUP_JSQ = -1001852924912

CHANNEL_CYF_BAK = int(os.getenv("CHANNEL_CYF_BAK", "-1001778084768"))
CHANNEL_LILAOSHI_OFFICIAL = -1002155057858
CHANNEL_LILAOSHI_PREVIEWED = int(os.getenv("CHANNEL_LILAOSHI_PREVIEWED", "-1002416354272"))
CHANNEL_EMBY_SEARCH = int(os.getenv("CHANNEL_EMBY_SEARCH", "-1002446769616"))
CHANNEL_FAVORITE = -1001755578122
CHANNEL_PODCAST = -1002436072788
CHANNEL_YOUTUBE = -1001654454492

# Telegram user ids.
USER_BENNY = 111039943
USER_XIAOHAO = 1070754043
USER_CYF = 1639998668

# Comma-separated list of user ids allowed to trigger a bot restart.
ALLOW_USERS_TO_CALL_RESTART = os.getenv("ALLOW_USERS_TO_CALL_RESTART", "1070754043,111039943")
# YouTube sync tuning: skip videos older than N hours; poll interval in seconds.
SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS = int(os.getenv("SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS", "48"))
SYNC_YOUTUBE_FREQUENCY_SECONDS = int(os.getenv("SYNC_YOUTUBE_FREQUENCY_SECONDS", "300"))
src/custom/cyf_greeting.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import random
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import TZ, cache
+from custom.config import GROUP_67373, USER_CYF
+from utils import nowdt, readable_time
+
+
async def cyf_greeting(client: Client, message: Message):
    """Greet CYF with a sticker when she speaks in GROUP_67373 after >6h of silence.

    Args:
        client: the Pyrogram client used to send the sticker.
        message: the incoming group message (may be from anyone; only CYF's
            own messages trigger the greeting).
    """
    if message.chat.id != GROUP_67373:
        return
    cyf_last_dt = await get_cyf_last_message_dt(client, message)
    if message.from_user.id == USER_CYF:
        delta = (nowdt(TZ) - cyf_last_dt).total_seconds()
        if delta > 6 * 3600:
            logger.success("Send CYF Sticker!")
            await client.send_sticker(chat_id=GROUP_67373, sticker="CAACAgIAAxkBAAKG3Gfmdspcxdb0VqSS5Zhs-tX-cLTeAAI1AQACjKo0A4kY1NyS3VKIHgQ")  # 发
            # BUG FIX: removed a stray `random.choice(stickers)` call whose result was
            # discarded — dead code from an unfinished "random sticker" variant.
            # Candidate sticker ids, kept for when that variant gets wired up:
            # stickers = [
            #     "CAACAgIAAxkBAAKG3Gfmdspcxdb0VqSS5Zhs-tX-cLTeAAI1AQACjKo0A4kY1NyS3VKIHgQ",  # Mahjong "Fa" tile
            #     "CAACAgUAAxkBAAENSJZnfPgfxTgZDNOKW5sNl1p0iXKceQACowsAAs4nCFWXVnARM_crvR4E",  # fafa i love you
            #     "CAACAgUAAxkDAAENY9Fnq1cFOZODLp5LmgYETBy_IVmrBwACPAoAArIAAfBUuI2fqU5ZVNEeBA",
            #     "CAACAgUAAxkDAAENY9Vnq1eAod-PjwgopOtfLcxFpTLyhwACjg4AAk3YsVTlfzct_olxjR4E",
            #     "CAACAgUAAxkDAAENY9Znq1gvvAhN9eOwFVxeGnCn3DbPGwACgQ0AAtVI-FTbCC5TkOSsHB4E",
            #     "CAACAgUAAxkDAAENY9dnq1hHhfGXySrioHSf3rc0jPzstgACfgoAAnoy-VSHSpEED_QmBx4E",
            # ]
            # sticker = random.choice(stickers)
            # await client.send_sticker(chat_id=GROUP_67373, sticker=sticker)
        else:
            logger.trace(f"CYF is active {readable_time(delta)} ago")
+
+
async def get_cyf_last_message_dt(client: Client, message: Message) -> datetime:
    """Get the datetime of CYF's last message (timezone-corrected to TZ).

    Pyrogram has a bug of the message date: the returned ``message.date`` is
    timezone aware, but the tzinfo is always set to UTC, so we convert it to
    the configured timezone before caching/returning.

    Side effect: refreshes the ``cyf_last_message_dt`` cache entry (with "now"
    when the current message is from CYF, or with the retrieved history date).
    """
    now = nowdt(TZ)
    if last_dt := cache.get("cyf_last_message_dt"):
        if message.from_user.id == USER_CYF:
            logger.success(f"Record CYF last message: {now:%Y-%m-%d %H:%M:%S}")
            cache.set("cyf_last_message_dt", now, ttl=0)
        return last_dt
    try:
        # Cache miss: look up her most recent message in the group history.
        async for msg in client.search_messages(chat_id=GROUP_67373, from_user=USER_CYF, limit=1):  # type: ignore
            mdate = msg.date.astimezone(ZoneInfo(TZ))
            delta = (now - mdate).total_seconds()
            logger.success(f"Retrieval CYF last message: {mdate:%Y-%m-%d %H:%M:%S} ({readable_time(delta)})")
            cache.set("cyf_last_message_dt", mdate, ttl=0)
            # BUG FIX: return the tz-corrected datetime (was raw msg.date in UTC,
            # contradicting the fix described in the docstring).
            return mdate
    except Exception as e:
        logger.error(e)

    cache.set("cyf_last_message_dt", now, ttl=0)
    return now
src/custom/cyf_quote.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from glom import glom
+from pyrogram.client import Client
+from pyrogram.types import Message
+from pyrogram.types.messages_and_media.message import Str
+
+from config import DEVICE_NAME, PREFIX
+from custom.config import CHANNEL_FAVORITE, GROUP_67373, GROUP_DEV, GROUP_JSQ, USER_BENNY, USER_CYF, USER_XIAOHAO
+from messages.utils import startswith_prefix
+from quotly.quotly import quote_message
+
+
async def fafa_quote(client: Client, message: Message, **kwargs):
    """Fake a "FaFa" quote sticker: `/fa <text>` renders <text> as if said by CYF.

    Only reacts in a whitelist of chats and only for my own accounts; the
    message is rewritten into a quotly command impersonating USER_CYF in
    GROUP_67373 and handed to quote_message().
    """
    cid = message.chat.id
    if (
        cid not in [GROUP_DEV, CHANNEL_FAVORITE, USER_BENNY, USER_XIAOHAO, GROUP_67373, GROUP_JSQ]
        or startswith_prefix(message.content, prefix="/fafa,/fav")  # ignore these longer commands that also start with "/fa"
        or not startswith_prefix(message.content, prefix="/fa")
    ):
        return
    # Accounts running on VPS do not respond to messages in the dev group
    if cid == GROUP_DEV and DEVICE_NAME in ["BennyBot-JP", "BennyBot-US", "BennyBot-CN"]:
        return
    uid = glom(message, "from_user.id", default=0) or 0
    if uid not in [USER_BENNY, USER_XIAOHAO]:  # only allow myself to use this
        return
    text = message.content.removeprefix("/fa").strip()
    text = f"{PREFIX.QUOTLY} cid={GROUP_67373} uid={USER_CYF} {text}"
    message.text = Str(text)
    await quote_message(client, message, **kwargs)
src/custom/cyf_rss_monitor.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.enums import MessageEntityType
+from pyrogram.types import Chat, Message
+
+from messages.main import process_message
+from utils import rand_number
+
+
async def chenyifa_social_rss(client: Client, message: Message):
    """Forward RSS from 67373 channel to my channel.

    The media link lives in a TEXT_LINK entity, either in caption_entities
    (media posts) or entities (text posts); the first link found is used.
    """
    if message.chat.id != -1001957128386:
        return
    logger.trace(message)

    def first_text_link(entities) -> str:
        """Return the URL of the first TEXT_LINK entity, or "" when absent."""
        for entity in entities or []:
            if entity.type == MessageEntityType.TEXT_LINK:
                logger.debug(f"CYF social media RSS: {entity.url}")
                return entity.url
        return ""

    # BUG FIX: previously a link found in caption_entities could be overwritten
    # by a subsequent scan of message.entities; prefer the caption link.
    url = first_text_link(message.caption_entities) or first_text_link(message.entities)
    if not url:
        return
    logger.success(f"Forwarding RSS: {url}")
    message = Message(id=rand_number(), chat=Chat(id=0), text=url)  # type: ignore
    await process_message(
        client,
        message,
        target_chat=-1001433673794,
        reply_msg_id=-1,
        need_prefix=False,
        show_progress=False,
        douyin=False,
        tiktok=False,
        instagram=False,
        weibo=False,
        xhs=False,
        ytdlp=False,
        douyin_comments_provider=False,
        weibo_comments_provider=False,
        # twitter_comments_provider=False,
        twitter_provider="tikhub-fxtwitter",
        twitter_comments_provider="tikhub",
        instagram_comments_provider=False,
    )
src/custom/cyf_twitter_rss.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Chat, Message
+from pyrogram.types.messages_and_media.message import Str
+
+from config import PROXY, cache
+from database.r2 import get_cf_r2, set_cf_r2
+from messages.main import process_message
+from networking import hx_req
+from utils import rand_number
+
+
async def fafa_twitter_rss(client: Client):
    """CYF (FaFa) Twitter RSS: poll her latest tweets and forward new ones.

    Deduplication is two-level: a short-lived cache entry per tweet id, backed
    by a persistent marker object in R2. The whole poll runs at most once per
    10 minutes (guarded by the "fafa_twitter_rss" cache key).
    """
    if cache.get("fafa_twitter_rss"):
        return  # polled within the last 10 minutes
    cache.set("fafa_twitter_rss", 1, ttl=600)
    api = "https://api.fxtwitter.com/2/profile/yifaer_chen/statuses"
    resp = await hx_req(
        api,
        proxy=PROXY.TWITTER,
        timeout=10,
        check_kv={"code": 200},
        check_keys=["results"],
        silent=True,
    )
    if error := resp.get("hx_error"):
        logger.error(error)
        return
    posts = sorted(resp["results"], key=lambda x: x["created_timestamp"], reverse=True)[:5]  # latest 5 tweets
    for tweet in posts[::-1]:  # old to new
        if not tweet.get("id"):
            continue
        post_id = tweet["id"]
        url = f"https://x.com/yifaer_chen/status/{post_id}"
        # check if in cache
        if cache.has(f"fafa-x-{post_id}"):
            continue
        # check if in database
        if await get_cf_r2(key=f"RSS/陈一发儿/x.com/yifaer_chen/status/{post_id}", silent=True):
            cache.set(f"fafa-x-{post_id}", 1)  # backfill the cache so R2 isn't queried again
            continue
        # Hand the URL to the generic processing pipeline as a synthetic message.
        message = Message(id=rand_number(), chat=Chat(id=rand_number()), text=Str(url))
        await process_message(
            client,
            message,
            target_chat=-1001433673794,
            reply_msg_id=-1,
            need_prefix=False,
            show_progress=False,
            ytdlp=False,
            douyin=False,
            tiktok=False,
            instagram=False,
            weibo=False,
            twitter_comments_provider=False,
            show_statistics=False,
            twitter_provider="fxtwitter-vxtwitter",
        )
        cache.set(f"fafa-x-{post_id}", 1)
        await set_cf_r2(key=f"RSS/陈一发儿/x.com/yifaer_chen/status/{post_id}", data={"finished": "1"})
src/custom/d1_daily_backup_msg.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
+
+from glom import glom
+from loguru import logger
+from pyrogram.client import Client
+
+from config import HISTORY
+from database.d1 import query_d1
+from history.d1 import backup_chat_history_to_d1
+from utils import nowdt, strings_list, to_int
+
+
async def daily_backup_history_to_d1(client: Client) -> None:
    """Back up chat histories to D1 during the last minutes of the (UTC) day.

    D1 has a daily write quota, so syncing is confined to a small nightly
    window. When ``start_from`` is "oldest", each backup resumes from the
    oldest record already in the database and works backwards.

    Env vars:
        BACKUP_CHAT_HISTORY_DAILY_CHATS: "disabled" (default), "full_table"
            (read the chat list from the D1 ``chatinfo`` table), or a
            comma-separated list of chat ids/handles.
        BACKUP_CHAT_HISTORY_DAILY_HOUR / _MINUTE_START / _MINUTE_END: the UTC
            window in which syncing is allowed (default 23:50-23:59).
    """
    chats = os.getenv("BACKUP_CHAT_HISTORY_DAILY_CHATS", "disabled")
    hour = int(os.getenv("BACKUP_CHAT_HISTORY_DAILY_HOUR", "23"))
    minute_start = int(os.getenv("BACKUP_CHAT_HISTORY_DAILY_MINUTE_START", "50"))
    minute_end = int(os.getenv("BACKUP_CHAT_HISTORY_DAILY_MINUTE_END", "59"))
    if chats == "disabled":
        return

    def is_sync_time() -> bool:
        # The sync window is expressed in UTC, not the bot's display timezone.
        now = nowdt("UTC")
        return now.hour == hour and minute_start <= now.minute <= minute_end

    if not HISTORY.D1_ENABLE:
        return
    if not is_sync_time():
        return
    if chats == "full_table":
        resp = await query_d1("SELECT * FROM 'chatinfo';", db_name=HISTORY.D1_DATABASE, silent=True)
        tables = glom(resp, "result.0.results", default=[])
        cids = [x["chandle"] or int(x["cid"]) for x in tables]  # prefer @handle, fall back to numeric id
    else:
        cids = [to_int(x) for x in strings_list(chats)]

    for cid in cids:
        if not is_sync_time():  # re-check the window before every chat
            break
        logger.info(f"Backup chat history to D1: {cid}")
        await backup_chat_history_to_d1(client, cid, hours=99999, start_from="oldest", max_sync=1000)  # at most 1000 messages per chat
src/custom/del_msg.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import contextlib
+
+from pyrogram.client import Client
+from pyrogram.enums import MessageServiceType
+from pyrogram.types import Message
+
+from messages.utils import delete_message
+
+
async def del_unwanted_message(client: Client, message: Message):
    """Housekeeping: delete auto-delete-timer service messages and `/del` commands.

    Replying to a message with `/del`, `/d` or `del` deletes both the command
    message and the replied-to message (the whole album when it belongs to a
    media group). Deletion failures on the reply target are ignored.
    """
    if message.service == MessageServiceType.MESSAGE_AUTO_DELETE_TIMER_CHANGED:
        await delete_message(message)  # Delete the timer change message
    # NOTE(review): message.content looks like it can be None for service/media
    # messages — guard before strip() to avoid an AttributeError.
    if (message.content or "").strip() in ["/del", "/d", "del"]:
        await delete_message(message)
        if message.reply_to_message:
            reply_msg = message.reply_to_message
            with contextlib.suppress(Exception):
                if reply_msg.media_group_id:
                    # Delete the whole album, not only the replied-to item.
                    # (Was a side-effect list comprehension; a plain loop is clearer.)
                    for msg in await client.get_media_group(reply_msg.chat.id, reply_msg.id):
                        await delete_message(msg)
                else:
                    await delete_message(reply_msg)
src/custom/dnkt_attendance.py
@@ -0,0 +1,340 @@
+#!/venv/bin/python
+# -*- coding: utf-8 -*-
+import asyncio
+import os
+from collections import Counter
+from pathlib import Path
+
+import feedparser
+from glom import flatten, glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.errors import FloodWait
+
+from config import cutter
+from custom.rss import HEADERS
+from database.d1 import insert_d1, query_d1
+from database.turso import insert_statement, turso_exec, turso_parse_resp
+from history.utils import TURSO_KWARGS
+from networking import hx_req
+
+
async def dnkt_attendance(client: Client):
    """DNKT related."""
    if os.getenv("DNKT_ATTENDANCE_DISABLED", "0") == "1":
        return
    # Fetch the raw feed text ourselves: letting feedparser fetch the URL
    # directly gives no timeout control.
    resp = await hx_req("https://dl.zydou.me/d/checkin.atom", rformat="text", headers=HEADERS, timeout=60, max_retry=0, silent=True)
    feed_text = resp.get("text")
    if not feed_text:
        return
    feed = feedparser.parse(feed_text)
    topic_ids = {"补卡": 967, "请假": 961, "调休": 965, "加班": 963}  # category -> forum topic id
    for item in await get_new_entries(feed["entries"]):  # ordered oldest -> newest
        if not item["texts"]:
            continue
        logger.info(item["标题"])
        await sync_dnkt_checkin_to_d1(item)
        await sync_dnkt_checkin_to_turso(item)
        await send_with_tid(client, item["texts"], topic_ids.get(item["类型"], 1))
+
+
async def send_with_tid(client: Client, texts: str, tid: int):
    """Send *texts* to the attendance group under forum topic *tid*.

    On FloodWait, sleeps out the penalty and retries recursively.
    """
    try:
        await client.send_message(
            chat_id=-1002820954593,
            text=texts,
            disable_notification=True,
            message_thread_id=tid,
        )
    except FloodWait as exc:
        logger.warning(exc)
        await asyncio.sleep(exc.value)  # type: ignore
        return await send_with_tid(client, texts, tid)
    else:
        await asyncio.sleep(3)  # gentle pacing between consecutive sends
+
+
async def get_new_entries(entries: list[dict]) -> list[dict]:
    """Get new entries from feed."""
    watermark = await get_max_processed_id()
    fresh: list[dict] = []
    try:
        for raw in entries:
            guid = int(raw["id"])
            if guid <= watermark:
                continue  # already processed
            item = await gen_caption(raw["contributors"], link=raw["link"])
            item["db_key"] = f"TTL/30d/DNKT/考勤/{guid}"
            fresh.append(item)
    except Exception as exc:
        # Any parsing failure discards the whole batch; it will be retried next run.
        logger.error(f"Failed to get new entries: {exc}")
        fresh = []
    # Oldest first, so downstream sends messages in chronological order.
    return sorted(fresh, key=lambda item: int(item["id"]))
+
+
async def get_max_processed_id() -> int:
    """Return the largest attendance id already stored, preferring D1 with a Turso fallback."""
    d1_resp = await query_d1(sql="SELECT id FROM 考勤管理 ORDER BY id DESC LIMIT 1", db_name="dnkt", silent=True)
    latest = int(glom(d1_resp, "result.0.results.0.id", default=0))
    if latest:
        return latest
    # D1 returned nothing usable — ask the Turso replica instead.
    turso_resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": "SELECT id FROM 考勤管理 ORDER BY id DESC LIMIT 1"},
            }
        ],
        retry=2,
        silent=True,
        **(TURSO_KWARGS | {"db_name": "dnkt"}),
    )
    return int(glom(turso_resp, "results.0.response.result.rows.0.0.value", default=0))
+
+
async def gen_caption(info_list: list[dict], link: str) -> dict:
    """Build the notification record for one attendance feed entry.

    Args:
        info_list: the entry's "contributors" items; each carries a field
            name in ``href`` and its value in ``name`` (see the dict merge below).
        link: entry permalink; its path stem doubles as the record id.

    Returns:
        The merged field dict, plus a rendered Telegram message under ``texts``.
    """
    info = {
        "id": Path(link).stem,
        "类型": "",
        "姓名": "",
        "部门": "",
        "审批人": "",
        "日期": "",
        "原因": "",
        "创建日期": "",
        "开始时间": "",
        "结束时间": "",
        "补卡时间": "",
        "标题": "",
    }  # employee info (defaults; keys are the attendance record fields)
    info |= {x["href"]: x["name"] for x in info_list}  # merge employee info and checkin info

    # guess department from historical records when the feed omits it
    if not info["部门"]:
        department = await guess_department_from_d1(info["姓名"])
        if not department:
            department = await guess_department_from_turso(info["姓名"])
        info["部门"] = department

    texts = ""
    category = info["类型"]
    # If "调休" (comp leave) appears anywhere in the record, reclassify it as such.
    if "调休" in str(info):
        category = "调休"
        info["类型"] = "调休"
    category_emoji = {"补卡": "💳", "请假": "❌", "调休": "🔄", "加班": "📚"}.get(category, "")
    if info["部门"]:
        texts += f"{category_emoji}{category}: #{info['部门']}"
    else:
        texts += f"{category_emoji}{category}"
    if info["姓名"]:
        texts += f"\n👤姓名: #{info['姓名']}"
    if info["审批人"]:
        texts += f"\n✏️审批: {info['审批人']}"
    if info["日期"]:
        texts += f"\n📅日期: [{info['日期']}]({link})"
    texts += "\n🕒时间: "
    if info["开始时间"]:
        texts += f"{info['开始时间']}"
    # The make-up punch time is shown only when no start/end range exists.
    if info["补卡时间"] and not info["开始时间"] and not info["结束时间"]:
        texts += f"{info['补卡时间']}"
    if info["结束时间"]:
        texts += f" - {info['结束时间']}"
    if info["原因"]:
        texts += f"\n📋原因: {info['原因']}"
    info["texts"] = texts.strip()
    return info
+
+
async def sync_dnkt_checkin_to_d1(entry: dict) -> None:
    """Sync checkin to D1 database."""
    # id first, then the attendance fields verbatim, then the derived FTS column
    # (insertion order matches the table's column layout).
    row: dict = {"id": int(entry["id"])}
    row |= {key: entry[key] for key in ("类型", "姓名", "部门", "审批人", "日期", "原因", "创建日期", "开始时间", "结束时间", "补卡时间", "标题")}
    row["search_column"] = " ".join(cutter.cutword(entry["标题"] + "\n" + entry["原因"]))
    await query_d1(**insert_d1("考勤管理", row, update_on_conflict="id"), db_name="dnkt", silent=True)
+
+
async def sync_dnkt_checkin_to_turso(entry: dict) -> None:
    """Sync checkin to Turso database."""
    conn = TURSO_KWARGS | {"db_name": "dnkt"}
    # id first, then the attendance fields verbatim, then the derived FTS column
    # (insertion order matches the table's column layout).
    row: dict = {"id": int(entry["id"])}
    row |= {key: entry[key] for key in ("类型", "姓名", "部门", "审批人", "日期", "原因", "创建日期", "开始时间", "结束时间", "补卡时间", "标题")}
    row["search_column"] = " ".join(cutter.cutword(entry["标题"] + "\n" + entry["原因"]))
    await turso_exec([insert_statement("考勤管理", row, update_on_conflict="id")], retry=2, silent=True, **conn)
+
+
async def guess_department_from_turso(name: str) -> str:
    """Guess *name*'s department by majority vote over their 100 latest Turso rows."""
    if not name:
        logger.warning("姓名为空")
        return ""
    resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": f"SELECT 部门 FROM 考勤管理 WHERE 姓名 = '{name}' ORDER BY id DESC LIMIT 100"},
            }
        ],
        retry=2,
        silent=True,
        **(TURSO_KWARGS | {"db_name": "dnkt"}),
    )
    tally = Counter(glom(resp, "results.0.response.result.rows.**.value", default=[]))
    # Take the most common non-empty value, if any.
    for value, _count in tally.most_common():
        if value:
            logger.success(f"猜测【{name}】的部门为【{value}】")
            return value
    logger.warning(f"未找到【{name}】的部门")
    return ""
+
+
async def guess_department_from_d1(name: str) -> str:
    """Guess *name*'s department by majority vote over their 100 latest D1 rows."""
    if not name:
        logger.warning("姓名为空")
        return ""
    resp = await query_d1(sql=f"SELECT 部门 FROM 考勤管理 WHERE 姓名 = '{name}' ORDER BY id DESC LIMIT 100", db_name="dnkt", silent=True)
    tally = Counter(glom(resp, "result.0.results.*.部门", default=[]))
    # Take the most common non-empty value, if any.
    for value, _count in tally.most_common():
        if value:
            logger.success(f"猜测【{name}】的部门为【{value}】")
            return value
    logger.warning(f"未找到【{name}】的部门")
    return ""
+
+
async def sync_d1_to_turso():
    """One-way backfill: copy attendance rows present in D1 but missing from Turso."""
    conn = TURSO_KWARGS | {"db_name": "dnkt"}
    turso_resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": "SELECT * FROM 考勤管理"},
            }
        ],
        retry=2,
        silent=True,
        **conn,
    )
    known_ids = {row["id"] for row in turso_parse_resp(turso_resp)}
    d1_resp = await query_d1(sql="SELECT * FROM 考勤管理", db_name="dnkt", silent=True)
    pending = []
    for row in sorted(d1_resp["result"][0]["results"], key=lambda r: r["id"]):
        if row["id"] in known_ids:
            continue
        pending.append(insert_statement("考勤管理", row, update_on_conflict="id"))
        if len(pending) == 1024:  # flush in batches of 1024 statements
            await turso_exec(pending, silent=True, retry=2, **conn)
            pending = []

    if pending:
        await turso_exec(pending, silent=True, retry=2, **conn)
+
+
async def fill_missing_department_in_turso():
    """Backfill empty 部门 cells in Turso, guessing from each person's history."""
    conn = TURSO_KWARGS | {"db_name": "dnkt"}
    resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": "SELECT * FROM 考勤管理"},
            }
        ],
        retry=2,
        silent=True,
        **conn,
    )

    async def flush(batch: list) -> None:
        """Run the queued updates concurrently and log how many succeeded."""
        results = await asyncio.gather(*batch, return_exceptions=True)
        # Halved — presumably each statement yields two "ok" result entries; TODO confirm.
        ok = sum(1 for status in flatten(glom(results, "*.results.*.type")) if status == "ok") // 2
        logger.success(f"Synced {ok} messages to Turso")

    pending = []
    for row in sorted(turso_parse_resp(resp), key=lambda r: r["id"]):
        if row["部门"]:
            continue
        row["部门"] = await guess_department_from_turso(row["姓名"])
        if not row["部门"]:
            continue
        pending.append(turso_exec([insert_statement("考勤管理", row, update_on_conflict="id")], retry=2, silent=True, **conn))
        if len(pending) == 100:  # flush in batches of 100
            await flush(pending)
            pending = []

    if pending:
        await flush(pending)
+
+
async def fill_missing_department_in_d1():
    """Backfill empty 部门 cells in D1, guessing from each person's history."""
    resp = await query_d1(sql="SELECT * FROM 考勤管理", db_name="dnkt", silent=True)
    rows = resp["result"][0]["results"]

    async def _flush(batch: list) -> None:
        """Run the queued updates concurrently and log how many succeeded."""
        results = await asyncio.gather(*batch, return_exceptions=True)
        num_success = sum(1 for ok in glom(results, "*.success") if ok is True)
        # Fix: this function writes to D1, but the original message claimed "to Turso".
        logger.success(f"Synced {num_success} messages to D1")

    tasks = []
    for row in sorted(rows, key=lambda x: x["id"]):
        if row["部门"]:
            continue
        row["部门"] = await guess_department_from_d1(row["姓名"])
        if not row["部门"]:
            continue
        tasks.append(query_d1(**insert_d1("考勤管理", row, update_on_conflict="id"), db_name="dnkt", silent=True))
        if len(tasks) == 32:  # flush in batches of 32
            await _flush(tasks)
            tasks = []

    if tasks:
        await _flush(tasks)
src/custom/dnkt_email.py
@@ -0,0 +1,339 @@
+#!/venv/bin/python
+# -*- coding: utf-8 -*-
+import asyncio
+import hashlib
+import os
+import re
+import smtplib
+import ssl
+from email.headerregistry import Address
+from email.message import EmailMessage
+from io import BytesIO
+from urllib.parse import quote
+
+from glom import glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.errors import FloodWait
+from pyrogram.types import InputMediaDocument
+
+from ai.utils import remove_consecutive_newlines
+from config import CAPTION_LENGTH, DB, cutter
+from database.d1 import insert_d1, query_d1
+from database.r2 import set_cf_r2
+from database.turso import insert_statement, turso_exec
+from history.utils import TURSO_KWARGS
+from messages.utils import count_without_entities, remove_img_tag, smart_split
+from networking import hx_req
+from utils import convert_html, convert_md
+
+
+# ruff: noqa: PLW2901,RUF001,RUF002,RUF003
async def dnkt_email(client: Client):
    """DNKT email."""
    if os.getenv("DNKT_EMAIL_DISABLED", "0") == "1":
        return
    mailbox: list[dict] = await hx_req("https://dl.zydou.me/d/emails.json", timeout=60, silent=True)  # ty:ignore[invalid-assignment]

    if not mailbox:
        return
    cutoff = await get_last_time()
    # Keep only mail newer than the stored watermark, oldest first.
    fresh = sorted((m for m in mailbox if m["ReceivedTime"] > cutoff), key=lambda m: m["ReceivedTime"])

    for mail in fresh:
        logger.info(mail["Subject"])
        await sync_to_d1(mail)
        await sync_to_turso(mail)
        caption, html, markdown = await parse_entry(mail)
        await send_email_notice(client, mail, caption, html, markdown)
        await forward_email(mail, html, markdown)
+
+
async def forward_email(
    entry: dict,
    html: str,
    markdown: str,
    target_email: str = "douzy94@gmail.com",
    riseup_email: str = os.getenv("RISEUP_EMAIL", ""),
    riseup_pass: str = os.getenv("RISEUP_PASSWORD", ""),
    alias_email: str | None = os.getenv("RISEUP_ALIAS"),
):
    """Rebuild the parsed message and forward it to *target_email* via Riseup SMTP.

    Args:
        entry: parsed email fields (Subject/Sender/To, optional ICS calendar text).
        html: HTML body (preferred alternative).
        markdown: plain-text body (text/plain part).
        target_email: final recipient address.
        riseup_email / riseup_pass: SMTP account credentials (from environment).
        alias_email: optional alias used as the envelope/From address.
    """
    if not riseup_email or not riseup_pass:
        logger.error("Riseup 邮箱或密码为空")
        return

    try:
        eml = EmailMessage()
        eml["Subject"] = entry["Subject"]
        # The From address must be our own Riseup mailbox, otherwise the SMTP
        # server refuses delivery (Relay Access Denied).
        eml["From"] = format_addresses(entry["Sender"], alias_email or riseup_email or "")
        eml["To"] = format_addresses(entry["To"], target_email)
        logger.info(f"主题: {entry['Subject']}")
        logger.info(f"发件人: {entry['Sender']}")
        logger.info(f"收件人: {entry['To']}")

        eml.set_content(markdown or "")
        eml.add_alternative(html, subtype="html")

        # Attach the ICS calendar part, if the entry carries one.
        if ics := entry.get("ICS"):
            eml.add_attachment(
                ics.encode("utf-8"),
                maintype="text",
                subtype="calendar",
                filename="invite.ics",
                params={"method": "PUBLISH"},  # PUBLISH is lightweight and less likely to be filtered
            )
            logger.debug("已检测到会议邀请,成功注入了 ICS 日历组件")

        # Connect to the Riseup SMTP server and send.
        # Riseup requires SSL/TLS encryption on port 465.
        smtp_server = "mail.riseup.net"
        smtp_port = 465
        context = ssl.create_default_context()
        with smtplib.SMTP_SSL(smtp_server, smtp_port, context=context) as server:
            # server.set_debuglevel(1)  # uncomment to dump the raw SMTP conversation when delivery fails
            server.login(riseup_email, riseup_pass)
            server.send_message(eml, from_addr=alias_email or riseup_email, to_addrs=[target_email])
        logger.success(f"成功将邮件 [{entry['Subject']}] 发送至: {target_email}")
    except smtplib.SMTPAuthenticationError:
        logger.error("SMTP 认证失败:请检查你的 Riseup 用户名和密码。")
    except smtplib.SMTPException as e:
        logger.error(f"SMTP 通信或发信被拒: {e}")
    except Exception as e:
        logger.error(f"邮件重组或发送过程中发生未知错误: {e}")
+
+
async def send_email_notice(client: Client, entry: dict, caption: str, html: str, markdown: str):
    """Post the email to the Telegram topic as .txt/.html (and optional .ics) documents.

    NOTE(review): on FloodWait the function recurses with the already-modified
    caption, so the markdown body may be appended to it a second time if it
    still fits — confirm this retry path is intended.
    """
    try:
        # Inline the markdown body into the caption when it still fits.
        if await count_without_entities(f"{caption}\n\n{markdown}") < CAPTION_LENGTH:
            caption = f"{caption}\n\n{markdown}"
        caption = (await smart_split(caption, CAPTION_LENGTH))[0]  # hard cap at the caption limit
        fname = entry["Subject"][:64]  # keep attachment file names short
        ics = entry.get("ICS", "")
        media = []
        with BytesIO(markdown.encode("utf-8")) as f_md, BytesIO(html.encode("utf-8")) as f_html, BytesIO(ics.encode("utf-8")) as f_ics:
            media.append(InputMediaDocument(f_md, file_name=f"{fname}.txt"))
            media.append(InputMediaDocument(f_html, caption=caption, file_name=f"{fname}.html"))
            if ics:
                media.append(InputMediaDocument(f_ics, file_name=f"{fname}.ics"))
            await client.send_media_group(chat_id=-1003768991336, message_thread_id=1204, media=media)
        await asyncio.sleep(3)
    except FloodWait as e:
        logger.warning(e)
        await asyncio.sleep(e.value)  # type: ignore
        return await send_email_notice(client, entry, caption, html, markdown)
+
+
async def get_last_time() -> str:
    """Return the newest stored ReceivedTime, trying D1 first and Turso second.

    When both stores are empty, returns a far-future timestamp so that the
    caller's ``ReceivedTime > last_time`` filter matches nothing (avoids a
    full re-import against an empty database).
    """
    d1_resp = await query_d1(sql="SELECT ReceivedTime FROM 邮件 ORDER BY ReceivedTime DESC LIMIT 1", db_name="dnkt", silent=True)
    latest = glom(d1_resp, "result.0.results.0.ReceivedTime", default="")
    if not latest:
        turso_resp = await turso_exec(
            [
                {
                    "type": "execute",
                    "stmt": {"sql": "SELECT ReceivedTime FROM 邮件 ORDER BY ReceivedTime DESC LIMIT 1"},
                }
            ],
            retry=2,
            silent=True,
            **(TURSO_KWARGS | {"db_name": "dnkt"}),
        )
        latest = glom(turso_resp, "results.0.response.result.rows.0.0.value", default="")
    return latest or "9999-12-31 23:59:59"
+
+
async def parse_entry(entry: dict) -> tuple[str, str, str]:
    """Parse entry.

    Renders the Telegram caption, normalizes the body into HTML and Markdown,
    and uploads the HTML copy to R2 (365-day TTL) so the caption can link to it.

    Returns:
        (caption, html, markdown)
    """
    caption = ""
    r2_key = f"TTL/365d/{entry['ReceivedTime']}-{entry['Subject']}.html"
    url = f"{DB.CF_R2_PUBLIC_URL.rstrip('/')}/{quote(r2_key)}"
    if "Start" not in entry:  # plain email (no meeting start time)
        caption += f"📧**[{entry['Subject']}]({url})**\n🕒{entry['ReceivedTime']}"
        if entry["Sender"]:
            caption += f"\n发件人: {combine_names(entry['Sender'])}"
        if entry["To"]:
            caption += f"\n收件人: {combine_names(entry['To'])}"
        if entry["CC"]:
            caption += f"\n抄送: {combine_names(entry['CC'])}"
    else:  # meeting invitation
        caption += f"📅**[{entry['Subject']}]({url})**"
        if entry.get("IsRecurring"):
            caption += "🔄"
        if entry.get("Start"):
            caption += f"\n▶️{entry['Start']}"
        if entry.get("End"):
            caption += f"\n⏸️{entry['End']}"
        if entry.get("Location"):
            caption += f"\n📍{combine_names(entry['Location'])}"
        if entry.get("Sender"):
            caption += f"\n组织人: {combine_names(entry['Sender'])}"
        if entry.get("To"):
            caption += f"\n必需: {combine_names(entry['To'])}"
        if entry.get("CC"):
            caption += f"\n可选: {combine_names(entry['CC'])}"
        if entry.get("IsConflict"):
            caption += "\n⚠️此会议时间有冲突"
    # Normalize line endings and strip the boilerplate confidentiality banner.
    body = entry["Body"].replace("\r\n", "\n")
    body = body.replace("CONFIDENTIAL<br>", "").replace("CONFIDENTIAL", "")
    # A body starting with "<" is treated as HTML already; otherwise convert it.
    html = body if body.startswith("<") else convert_html(body)
    if "<head>" not in html:
        html = f"<head></head>{html}"
    html = html.replace("\n", "<br>")
    html = html.replace("<head>", '<head><meta charset="UTF-8">')
    markdown = convert_md(html=body) if body.startswith("<") else body
    markdown, _ = remove_img_tag(markdown)
    markdown = remove_consecutive_newlines(markdown)
    markdown = markdown.replace("\\_", "")
    # Inline embedded images: swap each cid reference for its base64 data URI.
    for cid, b64 in entry.get("Images", {}).items():
        html = html.replace(f"cid:{cid}", b64)
    await set_cf_r2(r2_key, html, mime_type="text/html")
    return caption, html, markdown
+
+
async def sync_to_d1(entry: dict) -> None:
    """Sync to D1 database."""
    # Columns whose values are contact strings and need name normalization.
    name_cols = ("Sender", "Receiver", "CC", "BCC", "Location")
    # DB column -> differing source key in the entry dict.
    source_key = {"Receiver": "To", "StartTime": "Start", "EndTime": "End"}
    row: dict = {}
    for col in ("Subject", "Sender", "ReceivedTime", "Receiver", "CC", "BCC", "StartTime", "EndTime", "Location", "Attachments", "RRULE", "Body"):
        value = entry.get(source_key.get(col, col), "")
        row[col] = combine_names(value) if col in name_cols else value
    row["search_column"] = " ".join(cutter.cutword(f"{entry['Subject']}\n{entry.get('Body', '')}"))
    row["MD5"] = md5(f"{entry['Subject']}{entry['ReceivedTime']}")  # dedupe key: subject + time
    await query_d1(**insert_d1("邮件", row, update_on_conflict="MD5"), db_name="dnkt", silent=True)
+
+
async def sync_to_turso(entry: dict) -> None:
    """Sync to Turso database."""
    conn = TURSO_KWARGS | {"db_name": "dnkt"}
    # Columns whose values are contact strings and need name normalization.
    name_cols = ("Sender", "Receiver", "CC", "BCC", "Location")
    # DB column -> differing source key in the entry dict.
    source_key = {"Receiver": "To", "StartTime": "Start", "EndTime": "End"}
    row: dict = {}
    for col in ("Subject", "Sender", "ReceivedTime", "Receiver", "CC", "BCC", "StartTime", "EndTime", "Location", "Attachments", "RRULE", "Body"):
        value = entry.get(source_key.get(col, col), "")
        row[col] = combine_names(value) if col in name_cols else value
    row["search_column"] = " ".join(cutter.cutword(f"{entry['Subject']}\n{entry.get('Body', '')}"))
    row["MD5"] = md5(f"{entry['Subject']}{entry['ReceivedTime']}")  # dedupe key: subject + time
    await turso_exec([insert_statement("邮件", row, update_on_conflict="MD5")], retry=2, silent=True, **conn)
+
+
def md5(s: str) -> int:
    """Collapse *s* into a 48-bit integer (the first 6 bytes of its MD5 digest)."""
    raw = hashlib.md5(s.encode("utf-8")).digest()  # noqa: S324 - non-cryptographic, used as a compact key
    return int.from_bytes(raw[:6], "big")
+
+
def combine_names(texts: str) -> str:
    """Get raw names from texts.

    Input: "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
    Output: "张三; 李四"
    """
    parts = texts.split(";")
    picked = []
    for raw in parts:
        raw = raw.strip()
        # The Teams placeholder is noise whenever real participants are present.
        if raw == "Microsoft Teams 会议" and len(parts) > 1:
            continue
        english, chinese, _addr = parse_email(raw)
        picked.append(chinese or english)  # prefer the Chinese name, fall back to English
    return "; ".join(picked)
+
+
def format_addresses(contacts: str | None, email: str = "email@domain.com") -> Address:
    """Format contacts to Address.

    Example:
    contacts: "San Zhang (张三) <zhangsan@example.com>; Li Si (李四)"
    email: "email@domain.com"
    return: Address(display_name="张三; 李四", addr_spec="email@domain.com")

    Prefers each contact's Chinese name, falling back to the English name;
    contacts that yield neither are skipped.
    """
    if not contacts:
        return Address(addr_spec=email)
    result_parts = []
    # Split on ";" to handle multiple recipients.
    for contact in contacts.split(";"):
        english, chinese, _ = parse_email(contact.strip())
        # Fix: the fallback `or english` used to sit inside `if chinese:`,
        # making it dead code and silently dropping English-only contacts.
        display = chinese or english
        if display:
            result_parts.append(display)
    # Re-join the contacts with "; ".
    full_name = "; ".join(result_parts).strip()
    return Address(display_name=full_name, addr_spec=email)
+
+
def parse_email(s: str) -> tuple[str, str, str]:
    """Split an address string into (english_name, chinese_name, email).

    Example:
    'San Zhang (张三) <zhangsan@example.com>' -> ('San Zhang', '张三', 'zhangsan@example.com')
    'San Zhang <zhangsan@example.com>' -> ('San Zhang', '', 'zhangsan@example.com')
    'Li Si (李四)' -> ('Li Si', '李四', '')
    'Li Si' -> ('Li Si', '', '')
    '(王五) <wangwu@example.com>' -> ('', '王五', 'wangwu@example.com')
    """
    s = s.strip()
    # Email lives inside <...>, the Chinese name inside (...).
    addr_match = re.search(r"<(.*?)>", s)
    cn_match = re.search(r"\((.*?)\)", s)
    addr = addr_match.group(1).strip() if addr_match else ""
    chinese = cn_match.group(1).strip() if cn_match else ""
    # Whatever remains after removing both matched spans is the English name.
    leftover = s
    for matched in (addr_match, cn_match):
        if matched:
            leftover = leftover.replace(matched.group(0), "")
    return (leftover.strip(), chinese, addr)
src/custom/email2md.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import email
+import email.policy
+import re
+from email.header import decode_header
+from email.message import EmailMessage
+from io import BytesIO
+
+from glom import glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import InputMediaDocument, Message, ReplyParameters
+
+from config import CAPTION_LENGTH, DB, TEXT_LENGTH, TZ
+from database.r2 import set_cf_r2
+from messages.utils import count_without_entities
+from utils import convert_html, convert_md, myself, nowstr, readable_size
+
+
async def eml2md(client: Client, message: Message):
    """Convert a forwarded .eml document to Markdown/HTML and post it back to the chat."""
    if glom(message, "document.mime_type", default="") != "message/rfc822":
        return
    # Only process mail sent by the SMTP bot or by the account owner itself.
    if not (glom(message, "from_user.username", default="") == "smtpbot" or glom(message, "from_user.id", default=0) == (await myself(client)).id):
        return
    data: BytesIO = await client.download_media(message, in_memory=True)  # type: ignore
    logger.debug(f"Downloading Email file: {data.name}")
    msg = email.message_from_binary_file(data, policy=email.policy.default)
    # Extract the mail headers.
    subject = decode(msg.get("Subject"))
    from_ = decode(msg.get("From"))
    to = decode(msg.get("To"))
    reply_to = decode(msg.get("Reply-To"))
    date = decode(msg.get("Date")) or nowstr(TZ)
    cc = decode(msg.get("Cc"))
    bcc = decode(msg.get("Bcc"))
    html, md = extract_body(msg)
    attachments = extract_attachments(msg)
    # Upload the HTML rendering to R2 (90-day TTL) so the subject can link to it.
    r2_key = f"TTL/90d/{message.chat.id}-{message.id}.html"
    await set_cf_r2(key=r2_key, data=html, mime_type="text/html")
    url = f"{DB.CF_R2_PUBLIC_URL}/TTL/90d/{message.chat.id}-{message.id}.html"
    header = ""
    if subject:
        header += f"**[{subject}]({url})**\n"
    if from_:
        header += f"**From**: {refine_email(from_)}\n"
    if to:
        header += f"**To**: {refine_email(to)}\n"
    if reply_to:
        header += f"**Reply-To**: {refine_email(reply_to)}\n"
    if date:
        header += f"**Date**: {date}\n"
    if cc:
        header += f"**CC**: {refine_email(cc)}\n"
    if bcc:
        header += f"**BCC**: {refine_email(bcc)}\n"
    if attachments:
        header += f"{attachments}\n"

    async def sent_file(caption: str = ""):
        # Reply with the .html and .md renditions as a media group.
        with BytesIO(html.encode("utf-8")) as fhtml, BytesIO(md.encode("utf-8")) as fmd:
            await client.send_media_group(
                message.chat.id,
                message_thread_id=message.message_thread_id,
                media=[
                    InputMediaDocument(fhtml, file_name=f"{subject}.html"),
                    InputMediaDocument(fmd, caption=caption, file_name=f"{subject}.md"),
                ],
                reply_parameters=ReplyParameters(message_id=message.id),
            )

    # Choose where the body text goes based on its length:
    # caption if it fits, otherwise a separate text message, otherwise files only.
    length = await count_without_entities(header + md)
    if length <= CAPTION_LENGTH:
        await sent_file(header + md)
    elif CAPTION_LENGTH < length <= TEXT_LENGTH:
        await message.reply_text(header + md, quote=True)
        await sent_file(header)
    else:
        await sent_file(header)
    await client.mark_chat_unread(message.chat.id)
+
+
+def decode(texts: str | None) -> str:
+    # ruff: noqa: PLW2901
+    if not texts:
+        return ""
+    try:
+        decoded_parts = decode_header(texts)
+        result_parts = []
+        for part, encoding in decoded_parts:
+            if isinstance(part, bytes):
+                if encoding:
+                    try:
+                        part = part.decode(encoding)
+                    except (UnicodeDecodeError, LookupError):
+                        part = part.decode("utf-8", errors="replace")
+                else:
+                    part = part.decode("utf-8", errors="replace")
+            result_parts.append(str(part))
+        return "".join(result_parts)
+    except Exception:
+        return str(texts)
+
+
def refine_email(email: str) -> str:
    """Rewrite '<' and '>' as '(' and ')' in an address string."""
    return email.translate(str.maketrans("<>", "()"))
+
+
def extract_body(msg: EmailMessage) -> tuple[str, str]:
    """Extract the email body to HTML & Markdown."""
    html_body: str | None = None
    text_body: str | None = None

    # msg.walk() yields just the message itself for a single-part mail, so one
    # loop covers both shapes; the attachment filter only applies to multipart.
    multipart = msg.is_multipart()
    for part in msg.walk():
        if multipart and "attachment" in str(part.get("Content-Disposition", "")):
            continue
        ctype = part.get_content_type()
        if ctype == "text/html" and html_body is None:
            html_body = part.get_content()
        elif ctype == "text/plain" and text_body is None:
            text_body = part.get_content()

    # Prefer the HTML alternative; derive the other format from whichever exists.
    if html_body:
        return html_body, convert_md(html=html_body)
    if text_body:
        collapsed = re.sub(r"\n\s*\n", "\n\n", text_body).strip()
        return convert_html(texts=collapsed), collapsed

    return "", ""
+
+
def extract_attachments(msg: EmailMessage) -> str:
    """List attachments with their filenames and sizes.

    Returns a Markdown bullet list under an **Attachments** heading, or ""
    when the message carries no attachments.
    """
    attachments = ""

    if not msg.is_multipart():
        return ""  # single-part messages cannot carry attachments

    for part in msg.walk():
        disposition = str(part.get("Content-Disposition", ""))
        if "attachment" not in disposition:
            continue

        filename = part.get_filename() or "unnamed"
        payload = part.get_payload(decode=True)
        size = len(payload) if payload else 0
        # Fix: `filename` was computed but never used — a literal placeholder
        # string was emitted for every attachment instead of its real name.
        attachments += f"- `{filename}` ({readable_size(size)})\n"

    return f"**Attachments**\n{attachments}" if attachments else ""
src/custom/events.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import re
+
+from glom import glom
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+# ruff: noqa: SIM102, RUF001
+
+SLEEP_TIME = 600
+
+
+async def lottery(client: Client, message: Message) -> None:
+    return
+    # 仅限于群聊
+    if glom(message, "chat.type.name", default="") != "SUPERGROUP":
+        return
+
+    # 抽奖机器人 @XiaoCaiLotteryBot
+    if glom(message, "from_user.id", default=0) == 6461022460:
+        if str(message.text).startswith("新的抽奖已经创建\n"):
+            if matched := re.search(r"参与关键词:「(.*?)」", str(message.text)):
+                keyword = matched.group(1)
+                await asyncio.sleep(SLEEP_TIME)
+                await client.send_message(message.chat.id, keyword)
+
+    # 抽奖机器人 @Lottery8Bot
+    for url in glom(message, "reply_markup.inline_keyboard.**.url", default=[]):
+        if str(url).startswith("https://t.me/Lottery8Bot?start="):
+            await asyncio.sleep(SLEEP_TIME)
+            await client.send_message("@Lottery8Bot", "/start " + url.removeprefix("https://t.me/Lottery8Bot?start="))
src/custom/history_alias.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from pyrogram.client import Client
+from pyrogram.types import Message, User
+from pyrogram.types.messages_and_media.message import Str
+
+from config import DEVICE_NAME
+from custom.config import ACCOUNT_NAME, CHANNEL_FAVORITE, GROUP_67373, GROUP_DEV, GROUP_JSQ, USER_BENNY, USER_XIAOHAO
+from history.query import query_chat_history
+from messages.utils import startswith_prefix
+
+
+async def tg_history_alias(client: Client, message: Message, **kwargs):
+    """TG发言快捷查询.
+
+    `/wild`: 发姐
+    `/dbb`: 电报部
+    `/jsq`: 下班不关电脑群
+    """
+    cid = message.chat.id
+    if cid not in [GROUP_DEV, CHANNEL_FAVORITE, USER_BENNY, USER_XIAOHAO, GROUP_67373, GROUP_JSQ] or not startswith_prefix(
+        message.content, prefix=["/alias", "/wild", "/dbb", "/li", "/67373", "/jsq", "/jsb", "/pods", "/bili", "/weibo", "/fxq", "/zaihua", "/solidot", "/heike", "/hkdg"]
+    ):
+        return
+    if cid == GROUP_DEV and DEVICE_NAME in ["BennyBot-JP", "BennyBot-US", "BennyBot-CN"]:
+        return
+    if cid in [GROUP_67373, GROUP_JSQ, USER_XIAOHAO] and ACCOUNT_NAME != "xiaohao":  # 只使用小号响应67373相关群
+        return
+    if message.content == "/alias":
+        docs = "`/wild`: 发姐\n`/li`: 李老师\n`/67373`: 电报部\n`/jsq`: 技术群\n`/pods`: 播客\n`/bili`: B站\n`/weibo`: 微博\n`/fxq`: 风向旗\n`/zaihua`: 在花\n`/solidot`: Solidot\n`/heike`: 黑客帝国\n"
+        await message.reply(text=docs.strip(), quote=True)
+        return
+    text = ""
+    if startswith_prefix(message.content, prefix="/wild"):
+        text = message.content.removeprefix("/wild").strip()
+        text = f"/history #1744444199 @1639998668 {text}"
+    elif startswith_prefix(message.content, prefix="/dbb,/67373"):
+        text = message.content.removeprefix("/dbb").removeprefix("/67373").strip()
+        text = f"/history #1744444199 {text}"
+    elif startswith_prefix(message.content, prefix="/jsq,/jsb"):
+        text = message.content.removeprefix("/jsq").removeprefix("/jsb").strip()
+        text = f"/history #1852924912 {text}"
+    elif startswith_prefix(message.content, prefix="/pods"):
+        text = message.content.removeprefix("/pods").strip()
+        text = f"/history #2436072788 {text}"
+    elif startswith_prefix(message.content, prefix="/bili"):
+        text = message.content.removeprefix("/bili").strip()
+        text = f"/history #2178783945 {text}"
+    elif startswith_prefix(message.content, prefix="/weibo"):
+        text = message.content.removeprefix("/weibo").strip()
+        text = f"/history #2328010080 {text}"
+    elif startswith_prefix(message.content, prefix="/li"):
+        text = message.content.removeprefix("/li").strip()
+        text = f"/history #2416354272 {text}"
+    elif startswith_prefix(message.content, prefix="/zaihua"):
+        text = message.content.removeprefix("/zaihua").strip()
+        text = f"/history #1125107539 {text}"
+    elif startswith_prefix(message.content, prefix="/fxq"):
+        text = message.content.removeprefix("/fxq").strip()
+        text = f"/history #1472283197 {text}"
+    elif startswith_prefix(message.content, prefix="/solidot"):
+        text = message.content.removeprefix("/solidot").strip()
+        text = f"/history #1104204833 {text}"
+    elif startswith_prefix(message.content, prefix="/heike,/hkdg"):
+        text = message.content.removeprefix("/heike").removeprefix("/hkdg").strip()
+        text = f"/history #1812023706 {text}"
+    if not text:
+        return
+    message.text = Str(text)
+    message.from_user = User(id=USER_BENNY)  # Use `benny` to bypass permission check
+    await query_chat_history(client, message, **kwargs)
src/custom/lilaoshi.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import re
+from datetime import timedelta
+
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import TZ, cache
+from custom.config import ACCOUNT_NAME, CHANNEL_LILAOSHI_OFFICIAL, CHANNEL_LILAOSHI_PREVIEWED
+from database.r2 import list_cf_r2
+from messages.chat_history import get_history_info_list_via_telegram
+from messages.parser import parse_msg
+from messages.utils import delete_message
+from networking import match_social_media_link
+from utils import nowdt
+
+
+async def handle_lilaoshi(client: Client, message: Message):
+    if ACCOUNT_NAME != "xiaohao" or message.chat.id != CHANNEL_LILAOSHI_OFFICIAL:
+        return
+    info = parse_msg(message, silent=True)
+    if info["entity_urls"] and (matched := await match_social_media_link(info["entity_urls"][0])) and matched["platform"] in ["twitter", "x", "fxtwitter", "fixupx"]:
+        text = f"{matched['url']}\n#set_target_chat={CHANNEL_LILAOSHI_PREVIEWED} #no_show_statistics #no_twitter_comments_provider #set_twitter_provider=fxtwitter-vxtwitter #set_reply_msg_id=-1 #no_show_progress"
+        msg = await client.send_message("@bennydou_bot", text)
+        await asyncio.sleep(3)
+        await delete_message(msg)
+
+
+async def preview_lilaoshi_history_message(client: Client, hours: int = 25):
+    if cache.get("preview_lilaoshi_history_message"):
+        return
+    cache.set("preview_lilaoshi_history_message", 1, ttl=12 * 3600)  # backup every 12 hours
+    r2 = await list_cf_r2(prefix="x.com/whyyoutouzhele/status/")
+    logger.trace(f"Get {len(r2.get('Contents', []))} 李老师 entries from R2")
+    processed_pids = {entry["Key"].removeprefix("x.com/whyyoutouzhele/status/") for entry in r2.get("Contents", [])}
+
+    begin_time = nowdt(TZ) - timedelta(hours=hours)
+    offical_info_list = await get_history_info_list_via_telegram(client, chat_id="lilaoshibushinilaoshi", begin_time=begin_time, limit=999999)
+    for info in offical_info_list:  # old to new
+        if not info["entity_urls"]:
+            continue
+        matched = re.findall(r"https://(twitter|x|fxtwitter|fixupx|vxtwitter)\.com/whyyoutouzhele/status/(\d+)", info["entity_urls"][0])
+        pids = [pid for _, pid in matched]
+        for pid in pids:
+            if pid not in processed_pids:
+                url = f"https://x.com/whyyoutouzhele/status/{pid}"
+                logger.info(f"解析李老师频道链接: {url} | {info['text'][:30]}")
+                text = f"{url}\n#set_target_chat={CHANNEL_LILAOSHI_PREVIEWED} #no_show_statistics #no_twitter_comments_provider #set_twitter_provider=fxtwitter-vxtwitter #set_reply_msg_id=-1 #no_show_progress"
+                msg = await client.send_message("@bennydou_bot", text)
+                await asyncio.sleep(3)
+                await delete_message(msg)
src/custom/msg_backup.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from asr.voice_recognition import voice_to_text
+from config import cache
+from custom.config import ACCOUNT_NAME, CHANNEL_CYF_BAK, GROUP_67373, USER_CYF
+from messages.parser import parse_msg
+from utils import to_int
+
+
+async def message_backup(client: Client, message: Message):
+    async def copy_msg(target_cid: int | str):
+        info = parse_msg(message, silent=True)
+        if info["mtype"] == "voice":
+            await voice_to_text(client, message, target_chat=to_int(target_cid), reply_msg_id=-1, asr_need_prefix=False)
+        elif message.media_group_id:
+            if not cache.get(f"{message.media_group_id}"):
+                cache.set(f"{message.media_group_id}", "1", ttl=120)
+                caption = f"{info['text']}\n{info['message_url']}".strip()
+                await client.copy_media_group(chat_id=to_int(target_cid), from_chat_id=message.chat.id, message_id=message.id, captions=caption)
+        elif message.media:
+            caption = f"{info['text']}\n{info['message_url']}".strip()
+            await client.copy_message(chat_id=to_int(target_cid), from_chat_id=message.chat.id, message_id=message.id, caption=caption)
+        elif info["entity_urls"]:
+            text = f"{info['text']}\n{info['message_url']}".strip()
+            await client.send_message(chat_id=to_int(target_cid), text=text)
+        else:
+            text = f"[{info['text']}]({info['message_url']})".strip()
+            await client.send_message(chat_id=to_int(target_cid), text=text)
+
+    if message.chat.id == GROUP_67373 and message.from_user.id == USER_CYF and ACCOUNT_NAME == "xiaohao":
+        info = parse_msg(message, silent=True)
+        logger.success(info["summary"])
+        await copy_msg(CHANNEL_CYF_BAK)
+
+    # 风哥
+    if message.chat.id == 2049194302 and message.from_user.id == 2049194302:
+        info = parse_msg(message, silent=True)
+        logger.success(info["summary"])
+        await copy_msg(-1002465020912)  # 开发专用频道
src/custom/readhub.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import hashlib
+import os
+from datetime import UTC, datetime
+
+from glom import Coalesce, glom
+from loguru import logger
+
+from config import cache
+from database.d1 import create_d1_table, insert_d1, query_d1
+from networking import hx_req
+
+TABLE = "readhub"
+
+
+async def readhub():
+    if os.getenv("READHUB_DISABLED", "0") == "1" or cache.get("readhub"):
+        return
+    cache.set("readhub", 1, ttl=1200)  # every 20 minutes
+    await create_d1_table(
+        table_name=TABLE,
+        columns="uid TEXT PRIMARY KEY, title TEXT, timestamp INTEGER, url TEXT, summary TEXT",
+        idx_cols=["uid", "timestamp"],
+        db_name="dnkt",
+        silent=True,
+    )
+    d1 = await query_d1(sql=f"SELECT timestamp,uid FROM {TABLE} ORDER BY timestamp DESC LIMIT 100", db_name="dnkt", silent=True)
+    finished_uids = set(glom(d1, "result.**.uid", default=[]))
+    if not finished_uids:
+        logger.error("ReadHub: No finished uids")
+        return
+    for page in range(1, 6):
+        await sync_page(finished_uids, page=page)
+
+
+async def sync_page(finished_uids: set[str], page: int = 1):
+    resp = await hx_req(
+        f"https://api.readhub.cn/news/list?page={page}&size=20&type=8",
+        headers={
+            "accept": "*/*",
+            "accept-language": "zh-CN,zh;q=0.9",
+            "authorization": "bearer",
+            "cache-control": "no-cache",
+            "content-type": "application/json",
+            "dnt": "1",
+            "origin": "https://readhub.cn",
+            "pragma": "no-cache",
+            "priority": "u=1, i",
+            "referer": "https://readhub.cn/",
+            "sec-ch-ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"',
+            "sec-ch-ua-mobile": "?0",
+            "sec-ch-ua-platform": '"Windows"',
+            "sec-fetch-dest": "empty",
+            "sec-fetch-mode": "cors",
+            "sec-fetch-site": "same-site",
+            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
+        },
+        check_kv={"data.pageIndex": page},
+        timeout=5,
+        silent=True,
+    )
+    items = glom(resp, "data.items", default=[])
+    tasks = []
+    for item in items[::-1]:
+        title = item.get("title", "")
+        summary = item.get("summary", "")
+        url = item.get("url", "")
+        uid = item.get("uid", 0)
+        if not url or not title:
+            continue
+        if not uid:
+            uid = hashlib.sha256(f"{title}{url}{summary}".encode()).hexdigest()
+        if uid in finished_uids:
+            continue
+        logger.info(f"ReadHub: [{title}]({url})")
+        records = {
+            "uid": uid,
+            "title": title,
+            "timestamp": get_utc_timestamp(item),
+            "url": url,
+            "summary": summary,
+        }
+        tasks.append(query_d1(**insert_d1(TABLE, records, update_on_conflict="uid"), db_name="dnkt", silent=True))
+    await asyncio.gather(*tasks)
+
+
+def get_utc_timestamp(item: dict) -> int:
+    time_str = glom(item, Coalesce("publishDate", "createdAt"), default="")  # 2026-01-20T04:41:35.415Z
+    if not time_str:
+        return round(datetime.now(UTC).timestamp())
+    time_dt = datetime.fromisoformat(time_str)
+    return round(time_dt.timestamp())
src/custom/restart.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""在Docker中退出时, 会自动重启Bot服务."""
+
+import os
+
+from glom import glom
+from loguru import logger
+from pyrogram.types import Message
+
+from config import DEVICE_NAME
+from custom.config import ALLOW_USERS_TO_CALL_RESTART
+from utils import slim_cid, strings_list
+
+
+async def restart_bot(message: Message):
+    # VPS上的账号不响应 开发Group 的消息
+    if message.chat.id == -1002434113592 and DEVICE_NAME in ["BennyBot-JP", "BennyBot-US", "BennyBot-CN"]:
+        return
+    if message.content.strip().lower() != "/restart":
+        return
+    uid = glom(message, "from_user.id", default=0) or 0
+    if slim_cid(uid) not in strings_list(ALLOW_USERS_TO_CALL_RESTART):
+        return
+    logger.error(f"Restart command received from {uid=}")
+    os._exit(0)
src/custom/rss.py
@@ -0,0 +1,164 @@
+#!/venv/bin/python
+# -*- coding: utf-8 -*-
+import os
+from urllib.parse import unquote_plus
+
+import feedparser
+from glom import glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Chat, Message
+
+from config import cache
+from database.d1 import create_d1_table, insert_d1, query_d1
+from messages.main import process_message
+from messages.sender import send2tg
+from networking import hx_req
+from podcast.utils import get_pubdate
+from utils import bare_url, https_url, nowdt, rand_number, true
+
+HEADERS = {
+    "User-Agent": "feedparser/6.0.11 +https://github.com/kurtmckee/feedparser/",
+    "Accept": "application/atom+xml,application/rdf+xml,application/rss+xml,application/x-netcdf,application/xml;q=0.9,text/xml;q=0.2,*/*;q=0.1",
+}
+TABLE = "rss"
+
+RSS: list[dict] = [
+    {
+        "title": "Blibili时间线",
+        "feed_url": "https://rsshub.zydou.me/bilibili/followings/video/280035751",
+        "interval": 60,
+        "target_chat": -1002178783945,
+        "suffix": " #set_ytdlp_video_target=-1002178783945 #no_ytdlp_send_audio #with_ytdlp_send_subtitle #with_to_telegraph #with_ytdlp_send_summary #set_reply_msg_id=-1 #no_bilibili_comments #no_show_statistics #no_show_progress #set_asr_engine=tencent #set_summary_model_id=bilibili",
+        "is_social_link": 1,
+    },
+    {
+        "title": "华中师范大学",
+        "feed_url": "https://rss.zydou.me/public.php?op=rss&id=30&is_cat=1&key=a8gt976079ed4023481",  # gitleaks:allow
+        "guid_key": ["link", "title"],
+        "target_chat": "@CCNU_EDU",
+        "interval": 14400,
+    },
+    {
+        "title": "微博时间线",
+        "feed_url": "https://rss.zydou.me/public.php?op=rss&id=22&is_cat=1&key=wc73rk679236153c269",  # gitleaks:allow
+        "target_chat": -1002328010080,
+        "interval": 300,
+        "is_social_link": 1,
+    },
+    {"title": "陈一发儿", "feed_url": "https://rsshub.zydou.me/weibo/user/7357828611", "target_chat": -1001433673794, "interval": 60, "is_social_link": 1},
+]
+
+
+async def update_rss(client: Client):
+    if os.getenv("UPDATE_RSS_DISABLED", "0") == "1":
+        return
+    await create_d1_table(
+        table_name=TABLE,
+        columns="key TEXT PRIMARY KEY, timestamp INTEGER, feed_title TEXT, title TEXT, url TEXT",
+        idx_cols=["key", "timestamp", "feed_title"],
+        silent=True,
+    )
+    for feed in RSS:
+        if cache.get(f"rss-{feed['feed_url']}"):
+            continue
+        interval = int(feed.get("interval", 3600))
+        cache.set(f"rss-{feed['feed_url']}", "1", ttl=interval)
+        feed_title = feed.get("title", "")
+        remote_content = await hx_req(feed["feed_url"], rformat="text", headers=HEADERS, timeout=60, max_retry=0, silent=True)
+        if not remote_content.get("text"):
+            continue
+        parsed = feedparser.parse(remote_content["text"])  # do not parse feed url, because it doesn't support timeout.
+        for entry in await get_new_entries(feed, parsed):  # old to latest
+            logger.info(f"RSS【{feed_title}】: {entry['title']} {entry['link']}")
+            if true(feed.get("is_social_link")):
+                texts = feed.get("prefix", "") + entry["link"] + feed.get("suffix", "")
+                message = Message(id=rand_number(), chat=Chat(id=0), text=texts)
+                options = (
+                    {
+                        "ytdlp_send_audio": False,
+                        "douyin_comments_provider": False,
+                        "twitter_comments": False,
+                        "bilibili_comments": False,
+                        "youtube_comments": False,
+                        "twitter_provider": "fxtwitter-vxtwitter",
+                        "show_statistics": False,
+                        "need_prefix": False,
+                        "show_progress": False,
+                        "reply_msg_id": -1,
+                    }
+                    | feed.get("options", {})
+                    | {"target_chat": feed["target_chat"]}
+                )
+                await process_message(client, message, **options)
+                records = {
+                    "timestamp": entry["timestamp"],
+                    "feed_title": feed_title,
+                    "title": entry["title"],
+                    "url": entry["link"],
+                    "key": entry["db_key"],
+                }
+                await query_d1(**insert_d1(TABLE, records, update_on_conflict="key"), silent=True)
+            else:
+                texts = feed.get("prefix", "")
+                if entry["title"]:
+                    texts += f"[{entry['title']}]({entry['link']})"
+                else:
+                    texts += entry["link"]
+                while "\n\n" in texts:
+                    texts = texts.replace("\n\n", "\n")
+                if feed.get("suffix"):
+                    texts += feed.get("suffix", "")
+                await send2tg(
+                    client,
+                    message=Message(id=rand_number(), chat=Chat(id=0)),
+                    texts=texts,
+                    target_chat=feed["target_chat"],
+                    reply_msg_id=-1,
+                    cooldown=int(feed.get("sleep", 1)),
+                )
+                records = {
+                    "timestamp": entry["timestamp"],
+                    "feed_title": feed_title,
+                    "title": entry["title"],
+                    "url": entry["link"],
+                    "key": entry["db_key"],
+                }
+                await query_d1(**insert_d1(TABLE, records, update_on_conflict="key"), silent=True)
+
+
+async def get_new_entries(feed_config: dict, parsed: dict) -> list[dict]:
+    """Get new entries from feed."""
+    feed_title = feed_config.get("title", "FeedTitle")
+    d1 = await query_d1(sql=f"SELECT timestamp,key FROM {TABLE} WHERE feed_title = '{feed_title}' ORDER BY timestamp DESC LIMIT 100", silent=True)
+    if not d1.get("success"):
+        return []
+    finished_keys = set(glom(d1, "result.**.key", default=[]))
+    if len(finished_keys) == 0:
+        return []
+    try:
+        guid_keys = feed_config.get("guid_key", ["link"])
+        now = nowdt()
+        new_entries = []
+        for entry in sorted(parsed["entries"], key=lambda x: x.get("published", x.get("updated", now)), reverse=False):  # old to latest
+            key = "".join([bare_url(unquote_plus(entry.get(key, ""))) for key in guid_keys])
+            if key in finished_keys:
+                continue
+            # check again
+            d1 = await query_d1(sql=f"SELECT timestamp FROM {TABLE} WHERE key = '{key}'", silent=True)
+            if glom(d1, "result.0.results.0.timestamp", default=0):
+                continue
+            entry["title"] = entry.get("title", "")
+            entry["link"] = https_url(entry.get("link", ""))
+            entry["timestamp"] = round(get_pubdate(entry).timestamp())
+            entry["db_key"] = key
+            new_entries.append(entry)
+        if new_entries:
+            logger.warning(f"New entries for {feed_title}: {len(new_entries)}")
+    except Exception as e:
+        logger.error(f"Failed to get new entries: {e}")
+        new_entries = []
+    limit = int(feed_config.get("limit", 0))
+    if limit > 0:
+        return new_entries[:limit]
+    return new_entries
src/custom/summary_video.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import io
+import re
+from io import BytesIO
+
+from glom import Coalesce, glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message, ReplyParameters
+from pyrogram.types.messages_and_media.message import Str
+
+from ai.main import ai_text_generation
+from asr.voice_recognition import asr_file
+from config import DEVICE_NAME, PREFIX, READING_SPEED, cache
+from custom.config import ACCOUNT_NAME, GROUP_DEV
+from database.memory import del_memory_cache
+from messages.parser import parse_msg
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import blockquote
+from networking import match_social_media_link
+from preview.bilibili import get_bilibili_vinfo
+from preview.youtube import get_youtube_vinfo
+from publish import publish_telegraph
+from subtitles.base import fetch_subtitle
+from utils import count_subtitles, rand_number, readable_time
+from ytdlp.download import ytdlp_download
+
+
+async def summary_videos(client: Client, message: Message, retry: int = 0):
+    if retry > 3:
+        return
+    videogram_channel = -1001627687975
+    if message.chat.id not in [videogram_channel, GROUP_DEV]:
+        return
+    # VPS上的账号不响应 开发Group 的消息
+    if message.chat.id == GROUP_DEV and DEVICE_NAME in ["BennyBot-JP", "BennyBot-US", "BennyBot-CN"]:
+        return
+    this_cid = message.chat.id
+    this_mid = message.id
+    if ACCOUNT_NAME != "bot":
+        return
+    info = parse_msg(message, silent=True)
+    text = " ".join(info["entity_urls"]) + " " + info["text"]
+    matched = await match_social_media_link(text)
+    if matched["platform"] not in ["bilibili", "youtube"]:
+        return
+    del_memory_cache(f"parse_msg-{message.chat.id}-{message.id}")
+    url = matched["url"]
+    vid = matched.get("vid", matched.get("bvid", url))
+    if cache.get(f"summary_videos-{message.chat.id}-{vid}-{retry}"):
+        return
+    cache.set(f"summary_videos-{message.chat.id}-{vid}-{retry}", "1", ttl=120)
+    vinfo = await get_youtube_vinfo(vid) if matched["platform"] == "youtube" else await get_bilibili_vinfo(vid)
+    description = vinfo.get("description", vinfo.get("desc", ""))
+    logger.debug(f"收到{info['mtype']}信息, 尝试从API下载字幕: [{vinfo['title']}]({url})")
+    reference = f"本次转录稿为{matched['platform'].title()}平台作者【{vinfo['author']}】的一期节目。\n该期节目标题: [{vinfo['title']}]({url})\n播出日期: {vinfo['pubdate']}\n节目简介: {description}"
+    subtitle_msg = message
+    if info["mime_type"] == "text/plain":  # 收到的本身就是字幕文件了
+        data: BytesIO = await client.download_media(message, in_memory=True)  # type: ignore
+        subtitles = data.getvalue().decode("utf-8")
+        res = {"num_chars": count_subtitles(subtitles), "reading_minutes": count_subtitles(subtitles) / READING_SPEED}
+        status = await client.send_message(this_cid, f"✅已下载字幕\n📝{vinfo['title']}\n#️⃣字符数: {res['num_chars']}", reply_parameters=ReplyParameters(message_id=this_mid))
+    else:  # 该消息并非字幕消息, 首先尝试直接通过API下载
+        res = await fetch_subtitle(url, reference)
+        asr_engine = "tencent"
+        if matched["platform"] == "youtube":  # bypass censorship
+            asr_engine = "uncensored"
+        if subtitles := glom(res, Coalesce("full", "subtitles"), default=""):  # API成功获取字幕
+            status = await client.send_message(this_cid, "✅通过API成功获取字幕", reply_parameters=ReplyParameters(message_id=this_mid))
+        elif info["mtype"] == "audio":  # 直接下载音频后ASR
+            logger.warning(f"API下载字幕失败, 直接下载音频后ASR获取字幕: [{vinfo['title']}]({url})")
+            status = await client.send_message(this_cid, f"⚠️API下载字幕失败, 直接下载音频后ASR获取字幕\n📝{vinfo['title']}", reply_parameters=ReplyParameters(message_id=this_mid))
+            fpath: str = await client.download_media(message)  # type: ignore
+            prompt = f"请转录{matched['platform'].title()}视频作者【{vinfo['author']}】的一期节目的音频。\n该期节目标题: {vinfo['title']}\n节目简介: {description}"
+            res.pop("error", None)
+            res = await asr_file(fpath, prompt=prompt, engine=asr_engine, message=message, corrector_reference=reference, silent=True)
+            if res.get("error") or len(res.get("texts", "")) == 0:
+                await modify_progress(message=status, del_status=True)
+                # await summary_videos(client, message, retry + 1)
+                return
+            subtitles = res.get("texts", "")
+            res |= {"num_chars": count_subtitles(subtitles), "reading_minutes": count_subtitles(subtitles) / READING_SPEED}
+        else:  # 失败了, 使用ytdlp
+            logger.warning(f"API下载字幕失败, 通过yt-dlp下载音频后ASR获取字幕: [{vinfo['title']}]({url})")
+            status = await client.send_message(this_cid, f"⚠️API下载字幕失败, 通过yt-dlp下载音频后ASR获取字幕\n📝{vinfo['title']}", reply_parameters=ReplyParameters(message_id=this_mid))
+            downloaded = await ytdlp_download(url, matched["platform"], ytdlp_download_video=False)
+            if not downloaded["audio_path"].is_file():
+                await modify_progress(message=status, text="❌下载音频失败", force_update=True)
+                return
+            prompt = f"请转录{matched['platform'].title()}视频作者【{vinfo['author']}】的一期节目的音频。\n该期节目标题: {vinfo['title']}\n节目简介: {description}"
+            res.pop("error", None)
+            res = await asr_file(downloaded["audio_path"], prompt=prompt, engine=asr_engine, message=message, corrector_reference=reference, silent=True)
+            if res.get("error") or len(res.get("texts", "")) == 0:
+                await modify_progress(message=status, del_status=True)
+                # await summary_videos(client, message, retry + 1)
+                return
+            subtitles = res.get("texts", "")
+            res |= {"num_chars": count_subtitles(res["texts"]), "reading_minutes": count_subtitles(subtitles) / READING_SPEED}
+
+        if count_subtitles(subtitles) < 30:
+            await modify_progress(message=status, del_status=True)
+            return
+
+        caption = f"{vinfo['emoji']}[{vinfo['author']}]({vinfo['channel']})\n🕒{vinfo['pubdate']}\n"
+        caption += f"📝[{vinfo['title']}]({url})\n#️⃣字符数: {res['num_chars']}\n⏳阅读时长: {readable_time(60 * res['reading_minutes'])}"
+        html = "\n".join([f"<p>{s}</p>" for s in subtitles.split("\n")])
+        if telegraph_url := await publish_telegraph(title=vinfo["title"], html=html, author=vinfo["author"], url=url):
+            caption += f"\n⚡️[即时预览]({telegraph_url})"
+        with io.BytesIO(subtitles.encode("utf-8")) as f:
+            subtitle_msg = await client.send_document(info["cid"], f, file_name=f"{vinfo['title']}.txt", caption=caption, reply_parameters=ReplyParameters(message_id=info["mid"]))
+        if not isinstance(subtitle_msg, Message):
+            subtitle_msg = message
+
+    subtitles = re.sub(r"(.*?)AI总结(B站版):", "", subtitles, flags=re.DOTALL).strip()  # noqa: RUF001
+    prompt = f"以上是{matched['platform'].title()}视频作者【{vinfo['author']}】的一期节目的文字稿。该期节目详情如下:\n"
+    prompt += f"节目标题: {vinfo['title']}\n发布日期: {vinfo['pubdate']}\n"
+    if description.strip():
+        prompt += f"节目简介: {description}\n"
+    prompt += "\n请解读本期节目内容。要求: 直接输出节目内容解读, 以“该节目讲述了”开头"
+    # Construct a message to call GPT
+    ai_msg = Message(
+        id=subtitle_msg.id,
+        chat=subtitle_msg.chat,
+        text=Str(f"{PREFIX.AI_TEXT_GENERATION} {prompt}"),
+        reply_to_message=Message(id=rand_number(), chat=subtitle_msg.chat, text=Str(subtitles)),
+    )
+    await modify_progress(message=status, text="🤖字幕总结中...", force_update=True)
+    response = await ai_text_generation(client, ai_msg, silent=True)
+    if texts := response.get("texts"):
+        await send2tg(client, subtitle_msg, texts=f"{response['prefix']}{blockquote(texts)}")
+    await modify_progress(message=status, del_status=True)
+
+
+async def get_last_messages(client: Client, chat_id: int, start_id: int = 1, window_size: int = 10) -> list[Message]:
+    """二分法查找chat中最后的消息."""
+    last_mid = cache.get(f"lastmid-{chat_id}", start_id)
+    low = float("inf")
+    high = max(start_id, int(last_mid))
+    window_size = min(window_size, 200)
+    while True:
+        logger.trace(f"Retrieval message of {chat_id=}, mid=[{high}, {high + window_size}]")
+        messages: list[Message] = await client.get_messages(chat_id, message_ids=range(high, high + window_size))  # type: ignore
+        if all(m.empty for m in messages):
+            break
+        low = [m.id for m in messages if not m.empty][-1]
+        high = low * 2
+
+    while low <= high:
+        mid = (low + high) // 2
+        logger.trace(f"二分查找, chat: {chat_id}: {low=}, {mid=}, {high=}")
+        messages = await client.get_messages(chat_id, message_ids=range(mid, mid + window_size))  # type: ignore
+        if all(m.empty for m in messages):
+            high = mid - 1
+        else:
+            low = [m.id for m in messages if not m.empty][-1] + 1
+    last_id = min(low, high)
+
+    messages = await client.get_messages(chat_id, message_ids=range(last_id - window_size, last_id + 1))  # type: ignore
+    valid_msgs = [m for m in messages if not m.empty]
+    cache.set(f"lastmid-{chat_id}", valid_msgs[-1].id, ttl=0)
+    return valid_msgs
src/custom/sync_youtube.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+from glom import glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Chat, Message
+from pyrogram.types.messages_and_media.message import Str
+
+from config import TZ, cache
+from custom.config import CHANNEL_YOUTUBE, SYNC_YOUTUBE_FREQUENCY_SECONDS, SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS
+from database.d1 import create_d1_table, insert_d1, query_d1
+from database.kv import get_cf_kv
+from messages.main import preview_social_media
+from podcast.utils import get_pubdate
+from podcast.xml import parse_feed
+from preview.youtube import get_youtube_vinfo
+from utils import nowdt, rand_number
+
+
+async def sync_youtube(client: Client):
+    """Entry point: periodically sync all configured YouTube channels.
+
+    Skips entirely when SYNC_YOUTUBE_DISABLED=1, or when a sync already ran
+    within the last SYNC_YOUTUBE_FREQUENCY_SECONDS (rate-limited via a cache
+    flag).  Channel configuration comes from the Cloudflare KV key "youtube".
+    """
+    if os.getenv("SYNC_YOUTUBE_DISABLED", "0") == "1":
+        return
+    if cache.get("sync_youtube"):
+        return  # a sync ran recently — the cache TTL acts as a rate limit
+    cache.set("sync_youtube", 1, ttl=SYNC_YOUTUBE_FREQUENCY_SECONDS)
+    kv = await get_cf_kv("youtube", silent=True)
+    if not kv.get("data"):
+        return
+    # Ensure the D1 table (and its indexes) exist before per-channel queries/inserts.
+    await create_d1_table(
+        table_name="youtube",
+        columns="vid TEXT PRIMARY KEY, timestamp INTEGER, channel TEXT, title TEXT, url TEXT, status TEXT",
+        idx_cols=["vid", "timestamp", "channel", "status"],
+        silent=True,
+    )
+    for conf in kv["data"]:
+        await sync_one_channel(client, conf)
+
+
+async def sync_one_channel(client: Client, channel_conf: dict):
+    channel_id = channel_conf["channel_id"]
+    feed = await parse_feed(f"https://cors.zydou.me/https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}")
+    entries = feed.get("entries", [])
+    d1 = await query_d1(sql=f"SELECT * FROM youtube WHERE channel = '{channel_id}' ORDER BY timestamp DESC LIMIT 20", silent=True)
+    if "hx_error" in d1:
+        logger.error(f"【D1】查询YouTube视频失败: {channel_conf['title']} ({d1['hx_error']})")
+        return
+    records = glom(d1, "result.0.results", default=[])
+    await save_entry_to_d1(entries, records)
+    for entry in sorted(records, key=lambda x: x["timestamp"]):  # old to new
+        vid = entry["vid"]
+        if entry["status"] == "done":
+            continue
+        # check again
+        d1 = await query_d1(sql=f"SELECT timestamp FROM youtube WHERE vid = '{vid}'", silent=True)
+        if glom(d1, "result.0.results.0.status", default="") == "done":
+            continue
+        dt = datetime.fromtimestamp(entry["timestamp"], tz=ZoneInfo(TZ))
+        delta = nowdt(TZ) - dt
+        if delta.total_seconds() > SYNC_YOUTUBE_IGNORE_OLD_THAN_HOURS * 3600:
+            continue
+        info = await get_youtube_vinfo(vid)
+        if error := info.get("error_msg"):
+            if error in ["❌无法获取此视频信息", "❌私享视频不可下载"]:
+                entry["status"] = "done"
+                await query_d1(**insert_d1("youtube", entry, update_on_conflict="vid"), silent=True)
+            logger.error(f"获取视频信息失败: {entry['url']} ({error})")
+            continue
+        logger.warning(f"开始下载【{info['author']}】新视频: {info['title']} ({entry['url']})")
+        params: dict = {
+            "url": entry["url"],
+            "platform": "youtube",
+            "vid": vid,
+            "use_db": True,
+            "ytdlp_download_video": True,
+            "ytdlp_send_video": True,
+            "ytdlp_send_audio": False,
+            "youtube_comments": False,
+            "reply_msg_id": -1,
+            "ytdlp_video_target": CHANNEL_YOUTUBE,
+            "ytdlp_audio_target": CHANNEL_YOUTUBE,
+            "ytdlp_send_subtitle": True,
+            "ytdlp_send_summary": True,
+            "asr_engine": "whisper",
+            "to_telegraph": True,
+            "show_author": True,
+            "show_title": True,
+            "show_pubdate": True,
+            "show_statistics": False,
+            "show_description": False,
+            "show_progress": False,
+        }
+        params.update(channel_conf.get("params", {}))
+        msg = Message(id=rand_number(), chat=Chat(id=rand_number()), text=Str("/dl " + entry["url"]))
+        sent_messages = await preview_social_media(client, msg, **params)
+        if any(m for m in sent_messages if m.video):  # video message is sent
+            logger.success(f"发送完成: {info['title']} ({entry['url']})")
+            entry["status"] = "done"
+            await query_d1(**insert_d1("youtube", entry, update_on_conflict="vid"), silent=True)
+        else:
+            logger.warning(f"下载失败: {info['title']} ({entry['url']})")
+
+
+async def save_entry_to_d1(entries: list[dict], records: list[dict]):
+    """Persist feed entries not yet in D1; mutates `records` in place.
+
+    Args:
+        entries: Parsed RSS feed entries (dicts with yt_videoid / yt_channelid / title).
+        records: Rows already stored in D1.  Newly-saved rows are appended so the
+            caller can iterate the combined list without re-querying D1.
+    """
+    saved_vids = {x["vid"] for x in records}
+    for entry in entries:
+        vid = entry["yt_videoid"]
+        if vid in saved_vids:
+            continue  # already persisted
+        record = {
+            "timestamp": round(get_pubdate(entry).timestamp()),
+            "channel": entry["yt_channelid"],
+            "vid": vid,
+            "title": entry["title"],
+            "url": f"https://www.youtube.com/watch?v={vid}",
+            "status": "new",
+        }
+        logger.info(f"【D1】保存YouTube视频: {entry['title']}")
+        await query_d1(**insert_d1("youtube", record, update_on_conflict="vid"), silent=True)
+        records.append(record)
src/custom/tempmail.py
@@ -0,0 +1,28 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+
+from glom import glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+
+async def tempmail(client: Client, message: Message) -> None:
+    """Temp-mail helper: claim an @n8.gs mailbox by temporarily renaming this account.
+
+    When a message like "username@n8.gs" appears in the owner's chat:
+    1. change this account's username to "username";
+    2. send /me to @smtpbot (which reads mail for the current username);
+    3. restore the original username.
+    """
+    # Hard-coded owner chat id — only react inside this specific private chat.
+    if glom(message, "chat.id", default=0) != 182758196:
+        return
+    content = str(message.content)
+
+    # Only react when the sender is NOT the owner account itself (avoids self-trigger loops).
+    if content.endswith("@n8.gs") and glom(message, "from_user.id", default=0) != 182758196:
+        original_username = glom(message, "from_user.username", default="") or "bennydou"
+        username = content.removesuffix("@n8.gs")
+        try:
+            if await client.set_username(username):
+                await client.send_message("@smtpbot", "/me")
+                await asyncio.sleep(1)
+                await client.set_username(original_username)
+        except Exception as e:
+            logger.error(f"【Tempmail】{original_username} 修改用户名为 {username} 失败: {e}")
src/emby/account.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import json
+from pathlib import Path
+
+from httpx import AsyncHTTPTransport
+from loguru import logger
+
+from config import DOWNLOAD_DIR, cache
+from database.kv import get_cf_kv
+from emby.constant import DEVICE_ID, DEVICE_NAME, EMBY_PROXY, HEADERS, VERSION
+from networking import hx_req
+
+
+@cache.memoize(ttl=60)
+async def all_accounts() -> dict[str, dict]:
+    """Get all enabled Emby accounts from the Cloudflare KV key "emby".
+
+    Cached for 60 seconds; entries with {"enable": false} are filtered out.
+
+    Returns:
+        dict[str, dict]:  {"account_name": {"bot": "bot_handle", "server": "https://example.com", "username": "username", "password": "password"}}
+    """
+    kv = await get_cf_kv("emby")
+    return {k: v for k, v in kv.items() if v.get("enable", True)}
+
+
+async def get_account(name: str) -> dict[str, str]:
+    """Get a single Emby account by name ({} when unknown or disabled).
+
+    Returns:
+        dict[str, str]:  {"bot": "bot_handle", "server": "https://example.com", "username": "username", "password": "password"}
+    """
+    accounts = await all_accounts()
+    return accounts.get(name, {})
+
+
+async def emby_login(account_name: str, *, refresh: bool = False) -> dict:
+    """Authenticate against an Emby server and cache the session credentials on disk.
+
+    Args:
+        account_name: Key into the KV-backed account store.
+        refresh: Force a fresh login even when a cached credential file exists.
+
+    Returns:
+        dict: the AuthenticateByName response augmented with "Server"/"ServerName",
+        or {} when the account is unknown or login fails.
+    """
+    save_path = Path(DOWNLOAD_DIR) / "emby" / f"{account_name}.json"
+    if not refresh and save_path.exists():
+        return json.loads(save_path.read_text())
+    account = await get_account(account_name)  # {"bot": "bot_handle", "server": "https://example.com", "username": "username", "password": "password"}
+    if not account:
+        return {}
+    headers = HEADERS | {
+        "authorization": f'MediaBrowser Client="Hills Windows", Device="{DEVICE_NAME}", DeviceId="{DEVICE_ID}", Version="{VERSION}"',
+    }
+    params = {
+        "X-Emby-Authorization": f'Emby Client="Hills Windows", Device="{DEVICE_NAME}", DeviceId="{DEVICE_ID}", Version="{VERSION}"',
+        "X-Emby-Client": "Hills Windows",
+        "X-Emby-Device-Name": DEVICE_NAME,
+        "X-Emby-Device-Id": DEVICE_ID,
+        "X-Emby-Client-Version": VERSION,
+        "X-Emby-Language": "zh-cn",
+    }
+    # Presumably a reachability/warm-up probe: the result is discarded and
+    # immediately overwritten by the authentication request below — TODO confirm.
+    resp = await hx_req(
+        f"{account['server']}/emby/System/Info/Public",
+        "GET",
+        headers=headers,
+        transport=AsyncHTTPTransport(),
+        params=params,
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        max_retry=1,
+    )
+    resp = await hx_req(
+        f"{account['server']}/emby/Users/AuthenticateByName",
+        "POST",
+        headers=headers | {"content-type": "application/json"},
+        transport=AsyncHTTPTransport(),
+        params=params,
+        json_data={"Username": account["username"].strip(), "Password": account["password"].strip(), "Pw": account["password"].strip()},
+        check_kv={"SessionInfo.DeviceId": DEVICE_ID},
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        max_retry=1,
+    )
+    if resp.get("hx_error"):
+        logger.error(f"Failed to login {account_name}:\n{resp['hx_error']}")
+        return {}
+    resp["Server"] = account["server"]
+    resp["ServerName"] = account_name
+    # NOTE(review): the access token is written to disk unencrypted.
+    save_path.parent.mkdir(parents=True, exist_ok=True)
+    save_path.write_text(json.dumps(resp, ensure_ascii=False, indent=2, sort_keys=True))
+    return resp
src/emby/api.py
@@ -0,0 +1,251 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+
+from httpx import AsyncHTTPTransport
+from loguru import logger
+
+from emby.constant import DEVICE_ID, DEVICE_NAME, DEVICE_PROFILE, EMBY_PROXY, HEADERS, VERSION
+from networking import hx_req
+
+
+def default_headers(credentials: dict) -> dict:
+    """Base request headers plus the MediaBrowser Authorization header carrying the session AccessToken."""
+    return HEADERS | {"Authorization": f'MediaBrowser Client="Hills Windows", Device="{DEVICE_NAME}", DeviceId="{DEVICE_ID}", Version="{VERSION}", Token="{credentials["AccessToken"]}"'}
+
+
+def build_params(credentials: dict, params: dict | None = None, **kwargs) -> dict:
+    if params is None:
+        params = {}
+    params |= {
+        "X-Emby-Authorization": f'Emby Client="Hills Windows", Device="{DEVICE_NAME}", DeviceId="{DEVICE_ID}", Version="{VERSION}"',
+        "X-Emby-Client": "Hills Windows",
+        "X-Emby-Device-Name": DEVICE_NAME,
+        "X-Emby-Device-Id": DEVICE_ID,
+        "X-Emby-Client-Version": VERSION,
+        "X-Emby-Language": "zh-cn",
+    }
+    if credentials.get("AccessToken"):
+        params |= {"X-Emby-Token": credentials["AccessToken"]}
+    return params | kwargs
+
+
+async def get_user(credentials: dict) -> dict:
+    """Fetch the current user object (GET /emby/Users/{uid}); validates the returned Id matches."""
+    uid = credentials["User"]["Id"]
+    return await hx_req(
+        f"{credentials['Server']}/emby/Users/{uid}",
+        headers=default_headers(credentials),
+        params=build_params(credentials),
+        transport=AsyncHTTPTransport(),
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        check_kv={"Id": uid},
+        silent=True,
+        max_retry=0,
+    )
+
+
+async def get_resume(credentials: dict) -> dict:
+    """Fetch the user's continue-watching list (GET .../Items/Resume), up to 50 video items."""
+    uid = credentials["User"]["Id"]
+    params = {
+        "Fields": "ProductionYear,EndDate,Status,ProviderIds",
+        "ImageTypeLimit": 1,
+        "MediaTypes": "Video",
+        "ParentId": "",
+        "Limit": 50,
+        "StartIndex": 0,
+        "X-Emby-Token": credentials["AccessToken"],
+    }
+    return await hx_req(
+        f"{credentials['Server']}/emby/Users/{uid}/Items/Resume",
+        headers=default_headers(credentials),
+        params=build_params(credentials, params),
+        transport=AsyncHTTPTransport(),
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        check_keys=["Items"],
+        silent=True,
+        max_retry=1,
+    )
+
+
+async def get_item(credentials: dict, item_id: str | int) -> dict:
+    """Fetch a single library item for the user, including provider ids and external URLs."""
+    uid = credentials["User"]["Id"]
+    params = {
+        "EnableImageTypes": "Primary,Backdrop,Thumb,Logo",
+        "ImageTypeLimit": 1,
+        "Fields": "ProviderIds,ExternalUrls",
+    }
+    return await hx_req(
+        f"{credentials['Server']}/emby/Users/{uid}/Items/{item_id}",
+        headers=default_headers(credentials),
+        params=build_params(credentials, params),
+        transport=AsyncHTTPTransport(),
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        check_kv={"Id": item_id},
+        silent=True,
+        max_retry=1,
+    )
+
+
+async def get_items_count(credentials: dict) -> dict:
+    """Fetch server-wide library counts (GET /emby/Items/Counts); MovieCount is used as a sanity key."""
+    return await hx_req(
+        f"{credentials['Server']}/emby/Items/Counts",
+        headers=default_headers(credentials),
+        transport=AsyncHTTPTransport(),
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        check_keys=["MovieCount"],
+        silent=True,
+        max_retry=1,
+    )
+
+
+async def get_similar(credentials: dict, item_id: str | int) -> dict:
+    """Fetch up to 20 items similar to `item_id` (GET /emby/Items/{id}/Similar)."""
+    params = {
+        "UserId": credentials["User"]["Id"],
+        "Fields": "ProductionYear,EndDate,Status,CommunityRating,PrimaryImageAspectRatio,RecursiveItemCount,ProviderIds",
+        "Limit": 20,
+        "ImageTypeLimit": 1,
+        "EnableImageTypes": "Primary,Backdrop,Thumb,Logo",
+    }
+    return await hx_req(
+        f"{credentials['Server']}/emby/Items/{item_id}/Similar",
+        headers=default_headers(credentials),
+        params=build_params(credentials, params),
+        transport=AsyncHTTPTransport(),
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        check_keys=["Items"],
+        silent=True,
+        max_retry=1,
+    )
+
+
+async def get_playback_info(credentials: dict, item_id: str | int) -> dict:
+    """Open a playback session (POST .../PlaybackInfo) using the canned DEVICE_PROFILE; the response carries PlaySessionId."""
+    return await hx_req(
+        f"{credentials['Server']}/emby/Items/{item_id}/PlaybackInfo",
+        "POST",
+        headers=default_headers(credentials) | {"content-type": "application/json"},
+        params=build_params(credentials, {"IsPlayback": "true"}),
+        transport=AsyncHTTPTransport(),
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        json_data=DEVICE_PROFILE,
+        check_keys=["PlaySessionId"],
+        silent=True,
+        max_retry=1,
+    )
+
+
+async def remove_from_resume(credentials: dict, item_id: str | int) -> dict:
+    """Hide an item from the continue-watching list (POST .../HideFromResume with Hide=true)."""
+    server = credentials["Server"]
+    uid = credentials["User"]["Id"]
+    return await hx_req(
+        f"{server}/emby/Users/{uid}/Items/{item_id}/HideFromResume",
+        "POST",
+        headers=default_headers(credentials) | {"content-length": "0", "content-type": "application/json"},
+        params=build_params(credentials, {"Hide": "true"}),
+        transport=AsyncHTTPTransport(),
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        check_keys=["PlayCount"],
+        silent=True,
+        max_retry=1,
+    )
+
+
+async def get_items(credentials: dict) -> dict:
+    """Get playable video items from the emby server.
+
+    Enumerates the user's movie/tv library views, collects the latest items of
+    each, and falls back to a plain folder listing when the Latest endpoint
+    yields nothing.
+
+    Returns:
+        dict: {item_id: item_dict}
+    """
+    server = credentials["Server"]
+    uid = credentials["User"]["Id"]
+
+    views = await hx_req(
+        f"{server}/emby/Users/{uid}/Views",
+        headers=default_headers(credentials),
+        params=build_params(credentials),
+        transport=AsyncHTTPTransport(),
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=5,
+        check_keys=["Items"],
+        silent=True,
+        max_retry=1,
+    )
+    if views.get("hx_error"):
+        logger.error(views.get("hx_error"))
+        return {}
+    # Only movie/tv libraries are of interest.
+    parent_ids = [x["Id"] for x in views["Items"] if x.get("CollectionType", "").lower() in ["movies", "tvshows"]]
+    items = {}  # {item_id: item_dict}
+    for parent_id in parent_ids:
+        params = {
+            "ParentId": parent_id,
+            "Limit": 20,
+            "ImageTypeLimit": "1",
+            "EnableImageTypes": "Primary,Backdrop,Thumb,Logo",
+            "Fields": "ProductionYear,EndDate,Status,PrimaryImageAspectRatio,CommunityRating,RecursiveItemCount,ProviderIds",
+        }
+        resp: list[dict] = await hx_req(
+            f"{server}/emby/Users/{uid}/Items/Latest",
+            headers=default_headers(credentials),
+            params=build_params(credentials, params),
+            transport=AsyncHTTPTransport(),
+            proxy=EMBY_PROXY,
+            verify=False,
+            timeout=5,
+            check_keys=["0.Id"],
+            silent=True,
+            max_retry=1,
+        )  # type: ignore
+        # On error hx_req returns a dict with "hx_error"; `in` covers both dict-key and list cases.
+        if "hx_error" in resp:
+            continue
+        for item in resp:
+            # Keep only playable videos with a known runtime.
+            if item.get("MediaType") != "Video" or not item.get("Id") or not item.get("RunTimeTicks"):
+                continue
+            items[item["Id"]] = item
+
+    # Fallback: some servers return nothing from /Latest — list folder contents instead.
+    if not items and parent_ids:
+        logger.warning("无法获取最新视频, 尝试从文件夹中读取.")
+        for parent_id in parent_ids:
+            await asyncio.sleep(4)  # be gentle with the server between folder listings
+            params = {
+                "EnableImageTypes": "Primary,Backdrop,Thumb,Logo",
+                "Fields": "BasicSyncInfo,CanDelete,PrimaryImageAspectRatio,ProductionYear",
+                "ImageTypeLimit": 1,
+                "IncludeItemTypes": "Movie",
+                "Limit": 50,
+                "ParentId": parent_id,
+                "Recursive": "true",
+                "SortBy": "SortName",
+                "SortOrder": "Ascending",
+                "StartIndex": 0,
+            }
+            resp: list[dict] = await hx_req(
+                f"{server}/emby/Users/{uid}/Items",
+                headers=default_headers(credentials),
+                params=build_params(credentials, params),
+                transport=AsyncHTTPTransport(),
+                proxy=EMBY_PROXY,
+                verify=False,
+                timeout=5,
+                silent=True,
+                max_retry=1,
+            )  # type: ignore
+            if "hx_error" in resp:
+                continue
+            for item in resp:
+                if item.get("MediaType") != "Video" or not item.get("Id") or not item.get("RunTimeTicks"):
+                    continue
+                items[item["Id"]] = item
+    return items
src/emby/checkin.py
@@ -0,0 +1,297 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import contextlib
+import json
+from pathlib import Path
+
+from glom import flatten, glom
+from google import genai
+from google.genai import types
+from google.genai.types import FileState, Part
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Chat, Message
+from pyrogram.types.messages_and_media.message import Str
+
+from ai.main import ai_text_generation
+from ai.utils import literal_eval
+from config import AI, PREFIX, PROXY, TZ, cache
+from custom.config import ACCOUNT_NAME
+from emby.account import all_accounts
+from messages.utils import delete_message
+from utils import nowdt, rand_number, strings_list
+
+MODEL_ID = "gemini-flash-latest"  # bypass captcha
+SKIP_CHECKIN_BOTS = {
+    "emby_hohai",
+    "EmbyPublicBot",  # 终点站, 已弃用
+    "sntp_lite_emby_bot",
+    "embyhellobot",
+    "yeziypsy_bot",
+    "liminallitebot",
+}
+
+
+async def daily_checkin_emby(client: Client) -> None:
+    """Send /start to every check-in bot assigned to this running account.
+
+    Only the "xiaohao" and "benny" instances participate; the bot set is split
+    between them.  Bots in SKIP_CHECKIN_BOTS or whose username does not end in
+    "bot" are skipped.
+    """
+    if ACCOUNT_NAME not in ["xiaohao", "benny"]:
+        return
+    accounts = await all_accounts()
+    all_bots = glom(accounts, "*.bot")
+
+    # NOTE(review): despite the name, `benny_accounts` holds bot usernames, not account names.
+    benny_accounts = {"sntp_lite_emby_bot", "lilyembybot", "shrekpublic_bot"}
+    xiaohao_bots = {x for x in all_bots if x not in benny_accounts}
+
+    checkin_bots = benny_accounts if ACCOUNT_NAME == "benny" else xiaohao_bots
+    for bot_name in checkin_bots - SKIP_CHECKIN_BOTS:
+        if not bot_name.endswith("bot"):
+            continue
+        logger.info(f"Emby daily checkin start: {bot_name}")
+        # Best-effort: one bot failing must not abort the remaining check-ins.
+        with contextlib.suppress(Exception):
+            await client.send_message(f"@{bot_name}", "/start")
+            logger.success(f"Emby daily checkin done: {bot_name}")
+            await asyncio.sleep(1)
+
+
+async def checkin_emby(client: Client, message: Message) -> None:
+    """Dispatcher for check-in messages coming back from Emby bots.
+
+    Deletes "signed in" confirmations, answers button-less captcha questions
+    (math / poetry), and clicks plain check-in buttons during the 06-07h window.
+    """
+    # Only handle messages sent by bots (or inside bot chats).
+    if not glom(message, "from_user.is_bot", default=False) and glom(message, "chat.type.name", default="") != "BOT":
+        return
+    if str(message.text).startswith("🎉 签到成功"):
+        await delete_message(message)
+
+    # Answer check-in questions that arrive without inline buttons.
+    await checkin_number_question(client, message)
+    await checkin_poetry(client, message)
+
+    # Beyond this point, only messages with inline buttons matter.
+    reply_markup = glom(message, "reply_markup.inline_keyboard.**.callback_data", default=[])
+    reply_markup = [x for x in reply_markup if isinstance(x, str)]
+    if not reply_markup:
+        return
+
+    # "check1n" covers bots that obfuscate the callback data.
+    checkin_button = next((x for x in reply_markup if "checkin" in x or "check1n" in x), None)
+    bot_name = glom(message, "from_user.username", default="")
+    if bot_name == "EmbyPublicBot" and message.photo:
+        await checkin_terminus(client, message, reply_markup)
+    elif checkin_button and nowdt(TZ).hour in [6, 7]:
+        await client.request_callback_answer(message.chat.id, message.id, callback_data=checkin_button)
+
+
+async def checkin_terminus(client: Client, message: Message, reply_markup: list[str]) -> None:
+    """@EmbyPublicBot 终点站: solve the image-recognition check-in captcha.
+
+    reply_markup:
+    ["checkin-选项A", "checkin-选项B", "checkin-选项C"]
+
+    Downloads the captcha photo, uploads it to Gemini, and asks the model to pick
+    one of the button options (constrained via a response-schema enum).  API keys
+    are tried in random order until one yields a valid answer.
+    """
+    options = [x.removeprefix("checkin-").strip() for x in reply_markup]
+    if not options:
+        return
+    fpath: str = await client.download_media(message)  # type: ignore
+    if not Path(fpath).exists():
+        return
+    for api_key in strings_list(AI.GEMINI_API_KEYS, shuffle=True):
+        try:
+            app = genai.Client(
+                api_key=api_key,
+                http_options=types.HttpOptions(
+                    base_url=AI.GEMINI_BASE_URL,
+                    headers=literal_eval(AI.GEMINI_DEFAULT_HEADERS),
+                    async_client_args={"proxy": PROXY.GOOGLE},
+                ),
+            )
+            photo = await app.aio.files.upload(file=fpath)
+            # Poll until Gemini finishes processing the uploaded file.
+            while photo.state == FileState.PROCESSING:
+                logger.trace("Waiting for upload to complete...")
+                await asyncio.sleep(1)
+                photo = await app.aio.files.get(name=photo.name)  # type: ignore
+            if photo.state == FileState.ACTIVE and photo.uri:
+                response = await app.aio.models.generate_content(
+                    model=MODEL_ID,
+                    contents=[Part.from_uri(file_uri=photo.uri, mime_type=photo.mime_type), Part.from_text(text="请识别图中的物体")],
+                    config={
+                        "response_mime_type": "application/json",
+                        # Enum schema: the model must answer with exactly one of the button options.
+                        "response_schema": {
+                            "type": "STRING",
+                            "enum": options,
+                        },
+                    },
+                )
+                await app.aio.aclose()
+                answer = glom(response, "candidates.0.content.parts.0.text", default="").strip('" ')
+                if answer in options:
+                    logger.success(f"终点站 answer: checkin-{answer}")
+                    await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin-{answer}")
+                    Path(fpath).unlink(missing_ok=True)
+                    return
+                # NOTE(review): the downloaded photo is only unlinked on success —
+                # failure paths leave the temp file behind; confirm whether intended.
+                logger.warning(f"终点站 wrong answer: {response}")
+        except Exception as e:
+            logger.exception(e)
+
+
+async def checkin_number_question(client: Client, message: Message) -> None:
+    """Answer the "每日签到" math captcha sent by es666_bot / fouremby_bot.
+
+    Strips the message decoration, asks the AI backend for a JSON
+    {"answer": <int>} (the same schema expressed for Gemini, OpenAI Responses
+    and OpenAI Completions), and replies with the integer as a plain message.
+    Only active during the 06-07h check-in window.
+    """
+    if not str(message.text).startswith("🎯 每日签到\n"):
+        return
+    if nowdt(TZ).hour not in [6, 7]:
+        return
+    if glom(message, "from_user.username", default="").lower() not in {"es666_bot", "fouremby_bot"}:
+        return
+    # Strip the header/footer decoration, keeping only the question text.
+    query = str(message.text).removeprefix("🎯 每日签到\n")
+    query = query.removesuffix("⏱ 有效期: 1 分钟").strip()
+    ai_msg = Message(id=rand_number(), chat=Chat(id=0), text=Str(f"{PREFIX.AI_TEXT_GENERATION} @emby {query}"))
+    ai_res = await ai_text_generation(
+        "fake-client",  # type: ignore
+        ai_msg,
+        silent=True,
+        gemini_generate_content_config={
+            "tools": [{"code_execution": {}}],
+            "responseMimeType": "application/json",
+            "responseJsonSchema": {
+                "title": "Math Question",
+                "type": "object",
+                "strict": True,
+                "properties": {"answer": {"type": "integer"}},
+                "required": ["answer"],
+                "additionalProperties": False,
+            },
+        },
+        openai_responses_config={
+            "tools": [{"type": "web_search"}],
+            "text": {
+                "format": {
+                    "type": "json_schema",
+                    "name": "MathQuestion",
+                    "strict": True,
+                    "description": "A simple math question.",
+                    "schema": {
+                        "title": "Math Question",
+                        "type": "object",
+                        "properties": {"answer": {"type": "integer"}},
+                        "required": ["answer"],
+                        "additionalProperties": False,
+                    },
+                }
+            },
+        },
+        openai_completions_config={
+            "response_format": {
+                "type": "json_schema",
+                "strict": True,
+                "json_schema": {
+                    "name": "MathQuestion",  # name must match ^[a-zA-Z0-9_-]+$
+                    "schema": {
+                        "type": "object",
+                        "properties": {"answer": {"type": "integer"}},
+                        "required": ["answer"],
+                        "additionalProperties": False,
+                    },
+                    "strict": True,
+                },
+            }
+        },
+    )
+    if not ai_res.get("texts"):
+        logger.error(f"【Emby】数学题签到: {ai_res}")
+        return
+    logger.success(f"【Emby】数学题签到: {ai_res['texts']}")
+    answer = json.loads(ai_res["texts"])["answer"]
+    await client.send_message(message.chat.id, text=str(answer))
+
+
+async def checkin_poetry(client: Client, message: Message) -> None:
+    # ruff: noqa: RUF001
+    if not str(message.content).startswith("🌸 防机器签到验证 🌸\n"):
+        return
+    # if nowdt(TZ).hour not in [6, 7]:
+    #     return
+    bot_name = glom(message, "from_user.username", default="")
+    if bot_name.lower() != "niubi2233_bot":
+        return
+    query = str(message.content).removeprefix("🌸 防机器签到验证 🌸").strip()
+    query = query.replace("░", "?").strip().replace("请依次点击下方按钮补全诗句:", "你是专业的诗词研究专家,核心任务是依据提供的诗词文本,精准补全其中缺失的汉字。以下是需要补全的诗句:")
+    query += '\n请根据你掌握的诗词知识,准确填充诗句中“?”处缺失的汉字。\n注意:必须严格匹配原诗词的用字,不得自行创作或修改。\n最终结果请以数组格式返回,数组元素为按缺失顺序排列的汉字。例如,若诗句为“?前明月?,疑是地上霜”,则返回 ["床", "光"]'
+    reply_markup = glom(message, "reply_markup.inline_keyboard.**.callback_data", default=[])
+    reply_markup = [x for x in reply_markup if isinstance(x, str)]
+    logger.debug(f"【Emby】诗词题签到: {reply_markup}")
+    if not reply_markup:
+        return
+
+    if chars := cache.get(f"checkin_poetry_{bot_name}"):  # 缓存中存在字符
+        if f"checkin_shici_{chars[0]}" not in reply_markup:
+            logger.error(f"【Emby】诗词题签到: {chars}")
+            return
+        if len(chars) == 1:  # 只用填一个字, 直接返回
+            await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
+            cache.delete(f"checkin_poetry_{bot_name}")  # 删除缓存
+            return
+        cache.set(f"checkin_poetry_{bot_name}", chars[1:], ttl=60)  # 缓存后面的字
+        await asyncio.sleep(1)  # 等待1秒, 确保缓存生效
+        await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
+
+    ai_msg = Message(id=rand_number(), chat=Chat(id=0), text=Str(f"{PREFIX.AI_TEXT_GENERATION} @emby {query}"))
+    ai_res = await ai_text_generation(
+        "fake-client",  # type: ignore
+        ai_msg,
+        silent=True,
+        gemini_generate_content_config={
+            "tools": [{"google_search": {}}],
+            "responseMimeType": "application/json",
+            "responseJsonSchema": {
+                "title": "Fill Poetry",
+                "type": "array",
+                "items": {"type": "string"},
+                "description": "The missing characters in the poetry.",
+            },
+        },
+        openai_responses_config={
+            "tools": [{"type": "web_search"}],
+            "text": {
+                "format": {
+                    "type": "json_schema",
+                    "name": "FillPoetry",
+                    "strict": True,
+                    "description": "Fill the missing characters in the poetry.",
+                    "schema": {
+                        "title": "Fill Poetry",
+                        "type": "array",
+                        "items": {"type": "string"},
+                        "description": "The missing characters in the poetry.",
+                    },
+                }
+            },
+        },
+        openai_completions_config={
+            "response_format": {
+                "type": "json_schema",
+                "strict": True,
+                "json_schema": {
+                    "name": "FillPoetry",  # name must match ^[a-zA-Z0-9_-]+$
+                    "schema": {
+                        "type": "array",
+                        "items": {"type": "string"},
+                        "description": "The missing characters in the poetry.",
+                    },
+                    "strict": True,
+                },
+            }
+        },
+    )
+    if not ai_res.get("texts"):
+        logger.error(f"【Emby】诗词题签到: {ai_res}")
+        return
+    logger.success(f"【Emby】诗词题签到: {ai_res['texts']}")
+    chars = json.loads(ai_res["texts"])
+    if not chars:
+        logger.error(f"【Emby】诗词题签到: {chars}")
+        return
+    chars = flatten(chars)
+    # 回答第一个字
+    if f"checkin_shici_{chars[0]}" not in reply_markup:
+        logger.error(f"【Emby】诗词题签到: {chars}")
+        return
+    if len(chars) == 1:  # 只用填一个字, 直接返回
+        await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
+        return
+    cache.set(f"checkin_poetry_{bot_name}", chars[1:], ttl=60)  # 缓存后面的字
+    await asyncio.sleep(1)  # 等待1秒, 确保缓存生效
+    await client.request_callback_answer(message.chat.id, message.id, callback_data=f"checkin_shici_{chars[0]}")
src/emby/constant.py
@@ -0,0 +1,66 @@
+import os
+
+EMBY_PROXY = os.getenv("EMBY_PROXY", None)
+EMBY_KEEPALIVE_SECONDS = os.getenv("EMBY_KEEPALIVE_SECONDS", "300,600")  # "start,end" 保活时长
+
+VERSION = "0.2.3"
+UA = f"Hills Windows/{VERSION} (windows; 26100.ge_release.240331-1435)"
+HEADERS = {"user-agent": UA, "accept": "*/*", "accept-encoding": "gzip, br"}
+DEVICE_ID = "ccf4470161bb48d09dd6228b2b0d0e8b"  # str(uuid.uuid4()).replace("-", "")
+DEVICE_NAME = "Benny"
+ITEM_TYPE = {"Series": "📽", "Movie": "🎬"}
+DEVICE_PROFILE = {
+    "DeviceProfile": {
+        "MaxStaticBitrate": 200000000,
+        "MaxStreamingBitrate": 200000000,
+        "MusicStreamingTranscodingBitrate": 200000000,
+        "DirectPlayProfiles": [{"Type": "Video"}, {"Type": "Audio"}],
+        "TranscodingProfiles": [
+            {
+                "Container": "ts",
+                "Type": "Video",
+                "AudioCodec": "aac,mp3,wav,ac3,eac3,flac,opus",
+                "VideoCodec": "hevc,h264,h265,mpeg4",
+                "Context": "Streaming",
+                "Protocol": "hls",
+                "MaxAudioChannels": "6",
+                "MinSegments": "1",
+                "BreakOnNonKeyFrames": True,
+                "ManifestSubtitles": "vtt",
+            }
+        ],
+        "ContainerProfiles": [],
+        "SubtitleProfiles": [
+            {"Format": "vtt", "Method": "External"},
+            {"Format": "ass", "Method": "External"},
+            {"Format": "ssa", "Method": "External"},
+            {"Format": "srt", "Method": "External"},
+            {"Format": "sub", "Method": "External"},
+            {"Format": "subrip", "Method": "External"},
+            {"Format": "smi", "Method": "External"},
+            {"Format": "ttml", "Method": "External"},
+            {"Format": "webvtt", "Method": "External"},
+            {"Format": "dvdsub", "Method": "External"},
+            {"Format": "dvdsub", "Method": "Embed"},
+            {"Format": "vobsub", "Method": "Embed"},
+            {"Format": "vtt", "Method": "Embed"},
+            {"Format": "ass", "Method": "Embed"},
+            {"Format": "ssa", "Method": "Embed"},
+            {"Format": "srt", "Method": "Embed"},
+            {"Format": "sub", "Method": "Embed"},
+            {"Format": "pgssub", "Method": "Embed"},
+            {"Format": "pgs", "Method": "Embed"},
+            {"Format": "subrip", "Method": "Embed"},
+            {"Format": "smi", "Method": "Embed"},
+            {"Format": "ttml", "Method": "Embed"},
+            {"Format": "webvtt", "Method": "Embed"},
+            {"Format": "mov_text", "Method": "Embed"},
+            {"Format": "dvb_teletext", "Method": "Embed"},
+            {"Format": "dvb_subtitle", "Method": "Embed"},
+            {"Format": "dvbsub", "Method": "Embed"},
+            {"Format": "idx", "Method": "Embed"},
+            {"Format": "vtt", "Method": "Hls"},
+            {"Format": "vtt", "Method": "Hls"},
+        ],
+    }
+}
src/emby/keepalive.py
@@ -0,0 +1,99 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import contextlib
+import json
+import random
+from pathlib import Path
+
+from glom import glom
+from loguru import logger
+
+from config import DOWNLOAD_DIR, cache
+from database.r2 import list_cf_r2, set_cf_r2
+from emby.account import all_accounts, emby_login
+from emby.api import get_items, get_user, remove_from_resume
+from emby.constant import EMBY_KEEPALIVE_SECONDS
+from emby.play import play_item
+from utils import strings_list, to_int
+
+
+async def keepalive_emby():
+    if cache.get("keepalive_emby"):
+        return
+    cache.set("keepalive_emby", "1", 4 * 3600)  # every 4 hours
+    accounts = await all_accounts()
+    r2 = await list_cf_r2("TTL/")
+    keys = glom(r2, "Contents.*.Key", default=[]) or []
+    for acc_name, config in accounts.items():
+        keepalive_day = to_int(config.get("keepalive", 0))
+        if not isinstance(keepalive_day, int) or keepalive_day <= 0:
+            continue
+        if next((x for x in keys if x.endswith(f"/emby/{acc_name}")), None):
+            continue
+        with contextlib.suppress(Exception):
+            await keepalive_single(acc_name, config)
+
+
+async def keepalive_single(acc_name: str, config: dict):
+    credentials = await emby_login(acc_name)
+    if not credentials:
+        return
+    refresh_credentials(acc_name, config, credentials)
+    user = await get_user(credentials)
+    if user.get("hx_error"):
+        logger.warning(f"【{acc_name}】服务器错误: {user.get('hx_error')}")
+        return
+    items = await get_items(credentials)
+    if not items:
+        logger.warning(f"【{acc_name}】保活失败: 无法获取首页中的视频项目")
+        return
+    duration = [int(x) for x in strings_list(EMBY_KEEPALIVE_SECONDS)]
+    need_seconds = random.uniform(*duration)
+    logger.debug(f"【{acc_name}】成功获取{len(items)}个首页视频项目, 共需播放 {need_seconds:.0f} 秒")
+
+    played_seconds = 0
+    num_played = 0
+    failed_items = []
+
+    while True:
+        if played_seconds >= need_seconds - 1:
+            break
+        shuffled_items = list(items.items())
+        random.shuffle(shuffled_items)
+
+        for item_id, item in shuffled_items:
+            if item_id in failed_items:
+                continue
+            total_seconds = item["RunTimeTicks"] / 10000000
+            play_seconds = total_seconds if need_seconds - played_seconds > total_seconds else max(need_seconds - played_seconds, 10)
+            logger.trace(f"【{acc_name}】开始播放《{item.get('Name', '(未命名视频)')}》({play_seconds:.0f} 秒).")
+            succ = await play_item(credentials, acc_name, item, play_seconds)
+            await asyncio.sleep(random.uniform(5, 10))
+            await remove_from_resume(credentials, item_id)
+            if not succ:
+                failed_items.append(item_id)
+                continue
+            num_played += 1
+            played_seconds += play_seconds
+            if played_seconds >= need_seconds - 1:
+                logger.success(f"【{acc_name}】保活成功, 共播放 {num_played} 个视频.")
+                keepalive_hour = int(config.get("keepalive", 0)) * 24 + random.randint(-24, 24)
+                await set_cf_r2(f"TTL/{max(1, keepalive_hour)}h/emby/{acc_name}", "1")
+                break
+            logger.debug(f"【{acc_name}】还需播放 {need_seconds - played_seconds:.0f} 秒.")
+            rt = random.uniform(5, 15)
+            logger.debug(f"【{acc_name}】等待 {rt:.0f} 秒后播放下一个.")
+            await asyncio.sleep(rt)
+        if len(failed_items) == len(items):
+            logger.warning(f"【{acc_name}】所有视频均播放失败, 保活失败. ")
+            return
+
+
+def refresh_credentials(acc_name: str, acc_config: dict, credentials: dict):
+    if acc_config.get("server") != credentials.get("Server"):
+        logger.warning(f"【{acc_name}】服务器地址已更改, 更新本地凭据.")
+        credentials["Server"] = acc_config["server"]
+        save_path = Path(DOWNLOAD_DIR) / "emby" / f"{acc_name}.json"
+        save_path.parent.mkdir(parents=True, exist_ok=True)
+        save_path.write_text(json.dumps(credentials, ensure_ascii=False, indent=2, sort_keys=True))
src/emby/main.py
@@ -0,0 +1,142 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+from typing import Literal
+from urllib.parse import urlparse
+
+from pyrogram.types import Message
+
+from config import DEVICE_NAME
+from custom.config import CHANNEL_EMBY_SEARCH, GROUP_DEV, USER_BENNY, USER_XIAOHAO
+from database.kv import get_cf_kv, set_cf_kv
+from emby.account import all_accounts, emby_login
+from emby.api import get_items_count
+from emby.constant import ITEM_TYPE
+from emby.search import emby_search
+from messages.progress import modify_progress
+from messages.utils import blockquote, delete_message, remove_prefix, startswith_prefix
+from utils import strings_list, to_int, true
+
+CREDENTIAL_HEADER = "emby,enable,bot,server,user,pass,keepalive,nsfw"
+
+
+async def emby_entrypoint(message: Message):
+    """Emby相关."""
+    cid = message.chat.id
+    # VPS上的账号不响应 开发Group 的消息
+    if cid == GROUP_DEV and DEVICE_NAME in ["BennyBot-JP", "BennyBot-US", "BennyBot-CN"]:
+        return
+
+    if str(message.text).startswith(CREDENTIAL_HEADER):
+        if await set_credentials(message):
+            await delete_message(message)
+        return
+
+    if not (cid == CHANNEL_EMBY_SEARCH or startswith_prefix(message.content, prefix="/emby")):
+        return
+    if message.content == "/emby":
+        await count_items(message)
+        return
+    if message.content == "/embyget":
+        if message.from_user.id in [USER_BENNY, USER_XIAOHAO]:
+            await show_credentials(message, formats="text")
+        else:
+            await message.reply("❌权限不足", quote=True)
+        return
+    if message.content == "/embyset":
+        if message.from_user.id in [USER_BENNY, USER_XIAOHAO]:
+            await show_credentials(message, formats="csv")
+        else:
+            await message.reply("❌权限不足", quote=True)
+        return
+
+    # search emby
+    query = remove_prefix(message.content, prefix="/emby")
+    if not query:
+        return
+    status = await message.reply(f"🔍正在搜索: {query}", quote=True)
+    if cid == CHANNEL_EMBY_SEARCH:
+        summary, _, telegraph_url = await emby_search(query, allow_nsfw=True)
+    else:
+        summary, _, telegraph_url = await emby_search(query)
+    if not summary:
+        await modify_progress(status, text="❌没有找到任何结果", force_update=True)
+        return
+    summary = f"⚡️[即时预览]({telegraph_url})\n{blockquote(summary)}" if telegraph_url else blockquote(summary)
+    await modify_progress(status, text=summary, force_update=True)
+
+
+async def count_items(message: Message):
+    async def count(acc_name: str, config: dict, *, refresh: bool = False) -> str:
+        credentials = await emby_login(acc_name, refresh=refresh)
+        if not credentials:
+            return f"🤖[{acc_name}](t.me/{config['bot']}): ❌"
+        resp = await get_items_count(credentials)
+        if resp.get("hx_error"):
+            return await count(acc_name, config, refresh=True) if not refresh else f"🤖[{acc_name}](t.me/{config['bot']}): ❌"
+        return f"🤖[{acc_name}](t.me/{config['bot']}): {ITEM_TYPE['Movie']}{resp['MovieCount']} {ITEM_TYPE['Series']}{resp['EpisodeCount']}"
+
+    accounts = await all_accounts()
+    tasks = [count(acc_name, config) for acc_name, config in accounts.items()]
+    res = await asyncio.gather(*tasks)
+    await message.reply("\n".join(res), quote=True)
+
+
+async def show_credentials(message: Message, formats: Literal["text", "csv"] = "text"):
+    if formats == "text":
+        accounts = await all_accounts()  # {"account_name": {"bot": "bot_handle", "server": "https://example.com", "username": "username", "password": "password"}}
+        msg = "⚠️除非特别说明, 否则所有服务器协议均为 **HTTPS**, 端口均为 **443**\n"
+        no_proto = ""
+        with_proto = ""
+        for acc_name, config in accounts.items():
+            r18 = "🔞" if config.get("nsfw") else ""
+            parsed = urlparse(config["server"])
+            no_proto += f"\n【{r18}`{acc_name}`】"
+            with_proto += f"\n【{r18}`{acc_name}`】"
+            if parsed.scheme == "http":
+                no_proto += "\n⚠️协议: **HTTP**"
+            no_proto += f"\n服务器: `{parsed.hostname}`"
+            with_proto += f"\n服务器: `{parsed.scheme}://{parsed.hostname}`"
+            if parsed.port is not None and parsed.port != 443:
+                no_proto += f"\n⚠️端口: `{parsed.port}`"
+                with_proto = with_proto[:-1] + f":{parsed.port}`"
+            no_proto += f"\n用户名: `{config['username']}`\n密码: `{config['password']}`\n"
+            with_proto += f"\n用户名: `{config['username']}`\n密码: `{config['password']}`\n"
+        await message.reply(msg + no_proto, quote=True)
+        await message.reply(msg + with_proto, quote=True)
+
+    elif formats == "csv":
+        kv = await get_cf_kv("emby")
+        msg = CREDENTIAL_HEADER
+        for name, c in sorted(kv.items(), key=lambda x: x[1]["enable"], reverse=True):
+            msg += f"\n{name},{c['enable']},@{c['bot']},{c['server']},{c['username']},{c['password']},{c.get('keepalive', '0')},{c.get('nsfw', '')}"
+        msg += f"\n{CREDENTIAL_HEADER}"
+        await message.reply(msg, quote=True)
+
+
+async def set_credentials(message: Message) -> bool:
+    if message.from_user.id not in [USER_BENNY, USER_XIAOHAO]:
+        return False
+    texts = message.text.split("\n")[1:]
+    credentials = {}
+    for line in texts:
+        name, enable, bot, server, user, passwd, keepalive, nsfw = line.split(",")
+        if name == strings_list(CREDENTIAL_HEADER)[0]:
+            continue
+        keepalive = to_int(keepalive)
+        credentials[name] = {
+            "enable": true(enable),
+            "bot": bot.lstrip("@").strip(),
+            "server": server.strip(),
+            "username": user.strip(),
+            "password": passwd.strip(),
+            "keepalive": keepalive,
+            "nsfw": true(nsfw),
+        }
+        if not isinstance(keepalive, int) or keepalive <= 0:
+            del credentials[name]["keepalive"]
+        if not true(nsfw):
+            del credentials[name]["nsfw"]
+    if credentials:
+        return await set_cf_kv("emby", credentials, silent=True)
+    return False
src/emby/play.py
@@ -0,0 +1,188 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import random
+import string
+import time
+from contextlib import suppress
+
+from glom import glom
+from httpx import AsyncClient, AsyncHTTPTransport
+from loguru import logger
+
+from emby.api import build_params, default_headers, get_item, get_playback_info, get_similar
+from emby.constant import EMBY_PROXY, UA
+from networking import hx_req
+from utils import nowdt
+
+
+async def play_item(credentials: dict, server_name: str, item: dict, play_seconds: float = 10) -> bool:
+    item_id = item["Id"]
+    await get_item(credentials, item_id)
+    await get_similar(credentials, item_id)
+
+    playback_info = await get_playback_info(credentials, item_id)
+    if playback_info.get("hx_error"):
+        return False
+
+    play_session_id = playback_info["PlaySessionId"]
+    random_id = "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(32))
+    media_source_id = glom(playback_info, "MediaSources.0.Id", default=random_id)
+    direct_stream_url = glom(playback_info, "MediaSources.0.DirectStreamUrl", default=None)
+    headers = default_headers(credentials) | {"accept": "application/json, text/plain, */*", "content-type": "application/json", "accept-encoding": "gzip, deflate", "accept-language": "en-US,*"}
+
+    def get_playing_data(tick: int, *, init: bool = False, start: bool = False, update: bool = False, stop: bool = False) -> dict:
+        data = {
+            "AudioStreamIndex": -1,
+            "CanSeek": "true",
+        }
+
+        if start:
+            data["EventName"] = "unpause"
+        elif update:
+            data["EventName"] = "timeupdate"
+        elif stop:
+            data["EventName"] = "pause"
+        return data | {
+            "IsMuted": "false",
+            "IsPaused": str(init or start or stop).lower(),
+            "ItemId": str(item_id),
+            "MaxStreamingBitrate": 140000000,
+            "MediaSourceId": str(media_source_id),
+            "PlayMethod": "DirectStream",
+            "PlaySessionId": str(play_session_id),
+            "PlaybackRate": 1,
+            "PlaybackStartTimeTicks": 0,
+            "PlaylistIndex": 0,
+            "PlaylistLength": 1,
+            "PositionTicks": tick,
+            "RepeatMode": "RepeatNone",
+            "Shuffle": "false",
+            "SubtitleOffset": 0,
+            "SubtitleStreamIndex": -1,
+            "VolumeLevel": 100,
+        }
+
+    async def stream():
+        stream_url = direct_stream_url or f"/Videos/{item_id}/stream"
+        length = 0
+        last_err_time = nowdt()
+        speed_limit = 2 * 1024 * 1024  # 2 MB
+        while True:
+            async with (
+                AsyncClient(headers=headers, proxy=EMBY_PROXY, verify=False, transport=AsyncHTTPTransport(), timeout=5) as client,  # noqa: S501
+                client.stream(
+                    "GET",
+                    f"{credentials['Server']}/emby/{stream_url}",
+                    follow_redirects=True,
+                    headers={"User-Agent": UA, "Accept": "*/*", "Range": f"bytes={length}-", "Icy-MetaData": "1"},
+                ) as resp,
+            ):
+                total_downloaded = 0
+                start_time = time.monotonic()
+                try:
+                    async for chunk in resp.aiter_bytes(chunk_size=1024):
+                        length += len(chunk)
+                        total_downloaded += len(chunk)
+                        del chunk
+                        expected_time = total_downloaded / speed_limit
+                        elapsed_time = time.monotonic() - start_time
+                        sleep_time = expected_time - elapsed_time
+                        if sleep_time > 0:
+                            await asyncio.sleep(sleep_time)
+                        await asyncio.sleep(random.random())
+                        if random.random() < 0.01:
+                            continue
+                except Exception as e:
+                    if (nowdt() - last_err_time).total_seconds() > 5:
+                        logger.debug(f"【{server_name}】《{item['Name']}》流媒体文件访问错误, 正在重试. {e}")
+                        last_err_time = nowdt()
+                        continue
+                finally:
+                    await resp.aclose()
+
+    stream_task = asyncio.create_task(stream())
+    rt = random.uniform(5, 10)
+    logger.trace(f"【{server_name}】《{item['Name']}》播放视频前等待 {rt:.0f} 秒")
+    await asyncio.sleep(rt)
+    succ = False
+    try:
+        resp = await hx_req(
+            f"{credentials['Server']}/emby/Sessions/Playing",
+            "POST",
+            headers=headers,
+            params=build_params(credentials, params={"UserId": credentials["User"]["Id"]}, reqformat="json"),
+            json_data=get_playing_data(0),
+            transport=AsyncHTTPTransport(),
+            proxy=EMBY_PROXY,
+            verify=False,
+            rformat="content",
+            timeout=5,
+            silent=True,
+        )
+        if resp.get("hx_error"):
+            logger.error(f"【{server_name}】《{item['Name']}》无法开始播放: {resp['hx_error']}")
+            return False
+        t = play_seconds
+
+        last_report_t = t
+        progress_errors = 0
+        report_interval = 10  # seconds
+        while t > 0:
+            if progress_errors > 20:
+                logger.error(f"【{server_name}】《{item['Name']}》播放状态设定错误次数过多")
+                return False
+            if last_report_t and last_report_t - t > report_interval:
+                logger.trace(f"【{server_name}】《{item['Name']}》正在播放 (还剩 {t:.0f} 秒).")
+                last_report_t = t
+            st = min(10, t)
+            await asyncio.sleep(st)
+            t -= st
+            tick = int((play_seconds - t) * 10000000)
+            try:
+                resp = await asyncio.wait_for(
+                    hx_req(
+                        f"{credentials['Server']}/emby/Sessions/Playing/Progress",
+                        "POST",
+                        headers=headers,
+                        json_data=get_playing_data(tick, update=True),
+                        transport=AsyncHTTPTransport(),
+                        proxy=EMBY_PROXY,
+                        verify=False,
+                        rformat="content",
+                        timeout=5,
+                        silent=True,
+                    ),
+                    10,
+                )
+                logger.info(f"【{server_name}】《{item['Name']}》回传成功")
+                if 200 <= int(resp.get("status_code", 0)) < 300:
+                    succ = True
+            except Exception:
+                logger.debug(f"【{server_name}】《{item['Name']}》播放状态设定错误")
+                progress_errors += 1
+        await asyncio.sleep(random.uniform(1, 3))
+    finally:
+        stream_task.cancel()
+        with suppress(asyncio.CancelledError):
+            await stream_task
+
+    try:
+        final_percentage = random.uniform(0.95, 1.0)
+        final_tick = int((play_seconds * final_percentage) // 10 * 10 * 10000000)
+        await hx_req(
+            f"{credentials['Server']}/emby/Sessions/Playing/Progress",
+            "POST",
+            headers=headers,
+            json_data=get_playing_data(final_tick, stop=True),
+            transport=AsyncHTTPTransport(),
+            proxy=EMBY_PROXY,
+            verify=False,
+            timeout=5,
+            rformat="content",
+            silent=True,
+        )
+        logger.success(f"【{server_name}】《{item['Name']}》播放完成, 共 {play_seconds:.0f} 秒.")
+    except Exception as e:
+        logger.error(f"【{server_name}】《{item['Name']}》由于连接错误或服务器错误无法停止播放: {e}")
+    return succ
src/emby/register.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import contextlib
+import random
+import re
+
+from glom import glom
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import TZ
+from custom.config import ACCOUNT_NAME, GROUP_JSQ, USER_BENNY
+from messages.utils import delete_message
+from utils import nowdt, strings_list
+
+EMBY_SKIP_REG_BOT = "lilyembybot"
+
+
+async def emby_register(client: Client, message: Message) -> None:
+    if not glom(message, "from_user.is_bot", default=False):
+        return
+    bot_name = glom(message, "from_user.username", default="")
+    # start register, send /start to bot
+    markup_text = glom(message, "reply_markup.inline_keyboard.0.0.text", default="")
+    markup_url = glom(message, "reply_markup.inline_keyboard.0.0.url", default="")
+    if ("自由注册" in message.content or "定时注册" in message.content) and markup_text == "👆🏻 点击注册" and markup_url:
+        reg_bot = markup_url.split("/")[-1]
+        if not reg_bot.endswith("bot"):
+            return
+        allow_reg_count = 0
+        if count_matched := re.search(r"剩余可注册 \| (\d+)", str(message.content)):
+            allow_reg_count = int(count_matched.group(1))
+            if allow_reg_count <= 0:
+                return
+            logger.warning(f"【Emby】{reg_bot} 已开启注册!")
+            if bot_name not in strings_list(EMBY_SKIP_REG_BOT) and allow_reg_count >= 5 and nowdt(tz=TZ).hour >= 8:  # 至少 5 个名额
+                await client.send_message(reg_bot, "/start")
+            if ACCOUNT_NAME == "xiaohao":
+                fwd_message = await message.forward(GROUP_JSQ)
+                # get continue duration
+                duration_seconds = 1200  # default 20 min
+                if duration_matched := re.search(r"⏳ 可持续时间 \| (\d+) min", message.content):
+                    duration_seconds = int(duration_matched.group(1)) * 60
+                duration_seconds = min(duration_seconds, allow_reg_count * 5)  # 预估每个注册名额最多够 5s
+                duration_seconds = min(duration_seconds, 1800)  # 最多 30 min
+                duration_seconds = max(duration_seconds, 300)  # 最少 5 min
+                await asyncio.sleep(duration_seconds)
+                await delete_message(fwd_message)  # type: ignore
+
+    # join group and channel
+    if ACCOUNT_NAME == "xiaohao" and str(message.content).startswith("💢 拜托啦"):
+        urls = glom(message, "reply_markup.inline_keyboard.**.url", default=[])
+        urls = {x for x in urls if str(x).startswith(("t.me/", "https://t.me/", "http://t.me/"))}
+        for url in urls:
+            with contextlib.suppress(Exception):
+                await client.join_chat(url.replace("http://", "https://"))
+
+    # init account
+    if "重新召唤面板" in message.content:
+        await client.send_message(message.chat.id, "/start")
+
+    # click `create` button
+    callback_data = glom(message, "reply_markup.inline_keyboard.**.callback_data", default=[])
+    if "create" in callback_data and bot_name not in {"sntp_lite_emby_bot", "shrekpublic_bot"}:
+        logger.warning(f"【Emby】{glom(message, 'from_user.username', default='')} 正在创建账户!")
+        resp = await client.request_callback_answer(message.chat.id, message.id, callback_data="create")
+        logger.warning(f"【Emby】{glom(resp, 'message', default='')}")
+
+    # set account credentials (only for bot)
+    if glom(message, "chat.type.name", default="") != "BOT":
+        return
+    if "您已进入注册状态" in str(message.content):
+        username = fakeuser()
+        safecode = random.randint(1000, 9999)
+        notice = f"【Emby】@{bot_name} 正在设置账户: {username} [{safecode}]"
+        logger.warning(notice)
+        await client.send_message(message.from_user.id, f"{username} {safecode}")
+        await client.send_message(USER_BENNY, notice)
+
+    if "创建用户成功" in str(message.content):
+        await message.forward(USER_BENNY)
+
+
+def fakeuser() -> str:
+    """生成一个随机的用户名, 格式为1个随机的声母 + 2个字母 + 4位年份数字."""
+    shengmu = "bpmfdtnlgkhjqxrzcsyw"
+    letters = "abcdefghijklmnopqrstuvwxyz"
+    return random.choice(shengmu) + "".join(random.choices(letters, k=2)) + str(random.randint(1980, 1999))
src/emby/search.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+from collections import defaultdict
+
+from glom import Coalesce, glom
+from httpx import AsyncHTTPTransport
+from loguru import logger
+
+from config import TEXT_LENGTH
+from emby.account import all_accounts, emby_login, get_account
+from emby.api import build_params, default_headers
+from emby.constant import EMBY_PROXY, ITEM_TYPE
+from messages.utils import count_without_entities
+from networking import hx_req
+from publish import publish_telegraph
+
+
+async def emby_search(query: str, *, allow_nsfw: bool = False) -> tuple[str, str, str]:
+    """Search for movies or series from emby.
+
+    Returns:
+        tuple[str, str, str]: summary_texts, full_texts, telegraph_url of full_texts
+
+    Examples:
+    Summary:
+        @emby_1_bot
+        🎬(2007)Movie-1
+        🎬(2020)Movie-2
+
+        @emby_2_bot
+        🎬(2007)Movie-1
+        🎬(2020)Movie-2
+    """
+    accounts = await all_accounts()
+    tasks = [search_single(query, acc_name, allow_nsfw=allow_nsfw) for acc_name in accounts]
+    results = await asyncio.gather(*tasks)
+    max_length = TEXT_LENGTH - 50
+    cur_len = sum([len(name) for name in accounts])
+    max_num_item = max(len(item) for item in results)
+    summary = defaultdict(list)  # {"bot-1": ["item-1", "item-2"], "bot-2": ["item-3", "item-4"]}
+    full = dict(zip(accounts, results, strict=True))
+    trimmed = False
+    for item_idx in range(max_num_item):
+        for acc_name, items in zip(accounts, results, strict=True):
+            if item := glom(items, f"{item_idx}", default=""):
+                # check if we can add this item to summary
+                this_len = await count_without_entities(item)
+                if cur_len + this_len >= max_length:
+                    trimmed = True
+                    break
+                summary[acc_name].append(item)
+    summary_texts = ""
+    full_texts = ""
+    failed_accounts = []
+    for acc_name, acc_info in accounts.items():
+        if items := summary.get(acc_name):
+            summary_texts += f"🤖[{acc_name}](t.me/{acc_info['bot']})\n"
+            summary_texts += "".join(items)
+        else:
+            failed_accounts.append(f"❌[{acc_name}](t.me/{acc_info['bot']})\n")
+        if items := full.get(acc_name):
+            full_texts += f"🤖[{acc_name}](t.me/{acc_info['bot']})\n"
+            full_texts += "".join(items)
+    for failed in failed_accounts:
+        summary_texts += failed
+    if trimmed:
+        html = "\n".join([f"<p>{s}</p>" for s in full_texts.split("\n")])
+        telegraph_url = await publish_telegraph(title=query, html=html)
+    else:
+        telegraph_url = ""
+    return summary_texts.strip(), full_texts.strip(), telegraph_url
+
+
+async def search_single(query: str, account_name: str, *, allow_nsfw: bool = False, refresh: bool = False, retry: int = 0) -> list[str]:
+    """Search from a single emby server.
+
+    https://dev.emby.media/reference/RestAPI/ItemsService/getUsersByUseridItems.html
+    """
+    if retry > 2:
+        return []
+    account = await get_account(account_name)
+    if not account:
+        return []
+    if not allow_nsfw and account.get("nsfw"):
+        return []
+    credentials = await emby_login(account_name, refresh=refresh)
+    if not credentials:
+        return []
+    params = {
+        "UserId": credentials["User"]["Id"],
+        "SearchTerm": query,
+        "Recursive": "true",
+        "SortBy": "SortName",
+        "SortOrder": "Ascending",
+        "GroupProgramsBySeries": "true",
+        "IncludeItemTypes": "Movie,Series",
+        "Fields": "PrimaryImageAspectRatio,ProductionYear,Status,EndDate,CommunityRating,RecursiveItemCount,ProviderIds,MediaSources,AlternateMediaSources,PremiereDate",
+        "EnableImageTypes": "Primary,Backdrop,Thumb,Logo",
+        "ImageTypeLimit": "1",
+        "Limit": "50",
+        "StartIndex": "0",
+    }
+    logger.trace(f"Searching `{query}` on Emby Server: {account_name}")
+    resp = await hx_req(
+        f"{account['server']}/emby/items",
+        headers=default_headers(credentials),
+        params=build_params(credentials, params),
+        transport=AsyncHTTPTransport(),
+        proxy=EMBY_PROXY,
+        verify=False,
+        timeout=2,
+        check_keys=["Items"],
+        silent=True,
+        max_retry=1,
+    )
+    if error := resp.get("hx_error"):
+        logger.error(error)
+        if "401 Unauthorized" in error:
+            return await search_single(query, account_name, allow_nsfw=allow_nsfw, refresh=True, retry=retry + 1)
+        return []
+    if not resp["Items"]:
+        return []
+    logger.success(f"Found {len(resp['Items'])} results from {account_name}")
+    retrieved = []
+    for item in resp["Items"]:
+        itype = ITEM_TYPE.get(item["Type"], "📺")
+        date = str(glom(item, Coalesce("PremiereDate", "ProductionYear"), default="0"))[:4]
+        text = f"{itype}({date}){item['Name']}\n" if date != "0" else f"{itype}{item['Name']}\n"
+        if text not in retrieved:
+            retrieved.append(text)
+    return retrieved
src/messages/database.py
@@ -47,7 +47,7 @@ async def save_messages(messages: list[Message | None], key: str, metadata: dict
     media_group_ids = set()  # save once
     for msg in valid_messages:
         info = parse_msg(msg, silent=True)
-        # Caution: this format should be consistent with `handle_social_media` function in `handler.py`
+        # Caution: this format should be consistent with `process_message` function in `messages/main.py`
         # text = re.sub(r"^👤\[@.*?\]\(tg://user\?id=\d+\)//", "", text)  # remove markdown send_from_user
         text = re.sub(r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", info["html"])  # remove html send_from_user
         msg_extra = {"text": text} if text else {}
src/quotly/quotly.py
@@ -97,7 +97,11 @@ async def quote_message(client: Client, message: Message, **kwargs):
 
     # user avatar (DO NOT use file_id directly, because some user hide it for public access)
     avatar_id = "AgACAgIAAxUAAWh1vGNwV9ry4BlFLlyCmkVZewcwAAKqpzEbY1mCPTQvRMdEdMXOAAgBAAMCAANiAAceBA"  # QuotLyBot avatar
-    if big_file_id := glom(quote_msg, "from_user.photo.big_file_id", default=""):
+    if int(uid) == 1639998668:  # hook CYF
+        async for photo in client.get_chat_photos(1639998668, limit=1):  # type: ignore
+            # this method will ignore my custom avatar
+            avatar_id = glom(photo, "sizes.-1.file_id", default="")
+    elif big_file_id := glom(quote_msg, "from_user.photo.big_file_id", default=""):
         avatar_id = big_file_id
     else:  # avatar is not available (Channel hides sender's avatar)
         async for photo in client.get_chat_photos(message.chat.id, limit=1):  # type: ignore
src/main.py
@@ -6,6 +6,7 @@ import json
 import logging
 import os
 import platform
+import random
 import sys
 from json import JSONDecodeError
 from urllib.parse import urlparse
@@ -23,14 +24,42 @@ from bridge.chartimg import forward_chartimg_results
 from bridge.ocr import forward_ocr_results
 from bridge.social import forward_social_media_results
 from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TOKEN, TZ, cache
+from custom.ai_news import daily_ainews
+from custom.config import ACCOUNT_NAME
+from custom.cyf_greeting import cyf_greeting
+from custom.cyf_quote import fafa_quote
+from custom.cyf_twitter_rss import fafa_twitter_rss
+from custom.d1_daily_backup_msg import daily_backup_history_to_d1
+from custom.del_msg import del_unwanted_message
+from custom.dnkt_attendance import dnkt_attendance
+from custom.dnkt_email import dnkt_email
+from custom.email2md import eml2md
+from custom.events import lottery
+from custom.history_alias import tg_history_alias
+from custom.lilaoshi import handle_lilaoshi, preview_lilaoshi_history_message
+from custom.link_extract import link_extract
+from custom.msg_backup import message_backup
+from custom.readhub import readhub
+from custom.restart import restart_bot
+from custom.rss import update_rss
+from custom.summary_video import summary_videos
+from custom.sync_youtube import sync_youtube
+from custom.tempmail import tempmail
 from danmu.sync import sync_livechats
 from database.r2 import clean_r2_expired
+from emby.checkin import checkin_emby, daily_checkin_emby
+from emby.keepalive import keepalive_emby
+from emby.main import emby_entrypoint
+from emby.register import emby_register
 from history.sync import backup_chat_history, sync_chat_history
 from messages.main import process_message
+from messages.modify import message_modify
 from messages.parser import parse_msg
+from messages.utils import delete_message, startswith_prefix
 from permission import check_permission
 from podcast.main import summary_pods
 from price.entrypoint import match_symbol_category
+from quotly.quotly import quote_message
 from utils import cleanup_old_files, to_int
 
 
@@ -92,15 +121,43 @@ async def main():
     @app.on_message(group=1)
     @app.on_edited_message(group=1)
     @app.on_deleted_messages(group=1)
-    async def save_history(client: Client, message: Message | list[Message]):
+    async def on_changed(client: Client, message: Message | list[Message]):
         await sync_chat_history(client, message)
+        if isinstance(message, Message):
+            await checkin_emby(client, message)
+
+    @app.on_message(group=2)
+    async def custom(client: Client, message: Message):
+        message = message_modify(message)
+        if ACCOUNT_NAME == "benny":
+            await quote_message(client, message)
+            await emby_entrypoint(message)
+            await fafa_quote(client, message)
+            if startswith_prefix(message.content, ["/save"]):
+                await delete_message(message)
+            await tempmail(client, message)
+
+        if ACCOUNT_NAME == "xiaohao":
+            await cyf_greeting(client, message)
+            await handle_lilaoshi(client, message)  # "Li laoshi" feed handler (Teacher Li is not your teacher)
+            # await chenyifa_social_rss(client, message)  # CYF social-media tracking
+
+        await restart_bot(message)
+        await lottery(client, message)
+        await emby_register(client, message)
+        await tg_history_alias(client, message)
+        await summary_videos(client, message)  # auto-summarize videos
+        await message_backup(client, message)
+        await del_unwanted_message(client, message)
+        await eml2md(client, message)
+        await link_extract(client, message)
 
     if ENABLE.CRONTAB:
         scheduler = AsyncIOScheduler(timezone=TZ)
-        scheduler.add_job(cron_secondly, "interval", args=[app], seconds=1)
-        scheduler.add_job(cron_minutely, "cron", args=[app], second=0)
+        # scheduler.add_job(cron_secondly, "interval", args=[app], seconds=1)
+        scheduler.add_job(cron_minutely, "interval", args=[app], minutes=1)
         scheduler.add_job(cron_hourly, "cron", args=[app], minute=0)
-        scheduler.add_job(cron_daily, "cron", args=[app], hour=8, minute=30)
+        scheduler.add_job(cron_daily, "cron", args=[app], hour=6, minute=40, jitter=4800)
         logging.getLogger("apscheduler.scheduler").setLevel(logging.ERROR)
         scheduler.start()
 
@@ -109,34 +166,52 @@ async def main():
     await app.stop()
 
 
-async def cron_secondly(client: Client):
-    pass
+# async def cron_secondly(client: Client):
+#     pass
 
 
 async def cron_minutely(client: Client):
     cache.evict()  # delete expired cache
     cleanup_old_files()
     await backup_chat_history(client)
+    await daily_backup_history_to_d1(client)
+    if ACCOUNT_NAME == "benny":
+        pass
+    elif ACCOUNT_NAME == "xiaohao":
+        await dnkt_attendance(client)
+        await fafa_twitter_rss(client)
+    elif ACCOUNT_NAME == "bot":
+        await dnkt_email(client)
+        await readhub()
+        await sync_youtube(client)
+        await update_rss(client)
+        await keepalive_emby()
 
 
 async def cron_hourly(client: Client):
     await daily_summary(client)
-    await clean_anthropic_files()
-    await clean_gemini_files()
-    await clean_r2_expired()
     await sync_livechats()
     if ENABLE.CACHE_PRICE_SYMBOLS:
         await match_symbol_category()  # to cache all supported symbols
     await summary_pods(client)
+    if ACCOUNT_NAME == "xiaohao":
+        await preview_lilaoshi_history_message(client)  # parse history messages missed by the "Li laoshi" handler
+        await clean_r2_expired()
+        await clean_anthropic_files()
+        await clean_gemini_files()
+    if ACCOUNT_NAME == "bot":
+        await daily_ainews()
 
 
 async def cron_daily(client: Client):
     # send daliy messages
+    await daily_checkin_emby(client)
     try:
         daliy = json.loads(DAILY_MESSAGES)
         for chat_id, msg in daliy.items():
             logger.info(f"Sending daily message to {chat_id}: {msg}")
             await client.send_message(to_int(chat_id), msg)
+            await asyncio.sleep(random.randint(3, 8))
     except (JSONDecodeError, TypeError):
         logger.warning(f"Invalid DAILY_MESSAGES: {DAILY_MESSAGES}")
     except Exception as e: