main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import re
  4from collections import defaultdict
  5from decimal import Decimal
  6from pathlib import Path
  7
  8from pyrogram.client import Client
  9from pyrogram.types import Message
 10
 11from config import DANMU, PREFIX, TZ
 12from danmu.r2 import query_r2
 13from danmu.turso import query_turso
 14from danmu.utils import file_bytes, merge_txt_files, to_usd
 15from messages.parser import parse_msg
 16from messages.progress import modify_progress
 17from messages.sender import send2tg
 18from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
 19from others.emoji import CURRENCY
 20from publish import publish_telegraph
 21from utils import convert_html, nowdt, number, strings_list
 22
# Usage text for the danmu / fayan commands; query_danmu sends it when the
# message is just the bare command prefix.
HELP = f"""📖**查询直播合订本**
`{PREFIX.DANMU}` 使用说明:
1.`{PREFIX.DANMU} + 日期`
2.`{PREFIX.DANMU} + @用户名` (区分大小写)
3.`{PREFIX.DANMU} + 关键词` (区分大小写)
4.以上可组合使用, 但日期必须放前面, 关键词必须放后面
示例:
- `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
- `{PREFIX.DANMU} @张三`: 查询用户【张三】的弹幕
- `{PREFIX.DANMU} 你好`: 查询包含“你好”关键词的弹幕
- `{PREFIX.DANMU} 2025-01 @张三`: 查询2025年1月份用户【张三】的弹幕
- `{PREFIX.DANMU} 2025-01-01 你好`: 查询2025-01-01日包含“你好”的弹幕
- `{PREFIX.DANMU} 2025 @张三 你好`: 查询2025年用户【张三】包含“你好”的弹幕

⚠️注意:
- 日期为开播日期, 不是弹幕发送日期 (过了凌晨也算前一天)
- 如果用户名中有空格, 请用**引号**包住用户名 (单双引号、中英引号皆可)。
- 例如: 想指定用户为John Doe请使用 `@"John Doe"`

`{PREFIX.FAYAN}` 用法类似, 但查询的是**【{DANMU.STREAMER}】**直播语录。
( `@用户名` 对于 `{PREFIX.FAYAN}` 命令无效)
"""

# Note prepended to danmu results: timestamps are expressed in the TZ timezone.
DANMU_TIPS = f"时间点为{TZ}时区"
# Note prepended to fayan results: timestamps are offsets from the stream start,
# not wall-clock times.
FAYAN_TIPS = "时间点并非真实时间, 而是相对开播时长"
 48
 49
 50async def query_danmu(client: Client, message: Message, **kwargs):
 51    info = parse_msg(message)
 52    if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
 53        return
 54    if equal_prefix(message.text, prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
 55        await send2tg(client, message, texts=HELP, **kwargs)
 56        return
 57    if not DANMU.BASE_URL:
 58        await send2tg(client, message, texts="⚠️请联系管理员配置API地址", **kwargs)
 59        return
 60
 61    qtype = "弹幕" if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else "发言"
 62    match_time, user, keyword, error = parse_queries(info["text"], qtype)
 63    if error:
 64        await send2tg(client, message, texts=error, **kwargs)
 65        return
 66
 67    user = user if qtype == "弹幕" else DANMU.STREAMER
 68
 69    caption = f"📖**{qtype}记录**:"
 70    if match_time:
 71        caption += f"\n🕒日期: {match_time}"
 72    caption += f"\n👤用户: {user}"
 73    caption += f"\n🔤关键词: {keyword}"
 74
 75    status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0] if kwargs.get("show_progress") else None
 76    kwargs["progress"] = status_msg
 77
 78    super_chats = defaultdict(Decimal)  # {"currency": amount}
 79    if DANMU.QUERY_METHOD.lower() == "turso":
 80        resp = await query_turso(match_time, user, keyword, caption, super_chats, qtype, **kwargs)
 81        count = resp.get("count", 0)
 82        paths = resp.get("paths", [])
 83        user = resp.get("user", user)
 84    else:
 85        query_dates = await get_query_dates(match_time, qtype)
 86        resp = await query_r2(query_dates, user, keyword, caption, super_chats, qtype, **kwargs)
 87        paths = resp.get("paths", [])
 88        count = resp.get("count", 0)
 89    if count == 0:
 90        await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
 91        return
 92    header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
 93    profit = ""
 94    profit_usd = 0
 95    for currency, amount in sorted(super_chats.items()):
 96        profit += f"\n{CURRENCY[currency]}**{currency}**: {number(amount)}" if currency in CURRENCY else ""
 97        profit_usd += amount * (await to_usd(currency)) if currency in CURRENCY else Decimal()
 98    # if only "USD" ccy, do not include total USD
 99    super_chats.pop("USD", None)  # remove "USD"
