main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import re
  4from collections import defaultdict
  5from decimal import Decimal
  6from io import BytesIO
  7from pathlib import Path
  8
  9from pyrogram.client import Client
 10from pyrogram.errors.exceptions.bad_request_400 import MediaCaptionTooLong
 11from pyrogram.types import InputMediaDocument, Message
 12
 13from config import DANMU, DOWNLOAD_DIR, PREFIX, TZ
 14from danmu.r2 import get_username_history, query_r2
 15from danmu.turso import query_turso
 16from danmu.utils import count_entities, to_usd
 17from messages.parser import parse_msg
 18from messages.progress import modify_progress
 19from messages.sender import send2tg
 20from messages.utils import blockquote, delete_message, equal_prefix, smart_split, startswith_prefix
 21from others.emoji import CURRENCY
 22from publish import publish_telegraph
 23from utils import convert2html, nowdt, number, strings_list
 24
# Usage text sent back to the user when the command is issued with no arguments.
# It is an f-string: the command prefixes (PREFIX.DANMU / PREFIX.FAYAN), the
# timezone (TZ) and the streamer name (DANMU.STREAMER) are interpolated once,
# at import time. The text is Markdown intended for Telegram rendering.
HELP = f"""📖**查询直播合订本**
`{PREFIX.DANMU}` 使用说明:
1.`{PREFIX.DANMU} + 日期`
2.`{PREFIX.DANMU} + @用户名` (区分大小写)
3.`{PREFIX.DANMU} + 关键词` (区分大小写)
4.以上可组合使用, 但日期必须放前面, 关键词必须放后面
示例:
- `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
- `{PREFIX.DANMU} @张三`: 查询用户【张三】的弹幕
- `{PREFIX.DANMU} 你好`: 查询包含“你好”关键词的弹幕
- `{PREFIX.DANMU} 2025-01 @张三`: 查询2025年1月份用户【张三】的弹幕
- `{PREFIX.DANMU} 2025-01-01 你好`: 查询2025-01-01日包含“你好”的弹幕
- `{PREFIX.DANMU} 2025 @张三 你好`: 查询2025年用户【张三】包含“你好”的弹幕

🕒**日期说明:**
- 发言记录的时间点是开播时长
- 弹幕记录的时间点是真实时间,时区为 **{TZ}**
- 指定弹幕查询日期时,采用的是30小时制(凌晨0—6点属于前一天)
- 例如 `{PREFIX.DANMU} 2025-01-01` 查询的是2025-01-01日6点至次日6点的弹幕

👤**用户名说明:**
- 如果用户名中有空格, 请用**引号**包住用户名 (单双引号、中英引号皆可)。
- 例如: 想指定用户为John Doe请使用 `@"John Doe"`
- 用户名也支持指定YouTube的 **ChannelID**, 例如 `{PREFIX.DANMU} @UC...`
- 查询ChannelID方法:在YouTube用户页面的简介处点击“更多” -> 点击最下方的“分享频道” -> 点击“复制频道ID”

`{PREFIX.FAYAN}` 用法类似, 但查询的是**【{DANMU.STREAMER}】**直播语录。
( `@用户名` 对于 `{PREFIX.FAYAN}` 命令无效)
"""

# One-line note about the timestamp timezone; query_danmu inserts it right after
# the header of directly-sent results, and only for "弹幕" queries.
DANMU_TIPS = f"时间点为{TZ}时区\n"
 56
 57
 58async def query_danmu(client: Client, message: Message, **kwargs):
 59    info = parse_msg(message)
 60    if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
 61        return
 62    if equal_prefix(message.text, prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
 63        await send2tg(client, message, texts=HELP, **kwargs)
 64        return
 65    if not DANMU.BASE_URL:
 66        await send2tg(client, message, texts="⚠️请联系管理员配置API地址", **kwargs)
 67        return
 68
 69    qtype = "弹幕" if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else "发言"
 70    match_time, user, keyword, error = parse_queries(info["text"], qtype)
 71    if error:
 72        await send2tg(client, message, texts=error, **kwargs)
 73        return
 74
 75    user = user if qtype == "弹幕" else DANMU.STREAMER
 76
 77    caption = f"📖**{qtype}记录**:"
 78    if match_time:
 79        caption += f"\n🕒日期: {match_time}"
 80    caption += f"\n👤用户: {user}"
 81    caption += f"\n🔤关键词: {keyword}"
 82
 83    status_msg = await message.reply_text(caption, quote=True)
 84    kwargs["progress"] = status_msg
 85
 86    super_chats = defaultdict(Decimal)  # {"currency": amount}
 87    if DANMU.QUERY_METHOD.lower() == "turso":
 88        resp = await query_turso(match_time, user, keyword, caption, super_chats, qtype, **kwargs)
 89        count = resp.get("count", 0)
 90        paths = resp.get("paths", [])
 91        texts = "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
 92        [Path(path).unlink(missing_ok=True) for path in paths]
 93        user = resp.get("user", user)
 94    else:
 95        query_dates = await get_query_dates(match_time, qtype)
 96        resp = await query_r2(query_dates, user, keyword, caption, super_chats, qtype, **kwargs)
 97        texts = resp.get("texts", "")
 98        count = resp.get("count", 0)
 99    if count == 0:
100        await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
101        return
102    header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
103    username_history = await get_username_history(user) if qtype == "弹幕" and user else ""
104    profit = ""
105    profit_usd = 0
106    for currency, amount in sorted(super_chats.items()):
107        profit += f"\n{CURRENCY[currency]}**{currency}**: {number(amount)}" if currency in CURRENCY else ""
108        profit_usd += amount * (await to_usd(currency)) if currency in CURRENCY else Decimal()
109    # if only "USD" ccy, do not include total USD
110    super_chats.pop("USD", None)  # remove "USD"
111    if profit_usd > 0 and super_chats:
112        profit += f"\n💵**总计**: {profit_usd:.2f} USD"
113
114    # try send as message directly
115    tips = DANMU_TIPS if qtype == "弹幕" else ""
116    final = f"{header}{tips}{profit}{username_history.strip()}\n\n{texts}"
117    if (await count_entities(client, final)) <= 100 and len(await smart_split(final)) == 1:
118        await modify_progress(message=status_msg, text=blockquote(final), force_update=True, **kwargs)
119        return
120
121    caption += f"\n#️⃣{qtype}数: {count}"
122    caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
123    html = convert2html(texts)
124    if telegraph_url := await publish_telegraph(title=f"{qtype}{user}{match_time} {keyword}", html=html, author=user, ttl="1d"):
125        caption += f"\n⚡️[即时预览]({telegraph_url})"
126    caption += blockquote(username_history)
127    html = f'<!DOCTYPE html><html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>{qtype}查询结果</title><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css"></head><body><article>{html}</article></body></html>'
128    try:
129        with BytesIO(html.encode("utf-8")) as f:
130            await status_msg.edit_media(media=InputMediaDocument(f, file_name=f"{qtype}查询结果.html", caption=caption))
131    except MediaCaptionTooLong:
132        save_path = Path(DOWNLOAD_DIR).joinpath(f"{qtype}查询结果.html")
133        save_path.write_text(html)
134        await send2tg(client, message, texts=caption, media=[{"document": save_path.as_posix()}], **kwargs)
135        await delete_message(status_msg)
136
137
def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
    """Parse a query command into its date, user and keyword parts.

    The expected layout is ``<prefix> [date] [@user] [keyword]``:

    - ``date`` is ``YYYY``, ``YYYY-MM`` or ``YYYY-MM-DD`` and must start with ``20``;
      any other leading number is treated as part of the keyword.
    - ``@user`` may be wrapped in ASCII or Chinese single/double quotes so that it
      can contain spaces; unquoted, it extends to the next whitespace.
    - everything that remains is the keyword.

    Args:
        texts: Raw command text, starting with ``PREFIX.DANMU`` or ``PREFIX.FAYAN``.
        qtype: ``"弹幕"`` or ``"发言"``. For ``"发言"`` a parsed ``@user`` is discarded.

    Returns:
        match_time, user, keyword, error. ``error`` is non-empty (and the other
        three are empty) when the query is malformed or has no filter at all.
    """
    # ruff: noqa: RUF001
    # (opening quote, closing quote, quote name used in the error message)
    quote_pairs = (
        ('"', '"', "英文双引号"),
        ("“", "”", "中文双引号"),
        ("'", "'", "英文单引号"),
        ("‘", "’", "中文单引号"),
    )
    match_time = ""
    user = ""
    # Unify the prefix. Only the leading command is rewritten so that a keyword
    # which happens to contain PREFIX.FAYAN is left untouched.
    if texts.startswith(PREFIX.FAYAN):
        texts = PREFIX.DANMU + texts[len(PREFIX.FAYAN) :]
    # The prefix comes from configuration, so escape it before using it in a pattern.
    prefix = re.escape(PREFIX.DANMU)
    # 2025-01-01 / 2025-01 / 2025 (the longest form that fits wins)
    if matched := re.match(rf"^{prefix}" + r"\s+(\d{4}(?:-\d{2}){0,2})", texts):
        match_time = matched.group(1)
    if not match_time.startswith("20"):
        match_time = ""
    # remove prefix + date (match_time only contains digits and hyphens)
    texts = re.sub(rf"^{prefix}\s+{match_time}", "", texts).lstrip()
    # @张三 你好
    # @张三
    # @"Zhang San"
    if texts.startswith("@"):  # user is specified
        texts = texts.lstrip("@")
        for opening, closing, label in quote_pairs:
            if not texts.startswith(opening):
                continue
            # Non-greedy: stop at the first closing quote so quotes inside the
            # keyword are not swallowed.
            matched = re.match(re.escape(opening) + r"(.*?)" + re.escape(closing), texts)
            if not matched:
                return "", "", "", f"查询格式有误, 用户名右侧缺失{label}"
            user = matched.group(1)
            texts = texts[matched.end() :]
            break
        else:
            # Unquoted name: runs up to the next whitespace. ``\S*`` can match the
            # empty string, so this always succeeds. The name is removed by slicing
            # rather than by a regex built from it, so metacharacters are harmless.
            if matched := re.match(r"\S*", texts):
                user = matched.group(0)
                texts = texts[matched.end() :]

    keyword = texts.lstrip()

    # "@user" has no effect on speech queries.
    if qtype == "发言":
        user = ""

    if not any((match_time, user, keyword)):
        return "", "", "", f"查询格式有误, 请发送{PREFIX.DANMU}{PREFIX.FAYAN}命令查看帮助"
    return match_time, user, keyword, ""
202
203
async def get_query_dates(match_time: str, qtype: str = "弹幕") -> list[str]:
    """Resolve the list of dates a query should cover.

    Args:
        match_time: Date filter from the user (YYYY, YYYY-MM or YYYY-MM-DD), or "".
        qtype: "弹幕" selects ``DANMU.SYNC_DANMU_YEARS``; any other value selects
            ``DANMU.SYNC_FAYAN_YEARS``.

    Returns:
        list[str]: ``[match_time]`` when a date filter was given; otherwise the
        configured years plus the current year (in ``TZ``), de-duplicated and sorted.
    """
    today = nowdt(TZ)
    if qtype == "弹幕":
        years = strings_list(DANMU.SYNC_DANMU_YEARS)
    else:
        years = strings_list(DANMU.SYNC_FAYAN_YEARS)
    # The current year is always queryable, even if it is not configured.
    years.append(str(today.year))
    unique_years = sorted({*years})
    # YYYY-MM-DD, YYYY-MM, YYYY
    return [match_time] if match_time else unique_years
218    return allowed_years