100    if profit_usd > 0 and super_chats:
101        profit += f"\n💵**总计**: {profit_usd:.2f} USD"
102
103    tips = f"{DANMU_TIPS}\n" if qtype == "弹幕" else f"{FAYAN_TIPS}\n"
104    if file_bytes(paths) < 20480:  #  20 KB, short length, try send as message directly
105        danmu = tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
106        final = f"{header}{profit}\n{blockquote(danmu)}"
107        if len(await smart_split(final)) == 1:
108            await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
109            return
110    caption += f"\n#️⃣{qtype}数: {count}"
111    caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
112    merged_paths = paths if DANMU.QUERY_METHOD.lower() == "turso" else merge_txt_files(paths, query_dates, user, keyword, qtype, tips)
113    media = [{"document": path} for path in sorted(merged_paths)]
114    # less than 200K, add instant view
115    if file_bytes(merged_paths) < 204800:
116        texts = "\n".join([Path(path).read_text() for path in sorted(merged_paths, reverse=True)]).strip()
117        if telegraph_url := await publish_telegraph(title=f"{qtype}{user}{match_time} {keyword}", html=convert_html(texts), author=user, ttl="1d"):
118            caption += f"\n⚡️[即时预览]({telegraph_url})"
119    await send2tg(client, message, texts=caption, media=media, **kwargs)
120    [Path(path).unlink(missing_ok=True) for path in paths]
121    [Path(path).unlink(missing_ok=True) for path in merged_paths]
122    await modify_progress(message=status_msg, del_status=True, **kwargs)
123
124
def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
    """Parse a query command into its date, user and keyword parts.

    Expected layout: ``<prefix> [date] [@user] [keyword]`` where ``date`` is
    ``YYYY``, ``YYYY-MM`` or ``YYYY-MM-DD`` (only years starting with "20"
    are treated as dates) and ``user`` may be wrapped in ASCII or Chinese
    single/double quotes when it contains spaces.

    Args:
        texts: Full message text, starting with the danmu or fayan prefix.
        qtype: "弹幕" for danmu queries or "发言" for streamer quotes; for
            the latter any parsed user is discarded.

    Returns:
        ``(match_time, user, keyword, error)``. ``error`` is empty on success;
        otherwise the other three fields are empty strings.
    """
    # ruff: noqa: RUF001
    # (opening quote, closing quote, label used in the error message)
    quote_pairs = (
        ('"', '"', "英文双引号"),
        ("“", "”", "中文双引号"),
        ("'", "'", "英文单引号"),
        ("‘", "’", "中文单引号"),
    )

    # Unify the prefix. Only the leading command token is rewritten so that a
    # keyword which happens to contain the fayan prefix is left untouched.
    if texts.startswith(PREFIX.FAYAN):
        texts = PREFIX.DANMU + texts[len(PREFIX.FAYAN):]
    prefix = re.escape(PREFIX.DANMU)  # the prefix is literal text, not a pattern

    # Longest date form wins: 2025-01-01, then 2025-01, then 2025.
    match_time = ""
    if matched := re.match("^" + prefix + r"\s+(\d{4}(?:-\d{2}){0,2})", texts):
        match_time = matched.group(1)
    if not match_time.startswith("20"):
        match_time = ""  # e.g. "1999" is treated as a keyword, not a date
    # remove prefix + date
    texts = re.sub("^" + prefix + r"\s+" + re.escape(match_time), "", texts).lstrip()

    # @张三 你好
    # @张三
    # @"Zhang San"
    user = ""
    if texts.startswith("@"):  # user is specified
        texts = texts.lstrip("@")
        for opener, closer, label in quote_pairs:
            if not texts.startswith(opener):
                continue
            matched = re.match(re.escape(opener) + "(.*?)" + re.escape(closer), texts)
            if matched is None:
                return "", "", "", f"查询格式有误, 用户名右侧缺失{label}"
            user = matched.group(1)
            # Cut exactly what was matched so quotes in the keyword survive.
            texts = texts[matched.end():]
            break
        else:
            # Unquoted name: everything up to the first whitespace. Slicing
            # avoids feeding the raw name back into a regex.
            matched = re.match(r"\S*", texts)
            user = matched.group(0)
            texts = texts[matched.end():]

    keyword = texts.lstrip()

    if qtype == "发言":
        user = ""

    if not any((match_time, user, keyword)):
        return "", "", "", f"查询格式有误, 请发送{PREFIX.DANMU}{PREFIX.FAYAN}命令查看帮助"
    return match_time, user, keyword, ""
189
190
async def get_query_dates(match_time: str, qtype: str = "弹幕") -> list[str]:
    """Return the date prefixes a query should cover.

    Args:
        match_time: Date given by the user (``YYYY``, ``YYYY-MM`` or
            ``YYYY-MM-DD``), or an empty string when none was given.
        qtype: "弹幕" selects ``DANMU.SYNC_DANMU_YEARS``; any other value
            selects ``DANMU.SYNC_FAYAN_YEARS``.

    Returns:
        ``[match_time]`` when a date was given; otherwise the sorted, unique
        list of configured years plus the current year in ``TZ``.
    """
    # YYYY-MM-DD, YYYY-MM, YYYY: an explicit date needs no clock or config lookup.
    if match_time:
        return [match_time]

    configured = DANMU.SYNC_DANMU_YEARS if qtype == "弹幕" else DANMU.SYNC_FAYAN_YEARS
    # Build a new collection rather than appending to the helper's return value.
    return sorted({*strings_list(configured), str(nowdt(TZ).year)})
205    return allowed_years