Commit 7362ae2
Changed files (5)
src/others/danmu.py
@@ -15,32 +15,30 @@ from pyrogram.client import Client
from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM, BLOCKQUOTE_EXPANDABLE_END_DELIM
from pyrogram.types import Message
-from config import DANMU, DOWNLOAD_DIR, PREFIX, PROXY, TEXT_LENGTH, TOKEN, TZ, cache
+from config import DANMU, DOWNLOAD_DIR, PREFIX, PROXY, TOKEN, TZ, cache
from database import create_cf_d1_table, get_cf_r2, list_cf_r2, query_cf_d1, set_cf_r2
+from llm.utils import convert_html
from messages.parser import parse_msg
from messages.progress import modify_progress
from messages.sender import send2tg
-from messages.utils import blockquote, equal_prefix, startswith_prefix
+from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
from networking import hx_req
from others.emoji import CURRENCY
from price.coinmarketcap import get_cmc_fiat
+from publish import publish_telegraph
from utils import nowdt, number
-HELP = f"""📖**查询弹幕记录**
+HELP = f"""📖**查询直播合订本**
`{PREFIX.DANMU}` 使用说明:
1.`{PREFIX.DANMU} + 日期`
2.`{PREFIX.DANMU} + @用户名` (区分大小写, 且要去除空格)
3.`{PREFIX.DANMU} + #关键词` (区分大小写)
4.以上可组合使用, 但日期必须放前面, 关键词必须放后面
-5.不指定日期时, 默认查询本年弹幕记录
-{BLOCKQUOTE_EXPANDABLE_DELIM}示例:
-- `{PREFIX.DANMU} 2025-01-01`: 查询2025-01-01日的弹幕
-- `{PREFIX.DANMU} 2025-01`: 查询2025年1月份的弹幕
-- `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
+示例:
+{BLOCKQUOTE_EXPANDABLE_DELIM}- `{PREFIX.DANMU} 2025`: 查询2025年的弹幕
- `{PREFIX.DANMU} @张三`: 查询用户【张三】的弹幕
- `{PREFIX.DANMU} #你好`: 查询包含“你好”关键词的弹幕
-- `{PREFIX.DANMU} 2025-01-01 @张三`: 查询2025-01-01日用户【张三】的弹幕
-- `{PREFIX.DANMU} 2025 @张三`: 查询2025年张三的全部弹幕
+- `{PREFIX.DANMU} 2025-01 @张三`: 查询2025年1月份用户【张三】的弹幕
- `{PREFIX.DANMU} 2025-01-01 #你好`: 查询2025-01-01日包含“你好”的弹幕
- `{PREFIX.DANMU} 2025 @张三 #你好`: 查询2025年用户【张三】包含“你好”的弹幕
@@ -48,22 +46,22 @@ HELP = f"""📖**查询弹幕记录**
- 日期为开播日期, 不是弹幕发送日期 (过了凌晨也算前一天)
- 如果用户名中有空格, 请去除空格。例如: 想指定用户为John Doe请使用 `@JohnDoe`
{BLOCKQUOTE_EXPANDABLE_END_DELIM}
-`{PREFIX.ZIMU}` 和 `{PREFIX.DANMU}` 类似, 但查询的是{DANMU.STREAMER}直播发言。
+`{PREFIX.FAYAN}` 用法类似, 但查询的是**【{DANMU.STREAMER}】**直播语录。
额外需注意的是:
-`{PREFIX.ZIMU}`会忽略指定的 @用户名
-`{PREFIX.ZIMU} + #关键词` 也可以省略**#**号
-直接使用`{PREFIX.ZIMU} + 关键词` 即可
+`{PREFIX.FAYAN}` 命令会忽略指定的 `@用户名`
+`{PREFIX.FAYAN} + #关键词` 可以省略 `#` 号
+直接使用`{PREFIX.FAYAN} + 关键词` 即可
"""
DANMU_TIPS = f"时间点为{TZ}时区"
-SRT_TIPS = "时间点并非真实时间, 而是相对开播时长"
+FAYAN_TIPS = "时间点并非真实时间, 而是相对开播时长"
async def query_danmu(client: Client, message: Message, **kwargs):
info = parse_msg(message)
- if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU, PREFIX.ZIMU]):
+ if not startswith_prefix(info["text"], prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
return
- if equal_prefix(message.text, prefix=[PREFIX.DANMU, PREFIX.ZIMU]):
+ if equal_prefix(message.text, prefix=[PREFIX.DANMU, PREFIX.FAYAN]):
await send2tg(client, message, texts=HELP, **kwargs)
return
if not DANMU.BASE_URL:
@@ -71,21 +69,18 @@ async def query_danmu(client: Client, message: Message, **kwargs):
return
qtype = "弹幕" if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else "发言"
- match_time, user, keyword = parse_queries(info["text"], qtype)
- if not any((match_time, user, keyword)): # NO Match
- await send2tg(client, message, texts=f"查询格式有误, 请发送{PREFIX.DANMU}或{PREFIX.ZIMU}命令查看帮助", **kwargs)
+ match_time, user, keyword, error = parse_queries(info["text"], qtype)
+ if error:
+ await send2tg(client, message, texts=error, **kwargs)
return
- if not match_time:
- match_time = str(nowdt(TZ).year) # this year
-
- user = user if startswith_prefix(info["text"], prefix=[PREFIX.DANMU]) else DANMU.STREAMER
- caption = f"📖**{qtype}记录**:\n🕒日期: {match_time}"
- if user:
- caption += f"\n👤用户: {user}"
+ user = user if qtype == "弹幕" else DANMU.STREAMER
- if keyword:
- caption += f"\n🔤关键词: {keyword}"
+ caption = f"📖**{qtype}记录**:"
+ if match_time:
+ caption += f"\n🕒日期: {match_time}"
+ caption += f"\n👤用户: {user}"
+ caption += f"\n🔤关键词: {keyword}"
status_msg = (await send2tg(client, message, texts=caption, **kwargs))[0]
kwargs["progress"] = status_msg
@@ -94,20 +89,18 @@ async def query_danmu(client: Client, message: Message, **kwargs):
paths = []
count = 0
super_chats = defaultdict(Decimal) # {"currency": amount}
- engine_dates = await query_engine_with_dates(match_time, user, keyword)
- if qtype == "发言":
- engine_dates = {"server": [match_time]} # always use server
- for engine, dates in engine_dates.items():
+ engine_dates = await get_engine_with_dates(match_time, user, keyword, qtype)
+ for engine, dates in sorted(engine_dates.items(), reverse=True):
if engine == "r2":
- resp = await query_danmu_from_r2(dates, user, keyword, caption, super_chats, **kwargs)
- paths.extend(resp.get("paths", []))
+ resp = await query_r2(dates, user, keyword, caption, super_chats, qtype, **kwargs)
else:
- resp = await query_danmu_from_server(dates, user, keyword, caption, super_chats, qtype, **kwargs)
- paths.append(resp["path"]) if resp.get("path") else None
+ resp = await query_server(dates, user, keyword, caption, super_chats, qtype, **kwargs)
+ paths.extend(resp.get("paths", []))
count += resp.get("count", 0)
if count == 0:
await modify_progress(text=caption + f"\n⚠️未匹配任何{qtype}", force_update=True, **kwargs)
return
+
header = f"📖**{qtype}记录 ({user})**:" if user else f"📖**{qtype}记录**:"
profit = ""
profit_usd = 0
@@ -120,32 +113,39 @@ async def query_danmu(client: Client, message: Message, **kwargs):
if profit_usd > 0 and super_chats:
profit += f"\n💵**总计**: {profit_usd:.2f} USD"
- total_bytes = sum([Path(path).stat().st_size for path in paths])
- if total_bytes < 1048576: # short length, try send as message directly
- danmu = "\n".join([Path(path).read_text() for path in paths]).strip()
+ tips = f"{DANMU_TIPS}\n" if qtype == "弹幕" else f"{FAYAN_TIPS}\n"
+ if file_bytes(paths) < 20480: # 20 KB, short length, try send as message directly
+ danmu = tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
final = f"{header}{profit}\n{blockquote(danmu)}"
- if len(final) < TEXT_LENGTH - 10:
+ if len(await smart_split(final)) == 1:
await modify_progress(message=status_msg, text=final, force_update=True, **kwargs)
return
-
caption += f"\n#️⃣{qtype}数: {count}"
caption += f"\n\n🎉**SuperChat**:{profit}" if profit else ""
- media = [{"document": path} for path in sorted(paths)]
+ dates: list[str] = flatten(engine_dates.values()) # type: ignore
+ merged_paths = merge_txt_files(paths, dates, user, keyword, qtype, tips)
+ media = [{"document": path} for path in sorted(merged_paths)]
+ # less than 200K, add instant view
+ if file_bytes(merged_paths) < 204800:
+ texts = "\n".join([Path(path).read_text() for path in sorted(merged_paths, reverse=True)]).strip()
+ if telegraph_url := await publish_telegraph(title=f"【{qtype}】{user}{match_time} {keyword}", html=convert_html(texts), author=user):
+ caption += f"\n⚡️[即时预览]({telegraph_url})"
await send2tg(client, message, texts=caption, media=media, **kwargs)
[Path(path).unlink(missing_ok=True) for path in paths]
+ [Path(path).unlink(missing_ok=True) for path in merged_paths]
await modify_progress(message=status_msg, del_status=True, **kwargs)
-def parse_queries(texts: str, qtype: str) -> tuple[str, str, str]:
+def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
"""Parse from users' query.
Returns:
- match_time, user, keyword
+ match_time, user, keyword, error
"""
match_time = ""
user = ""
keyword = ""
- texts = texts.replace(PREFIX.ZIMU, PREFIX.DANMU) # unify prefix
+ texts = texts.replace(PREFIX.FAYAN, PREFIX.DANMU) # unify prefix
# 2025-01-01
if matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4}-\d{2}-\d{2})(\s+)?", texts): # noqa: SIM114
match_time = matched.group(1)
@@ -169,36 +169,49 @@ def parse_queries(texts: str, qtype: str) -> tuple[str, str, str]:
# #你好
if matched := re.match(r"^#(.*)", texts):
keyword = matched.group(1)
+
if qtype == "弹幕":
- return match_time, user, keyword
+ if not any((match_time, user, keyword)):
+ return "", "", "", f"查询格式有误, 请发送{PREFIX.DANMU}或{PREFIX.FAYAN}命令查看帮助"
+ return match_time, user, keyword, ""
- # ! wait for changing server limit
- if match_time and not texts: # 暂时不支持获取单日全部发言
- return "", "", ""
- # 对于发言, 如果未匹配时间和keyword, 则用户输入是忽略了keyword前的 #
- # 即使用 `PREFIX.ZIMU keyword`` 查询
+ # 对于使用`PREFIX.FAYAN keyword`的发言查询
+ # 如果未匹配keyword, 则用户输入是忽略了keyword前的 #
if not keyword:
keyword = texts
- return match_time, user, keyword
+ return match_time, "", keyword, ""
-async def query_engine_with_dates(match_time: str, user: str = "", keyword: str = "") -> dict[str, list[str]]:
+async def get_engine_with_dates(match_time: str, user: str = "", keyword: str = "", qtype: str = "弹幕") -> dict[str, list[str]]:
"""获取查询引擎和对应查询日期.
- 日期:
+ 对于弹幕记录根据日期进行判断:
+ 不指定日期: 从server获取
YYYY-MM-DD: 不是今天或者昨天, 则从R2获取
YYYY-MM: 不是本月, 则从R2获取
- YYYY: 如果获取全年的全部聊天记录 (不指定user和keyword), 文件可能会非常大, 小内存VPS经常爆内存
+ YYYY: 如果获取全年的全部弹幕记录 (不指定user和keyword), 文件可能会非常大, 小内存VPS经常爆内存
如果指定了user和keyword, 则从server获取
如果YYYY为本年, 从server获取本月, 其余月份从R2获取
其余情况一律从R2获取
为避免此情况, 当查询为全年时, 改为按月份从R2获取
然后合并每2个月的记录为一个文件, 发送6个文件.
+ 对于发言记录, 如果指定了日期, 从R2查询, 否则从server获取
Returns:
dict: {"server": [date-1, date-2], "r2": [date-3, date-4]}
"""
now = nowdt(TZ)
+ allowed_years = [x.strip() for x in DANMU.R2_SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.R2_SYNC_FAYAN_YEARS.split(",") if x.strip()]
+ allowed_years.append(str(now.year))
+ allowed_years = sorted(set(allowed_years))
+ if qtype == "发言":
+ if match_time:
+ return {"r2": [match_time]}
+ return {"server": allowed_years}
+
+ # 以下为匹配弹幕查询
+ if not match_time:
+ return {"server": allowed_years}
today = now.strftime("%Y-%m-%d")
year = now.year
month = f"{now.month:02d}"
@@ -222,227 +235,319 @@ async def query_engine_with_dates(match_time: str, user: str = "", keyword: str
# 从server获取本月, 其余月份从R2获取
r2_months = [f"{year}-{mon:02d}" for mon in range(1, now.month)]
return {"server": [f"{year}-{month}"], "r2": r2_months} # type: ignore
+ # 往年
r2_months = [f"{match_time}-{mon:02d}" for mon in range(1, 13)]
return {"r2": r2_months}
-async def query_danmu_from_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
- """从远程数据库获取弹幕记录.
-
- dates列表中只有一个日期.
+async def query_server(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
+ """从远程数据库获取记录.
Returns:
- {"path": str, "count": int}
+ {"paths": list[str], "count": int}
"""
if not dates:
return {}
- date = dates[0]
- api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
- payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": date}
- if qtype == "发言":
- payload["limit"] = min(100, DANMU.NUM_PER_QUERY) # ! wait for changing server limit
- if user and qtype == "弹幕":
- payload["authorName"] = user
- if keyword:
- payload |= {"message": keyword} if qtype == "弹幕" else {"content": keyword}
- payload["page"] = 1
- logger.debug(f"Query: {payload}")
- resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
- parsed = parse_danmu(resp.get("data", ""), user, keyword, super_chats) if qtype == "弹幕" else parse_srt(resp.get("data", ""), keyword)
- processed = parsed["count"]
- if processed == 0:
- return {}
- danmu = parsed["danmu"]
- await modify_progress(text=caption + f"\n⏳已获取 {processed} 条{qtype}", force_update=True, **kwargs)
- while len(resp.get("data", [])) == payload["limit"] and parsed["count"]:
- payload["page"] += 1
- logger.debug(f"Query: {payload}")
+ paths = []
+ total_count = 0
+ for date in sorted(dates, reverse=True):
+ api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
+ payload = {"limit": DANMU.NUM_PER_QUERY, "liveDate": date}
+ if user and qtype == "弹幕":
+ payload["authorName"] = user
+ if keyword:
+ payload |= {"message": keyword} if qtype == "弹幕" else {"content": keyword}
+ payload["page"] = 1
+ logger.debug(f"Query {qtype}: {payload}")
resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
- parsed = parse_danmu(resp.get("data", ""), user, keyword, super_chats) if qtype == "弹幕" else parse_srt(resp.get("data", ""), keyword)
- processed += parsed["count"]
- danmu += parsed["danmu"]
- await modify_progress(text=caption + f"\n⏳已获取 {processed} 条{qtype}", **kwargs)
-
- del parsed
- date_name = f"{date[:4]}年{date[5:7]}月" if len(date) == 7 else date
- user = f"{user}-" if user else ""
- keyword = f"-“{keyword}”" if keyword else ""
- save_path = f"{DOWNLOAD_DIR}/{user}{date_name}{keyword}-{qtype}.txt"
- danmu = f"{DANMU_TIPS}\n{danmu.strip()}" if qtype == "弹幕" else f"{SRT_TIPS}\n{danmu.strip()}"
- async with await anyio.open_file(save_path, "w") as f:
- await f.write(danmu)
- return {"path": save_path, "count": processed}
-
-
-async def query_danmu_from_r2(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, **kwargs) -> dict:
- """从R2获取弹幕记录.
+ parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
+ count = parsed.get("count", 0)
+ if count == 0:
+ continue
+ total_count += count
+ texts = parsed.get("texts", "")
+ await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", force_update=True, **kwargs)
+ while len(resp.get("data", [])) == payload["limit"] and parsed.get("count", 0):
+ payload["page"] += 1
+ logger.debug(f"Query {qtype}: {payload}")
+ resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+ parsed = await parse_from_server(resp.get("data", []), user, keyword, super_chats, qtype)
+ total_count += parsed.get("count", 0)
+ texts += parsed.get("texts", "")
+ await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", **kwargs)
+ del parsed
+ save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
+ async with await anyio.open_file(save_path, "w") as f:
+ await f.write(texts.strip())
+ paths.append(save_path)
+ return {"paths": paths, "count": total_count}
- 一般情况下, dates列表中只有一个日期.
- 只有在获取全年所有月份时, dates列表才会有多个日期. (详见`query_engine_with_dates`函数的docstrings)
- 当只有单日期时, 直接获取弹幕记录.
- 当dates为全年的月份列表时, 合并每两个相邻月份的弹幕记录为一个文件, 最后发送6个文件.
+async def parse_from_server(data: list[dict], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
+ """解析从远程数据库获取的记录.
+ 日期从新到旧, 数据从旧到新
+ """
+ texts = ""
+ count = 0
+ text_key = "message" if qtype == "弹幕" else "content"
+ if keyword:
+ data = [x for x in data if keyword in x.get(text_key, "")]
+ logged_date = []
+ if qtype == "发言":
+ data = [x for x in data if x.get("liveDate") and x.get("startTime")]
+ # group data by dates, and normalize startTime
+ grouped_data = defaultdict(list)
+ for x in data:
+ start_time = x["startTime"].split(",")[0] # 3:53:23 or 03:53:23
+ if len(start_time.split(":")[0]) == 1:
+ x["startTime"] = f"0{start_time}" # 03:53:23
+ else:
+ x["startTime"] = start_time
+ grouped_data[x["liveDate"][:10]].append(x)
+
+ for norm_date, date_data in sorted(grouped_data.items(), reverse=True): # 日期从新到旧
+ for x in sorted(date_data, key=lambda x: x["startTime"]): # 数据从旧到新
+ # only show the day once
+ if norm_date not in logged_date:
+ stream_date = await live_date(norm_date)
+ day = f"\n开播日期: {stream_date}\n"
+ logged_date.append(norm_date)
+ else:
+ day = ""
+ texts += f"\n{day}{x['startTime']}: {x['content'].strip()}"
+ count += 1
+ else:
+ hide_name = bool(user) # 当指定过滤user时, 隐藏用户名
+ if user:
+ data = [x for x in data if x.get("authorName", "") == user]
+ if keyword:
+ data = [x for x in data if keyword in x.get("message", "")]
+ data = [x for x in data if x.get("timestamp")] # ensure timestamp
+        # group data by dates (timestamps are already numeric; no normalization needed)
+ grouped_data = defaultdict(list)
+ for x in data:
+ grouped_data[x["liveDate"][:10]].append(x)
+
+ for norm_date, date_data in sorted(grouped_data.items(), reverse=True): # 日期从新到旧
+ for x in sorted(date_data, key=lambda x: x["timestamp"]): # 数据从旧到新
+ dt = datetime.fromtimestamp(x["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
+ sc_amount = ""
+ if super_chat := x.get("scAmount"):
+ currency, amount = super_chat.split(" ")
+ super_chats[currency] += Decimal(amount)
+ sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
+ msg = x.get("message", "")
+ username = "" if hide_name else "|" + x.get("authorName", "")
+ # only show the day once
+ if norm_date not in logged_date:
+ stream_date = await live_date(norm_date)
+ day = f"\n开播日期: {stream_date}\n"
+ logged_date.append(norm_date)
+ else:
+ day = ""
+ texts += f"\n{day}{dt:%H:%M:%S}{username}{sc_amount}: {msg}"
+ count += 1
+
+ return {"texts": texts.rstrip(), "count": count}
+
+
+async def query_r2(dates: list[str], user: str, keyword: str, caption: str, super_chats: defaultdict, qtype: str, **kwargs) -> dict:
+ """从R2获取记录.
+
+ 日期从新到旧, 数据从旧到新
Returns:
{"paths": list[str], "count": int}
"""
if not dates:
return {}
- num_merge = 2
- chunks = [dates[i : i + num_merge] for i in range(0, len(dates), num_merge)]
- count = 0
+ prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
+ total_count = 0
paths = []
- for chunk in chunks:
- danmu = ""
- for date in chunk:
- logger.debug(f"Get Danmu from R2: {date}")
- data: list[dict] = await get_cf_r2(DANMU.R2_PREFIX + date, silent=True) # type: ignore
- if not data:
- continue
- if user:
- data = [x for x in data if x.get("authorName", "") == user]
- if keyword:
- data = [x for x in data if keyword in x.get("message", "")]
- if not data:
- continue
- parsed = parse_danmu(data, user, keyword, super_chats)
- danmu += parsed["danmu"]
- count += parsed["count"]
- await modify_progress(text=caption + f"\n⏳已获取 {count} 条弹幕", force_update=True, **kwargs)
- if not danmu:
+ for date in sorted(dates, reverse=True): # 日期从新到旧
+ logger.debug(f"Get {qtype} from R2: {date}")
+ data: dict[str, list[dict]] = await get_cf_r2(prefix + date, silent=True)
+ if not data:
+ continue
+ parsed = await parse_from_r2(data, user, keyword, super_chats, qtype)
+ count = parsed.get("count", 0)
+ if count == 0:
continue
+ total_count += count
+ texts = parsed.get("texts", "")
+ await modify_progress(text=caption + f"\n⏳已匹配 {total_count} 条{qtype}", force_update=True, **kwargs)
del parsed
- year = chunk[0][:4]
- months = [f"{x[5:7]}月" for x in chunk]
- month_name = "&".join(months)
- date_name = f"{year}年{month_name}"
- user = f"{user}-" if user else ""
- keyword = f"-“{keyword}”" if keyword else ""
- save_path = f"{DOWNLOAD_DIR}/{user}{date_name}{keyword}-弹幕.txt"
+ save_path = f"{DOWNLOAD_DIR}/{user}-{date}-{keyword}-{qtype}.txt"
async with await anyio.open_file(save_path, "w") as f:
- await f.write(f"{DANMU_TIPS}\n{danmu.strip()}")
+ await f.write(texts.strip())
paths.append(save_path)
- return {"paths": paths, "count": count}
+ return {"paths": paths, "count": total_count}
-def parse_danmu(data: list[dict], user: str, keyword: str, super_chats: defaultdict) -> dict:
- """解析弹幕记录."""
- msg = ""
- hide_name = bool(user) # 当指定过滤user时, 隐藏用户名
- if user:
- data = [x for x in data if x.get("authorName", "") == user]
- if keyword:
- data = [x for x in data if keyword in x.get("message", "")]
- data = [x for x in data if x.get("timestamp")] # ensure timestamp
- count = 0
- for danmu in sorted(data, key=lambda x: x["timestamp"], reverse=True): # new to old
- dt = datetime.fromtimestamp(danmu["timestamp"] / 1000000, tz=UTC).astimezone(ZoneInfo(TZ))
- sc_amount = ""
- if super_chat := danmu.get("scAmount"):
- currency, amount = super_chat.split(" ")
- super_chats[currency] += Decimal(amount)
- sc_amount = f" ({CURRENCY[currency]}{super_chat})" if currency in CURRENCY else ""
- texts = danmu.get("message", "")
- username = danmu.get("authorName", "")
- msg += f"\n{dt:%m-%d %H:%M:%S}{sc_amount}: {texts}" if hide_name else f"\n{dt:%m-%d %H:%M:%S}|{username}{sc_amount}: {texts}"
- count += 1
- return {"danmu": msg.rstrip(), "count": count}
-
-
-def parse_srt(data: list[dict], keyword: str) -> dict:
- """解析发言记录."""
- msg = ""
- if keyword:
- data = [x for x in data if keyword in x.get("content", "")]
- data = [x for x in data if x.get("liveDate") and x.get("startTime")]
- count = 0
- for x in sorted(data, key=lambda x: (x["liveDate"], x["startTime"]), reverse=True): # new to old
- msg += f"\n{x['liveDate'][:10]} {x['startTime'][:8]}: {x['content'].strip()}"
- count += 1
- return {"danmu": msg.rstrip(), "count": count}
+async def parse_from_r2(data: dict[str, list[dict]], user: str, keyword: str, super_chats: defaultdict, qtype: str) -> dict:
+ """解析从R2获取的记录.
+ 日期从新到旧, 数据从旧到新
+ 字段含义详见 `simplify_json` 函数
-@cache.memoize(ttl=600)
-async def get_live_dates_from_server(year: str | int) -> dict[str, list[str]]:
- """从远程数据库获取直播日期.
+ 发言: {"t": "00:04:27", "m": "你好"}
+ 弹幕: {"u": "User", "s": 1640799880, "m": "你好", "p": "USD 100"}
- 但远程日期可能并非标准YYYY-MM-DD, 这里做标准化后返回.
- 例如可能因为断流问题导致同一日开播多次, 2025-04-14日对应: "2025-04-14_01", "2025-04-14_02", ...
- Reutnrs:
- {"2025-04-14": ["2025-04-14_01", "2025-04-14_02", ...]}
"""
- api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector?liveDate={year}"
- resp: list[dict] = await hx_req(api, proxy=DANMU.PROXY, silent=True) # type: ignore
- dates = defaultdict(list)
- for x in resp:
- if not x.get("liveDate"):
- continue
- live_date = x["liveDate"]
- dates[live_date[:10]].append(live_date)
- return dates
+ # ruff: noqa: PLW2901
+ texts = ""
+ count = 0
+ for date, items in sorted(data.items(), reverse=True): # 日期从新到旧
+ if keyword:
+ items = [x for x in items if keyword in x.get("m", "")]
+ if user and qtype == "弹幕":
+ items = [x for x in items if x.get("u", "") == user]
+ sort_key = "s" if qtype == "弹幕" else "t"
+ items = sorted(items, key=lambda x: x[sort_key]) # 数据从旧到新
+ for idx, x in enumerate(items):
+ # only show the day once
+ day = f"\n开播日期: {await live_date(date)}\n" if idx == 0 else ""
+ if qtype == "发言":
+ texts += f"\n{day}{x['t']}: {x['m'].strip()}"
+ else:
+ hide_name = bool(user) # 当指定过滤user时, 隐藏用户名
+ sc_amount = ""
+ if super_chat := x.get("p"):
+ currency, amount = super_chat.split(" ")
+ super_chats[currency] += Decimal(amount)
+ sc_amount = f" ({CURRENCY[currency]}{currency} {number(amount)})" if currency in CURRENCY else ""
+ msg = x.get("m", "")
+ username = "" if hide_name else "|" + x.get("u", "")
+ dt = datetime.fromtimestamp(x["s"], tz=UTC).astimezone(ZoneInfo(TZ))
+ texts += f"\n{day}{dt:%H:%M:%S}{username}{sc_amount}: {msg}"
+ count += 1
+
+ return {"texts": texts.rstrip(), "count": count}
@cache.memoize(ttl=3600)
-async def sync_danmu_to_r2() -> None:
+async def sync_server_to_r2(qtype: str) -> None:
+ if qtype not in ["弹幕", "发言"]:
+ logger.warning(f"Unknown query type: {qtype}")
+ return
+
+ concurrency = 1 # DANMU.NUM_PER_QUERY 设置很大的话只用单并发即可
+ prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
+ api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
+
async def batch_sync(new_dates: dict[str, list[str]]):
- for r2_key, server_dates in new_dates.items():
+ for r2_key, server_dates in sorted(new_dates.items()):
results = []
for date in sorted(server_dates):
- logger.trace(f"Query server date: {date}")
- payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date, "message": "", "authorName": ""}
- resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
- count = resp["count"]
- keep_keys = ["timestamp", "authorName", "message", "scAmount"]
- results.extend([{key: danmu[key] for key in keep_keys if danmu.get(key)} for danmu in resp["data"]])
- while len(results) < count:
- payload["page"] += 1
- logger.debug(f"Query: {payload}")
- resp = await hx_req(DANMU.BASE_URL + "/liveChat/queryList", "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
- results.extend([{key: danmu[key] for key in keep_keys if danmu.get(key)} for danmu in resp["data"]])
- await set_cf_r2(DANMU.R2_PREFIX + r2_key, results, compress=True, silent=True)
+ logger.trace(f"Query {qtype} date: {date}")
+ payload = {"page": 1, "limit": DANMU.NUM_PER_QUERY, "liveDate": date}
+ resp = await hx_req(api_url, "POST", data=payload, proxy=DANMU.PROXY, check_kv={"code": 0}, silent=True)
+ if resp.get("count", 0) == 0:
+ continue
+ logger.trace(f"Query {qtype} date: {date} - {resp['count']} results")
+ # append the first page
+ results.extend(resp["data"])
+ quotient, remainder = divmod(resp["count"], DANMU.NUM_PER_QUERY) # type: ignore
+ n_pages = quotient + (1 if remainder else 0)
+ tasks = [
+ hx_req(
+ api_url,
+ "POST",
+ data={"page": page, "limit": DANMU.NUM_PER_QUERY, "liveDate": date},
+ proxy=DANMU.PROXY,
+ check_kv={"code": 0},
+ silent=True,
+ )
+ for page in range(2, n_pages + 1)
+ ]
+ chunks = [tasks[i : i + concurrency] for i in range(0, len(tasks), concurrency)]
+ for chunk in chunks:
+ async with asyncio.Semaphore(concurrency):
+ res = await asyncio.gather(*chunk, return_exceptions=True)
+ data = flatten(glom(res, "*.data"))
+ results.extend(data)
+ # if no results, create an empty key
+ metadata = {"empty": True} if not results else {}
+ await set_cf_r2(prefix + r2_key, simplify_json(results, qtype), metadata, compress=True, silent=True)
if not DANMU.R2_SYNC_ENABLE:
return
now = nowdt(TZ)
- if now.hour <= 4 or now.hour >= 20:
+ if qtype == "弹幕" and (now.hour <= 4 or now.hour >= 20):
+ # 忽略晚间的直播弹幕 (实时录制, 应该等结束后再Sync)
return
- years = [x.strip() for x in DANMU.R2_SYNC_YEARS.split(",") if x.strip()]
- if not years:
- years = [str(now.year)]
- for year in years:
- r2 = await list_cf_r2(prefix=DANMU.R2_PREFIX + f"{year}-")
- r2_dates = [x.removeprefix(DANMU.R2_PREFIX) for x in glom(r2, "Contents.*.Key", default=[])]
- api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector?liveDate={year}"
- resp = await hx_req(api, proxy=DANMU.PROXY, silent=True)
- all_server_dates = []
+
+ monitor_years = [x.strip() for x in DANMU.R2_SYNC_DANMU_YEARS.split(",") if x.strip()] if qtype == "弹幕" else [x.strip() for x in DANMU.R2_SYNC_FAYAN_YEARS.split(",") if x.strip()]
+ monitor_years.append(str(now.year))
+ monitor_years = sorted(set(monitor_years))
+ r2 = await list_cf_r2(prefix=prefix)
+ r2_dates = [x.removeprefix(prefix) for x in glom(r2, "Contents.*.Key", default=[])]
+ has_update = False
+ for year in monitor_years:
+ params = {"liveDate": year} if qtype == "弹幕" else {"srtCount": 1, "liveDate": year}
+ api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
+ resp = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True)
new_dates_map = defaultdict(list)
# some live dates on server are "2025-04-14_01", "2025-04-14_02", ...
# norm dates to YYYY-MM-DD
for x in resp:
live_date: str = x["liveDate"] # type: ignore
- all_server_dates.append(live_date)
- if live_date[:10] in r2_dates or live_date[:10] == f"{now:%Y-%m-%d}": # ignore today's live
+ if live_date[:10] in r2_dates:
+ continue
+ if qtype == "弹幕" and live_date[:10] == f"{now:%Y-%m-%d}": # ignore today's live
continue
new_dates_map[live_date[:10]].append(live_date)
-
+ if not new_dates_map:
+ continue
+ has_update = True
# sync per date
- logger.debug(f"Sync year: {year}, Num dates: {len(new_dates_map)}")
+ logger.debug(f"Sync {qtype}-{year}: {new_dates_map}")
await batch_sync(new_dates_map)
-
- # sync per month
- months = {x[:7] for x in new_dates_map} # YYYY-MM
- for month in sorted(months):
- days = [x for x in all_server_dates if x[:7] == month]
- logger.debug(f"Sync month: {month}, Num dates: {len(days)}")
- await batch_sync({month: days})
+ r2_dates.extend(new_dates_map.keys()) # update new dates to R2 dates
+
+ # sync whole month
+ year_months = {x[:7] for x in new_dates_map}
+ for year_month in sorted(year_months):
+ month_data = []
+ for d in range(1, 32):
+ date = f"{year_month}-{d:02d}"
+ if date not in r2_dates:
+ continue
+ logger.trace(f"Get R2 date: {date}")
+ if r2 := await get_cf_r2(prefix + date, silent=True):
+ month_data.append(r2)
+ await set_cf_r2(prefix + year_month, merge_json(month_data), compress=True, silent=True)
+ del month_data
+ r2_dates.extend(list(year_months)) # update new dates to R2 dates
+ # sync whole year
+ if qtype == "发言": # only sync 发言 for whole year. 弹幕 is too large
+ year_data = []
+ for mon in range(1, 13):
+ date = f"{year}-{mon:02d}"
+ if date not in r2_dates:
+ continue
+ logger.trace(f"Get R2 date: {date}")
+ if r2 := await get_cf_r2(prefix + date, silent=True):
+ year_data.append(r2)
+ await set_cf_r2(prefix + year, merge_json(year_data), compress=True, silent=True)
+ del year_data
+
+ # sync all
+ if has_update and qtype == "发言":
+ all_data = []
+ for year in monitor_years:
+ logger.trace(f"Get R2 date: {year}")
+ if r2 := await get_cf_r2(prefix + year, silent=True):
+ all_data.append(r2)
+ await set_cf_r2(prefix + "all", merge_json(all_data), compress=True, silent=True)
@cache.memoize(ttl=28800)
async def to_usd(ccy: str) -> Decimal:
"""Convert 1 unit ccy to USD."""
if ccy == "USD":
- return Decimal()
+ return Decimal(1)
fiats = await get_cmc_fiat() # {"ccy": (name, id)}
if ccy not in fiats:
return Decimal()
@@ -457,6 +562,170 @@ async def to_usd(ccy: str) -> Decimal:
return Decimal(rate)
+def date_str(date: str) -> str:
+ """格式化日期."""
+ if len(date) == 4:
+ date_name = f"{date}年"
+ elif len(date) == 7:
+ date_name = f"{date[:4]}年{date[5:7]}月"
+ elif len(date) == 10:
+ date_name = f"{date[:4]}年{date[5:7]}月{date[8:10]}日"
+ else:
+ date_name = date
+ return date_name
+
+
+def simplify_json(data: list[dict], qtype: str) -> dict:
+ """简化json数据, 去除无用字段.
+
+ 发言: {'content': '你好',
+ 'endTime': '00:04:28,399',
+ 'id': 135366,
+ 'liveDate': '2022-11-14',
+ 'serial': 1,
+ 'startTime': '00:04:27,566'
+ }
+
+ 简化后发言: {"t": "00:04:27", "m": "你好"}
+
+ 弹幕:
+ {'authorId': 'fakeid',
+ 'authorImage': 'https://yt4.ggpht.com/ytc/AKedOLSUwTp6ptiNCy1eovPrO61XfV74a8S21hpP3MU-D4ybbS47m9xmjKJ-JjbghP1m',
+ 'authorName': 'User',
+ 'count': None,
+ 'emotesCount': None,
+ 'id': 1092980,
+ 'liveDate': '2021-12-29',
+ 'message': '你好',
+ 'scAmount': None,
+ 'scInfo': None,
+ 'timeInSeconds': 18690.529,
+ 'timeText': '5:11:30',
+ 'timestamp': 1640799880050853}
+
+ 简化后弹幕: {"u": "User", "s": 1640799880, "m": "你好", "p": "USD 100"}
+
+ Returns:
+ {"liveDate": list[简化后dict]}
+ """
+ if not data:
+ return {}
+ res = defaultdict(list)
+ if qtype == "发言":
+ for x in data:
+ if not all([x.get("startTime"), x.get("content")]):
+ continue
+ item = {}
+ if ts := x.get("startTime"): # 3:53:23,123
+ ts = x["startTime"].split(",")[0] # 3:53:23
+ if len(ts.split(":")[0]) == 1:
+ ts = f"0{ts}"
+ item["t"] = ts
+ else:
+ continue
+ if x.get("content"):
+ item["m"] = x["content"]
+ res[x["liveDate"][:10]].append(item)
+ return res
+
+ for x in data:
+ item = {}
+ if not all([x.get("timestamp"), x.get("authorName")]):
+ continue
+ item["s"] = round(x["timestamp"] / 1000000)
+ item["u"] = x["authorName"]
+ if x.get("scAmount"):
+ item["p"] = x["scAmount"]
+ if x.get("message"):
+ item["m"] = x["message"]
+ res[x["liveDate"][:10]].append(item)
+ return res
+
+
+def merge_json(data: list[dict]) -> dict:
+ """合并简化后的json数据.
+
+ data: list[发言 | 弹幕]
+ 发言: {"2021-12-29": [{"t": "00:04:27", "m": "你好"}]}
+ 弹幕: {"2021-12-29": [{"s": 1640799880, "u": "User Name", "p": "USD 100", "m": "你好"}]}
+
+ Returns:
+ {"liveDate": list[dict]}
+ """
+ if not data:
+ return {}
+ res = defaultdict(list)
+ for chunk in data:
+ for live_date, items in chunk.items():
+ res[live_date[:10]].extend(items)
+ return res
+
+
+def merge_txt_files(paths: list[str], dates: list[str], user: str, keyword: str, qtype: str, header_tips: str) -> list[str]:
+ """Merge multiple txt files into one."""
+ if not paths:
+ return []
+ dates = sorted(dates)
+ paths = sorted(paths)
+ if all(len(x) == 10 for x in dates): # all days (不存在此情况)
+ date_name = dates[0][:7]
+ elif all(len(x) == 7 for x in dates): # all months
+ date_name = dates[0][:4]
+ elif all(len(x) == 4 for x in dates): # all years
+ date_name = dates[0] if len(set(dates)) == 1 else ""
+
+ keyword = f"“{keyword}”" if keyword else ""
+
+ if file_bytes(paths) < 10 * 1024 * 1024: # 10 MB
+ texts = header_tips + "\n\n".join([Path(path).read_text() for path in sorted(paths, reverse=True)]).strip()
+ save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_str(date_name)}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
+ Path(save_path).write_text(texts)
+ return [save_path]
+
+ num_merge = 2
+ date_chunks = [dates[i : i + num_merge] for i in range(0, len(dates), num_merge)]
+ path_chunks = [paths[i : i + num_merge] for i in range(0, len(paths), num_merge)]
+ final_paths = []
+ for date_chunk, path_chunk in zip(date_chunks, path_chunks, strict=True):
+ texts = header_tips + "\n\n".join([Path(p).read_text() for p in sorted(path_chunk, reverse=True)]).strip()
+ date_name = "&".join(date_chunk)
+ save_path = f"{DOWNLOAD_DIR}/【{qtype}】{user}-{date_name}-{keyword}.txt".replace("--", "-").replace("】-", "】").replace("-.txt", ".txt")
+ Path(save_path).write_text(texts)
+ final_paths.append(save_path)
+ return final_paths
+
+
+@cache.memoize(ttl=600)
+async def get_live_info(year: str | int) -> dict:
+ params = {"liveDate": year}
+ api = f"{DANMU.BASE_URL}/liveInfo/queryListBySelector"
+ resp: list[dict] = await hx_req(api, proxy=DANMU.PROXY, params=params, silent=True) # type: ignore
+ dates = {x["liveDate"][:10] for x in resp}
+ info = {date: {"titles": [], "urls": []} for date in dates}
+ for x in resp:
+ info[x["liveDate"][:10]]["titles"].append(x["title"])
+ info[x["liveDate"][:10]]["urls"].append(x["url"])
+ return info
+
+
+async def live_date(date: str) -> str:
+ """Convert YYYY-MM-DD live date to markdown with title."""
+ live_info = await get_live_info(date[:4])
+ day = date[:10]
+ titles = glom(live_info, f"{day}.titles", default=[])
+ urls = glom(live_info, f"{day}.urls", default=[])
+ markdown = [f"[{title}]({url})" for title, url in zip(titles, urls, strict=True)]
+ texts = date + "\n" + "\n".join(markdown)
+ return texts.rstrip()
+
+
+def file_bytes(paths: list[str] | str) -> int:
+ """Get file size in bytes."""
+ if isinstance(paths, str):
+ paths = [paths]
+ return sum([Path(path).stat().st_size for path in paths])
+
+
@cache.memoize(ttl=3600)
async def sync_danmu_to_d1() -> None:
"""Deprecated, D1 only allow 5M reads, 100K writes."""
src/config.py
@@ -82,7 +82,7 @@ class PREFIX:
SEARCH_GOOGLE = os.getenv("PREFIX_SEARCH_GOOGLE", "/google").lower()
GENIMG = os.getenv("PREFIX_GENIMG", "/gen").lower()
DANMU = os.getenv("PREFIX_DANMU", "/danmu").lower()
- ZIMU = os.getenv("PREFIX_ZIMU", "/zimu").lower()
+ FAYAN = os.getenv("PREFIX_FAYAN", "/fa").lower()
class API:
@@ -102,13 +102,13 @@ class API:
class DANMU:
BASE_URL = os.getenv("DANMU_BASE_URL", "") # Custom API, No docs
STREAMER = os.getenv("DANMU_STREAMER", "Streamer") # streamer name
- STREAM_YEARS = os.getenv("DANMU_STREAM_YEARS", "") # comma separated years that has live stream
PROXY = os.getenv("DANMU_PROXY", None) # socks5://127.0.0.1:7890
NUM_PER_QUERY = int(os.getenv("DANMU_NUM_PER_QUERY", "100")) # Number of items per query
D1_DATABASE = os.getenv("DANMU_D1_DATABASE", "bennybot-danmu")
- R2_PREFIX = os.getenv("DANMU_R2_PREFIX", "Danmu/")
+ R2_PREFIX = os.getenv("DANMU_R2_PREFIX", "Streaming/")
R2_SYNC_ENABLE = os.getenv("DANMU_R2_SYNC_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
- R2_SYNC_YEARS = os.getenv("DANMU_R2_SYNC_YEARS", "") # comma separated years to sync to R2. e.g. "2025,2024,2023"
+ R2_SYNC_DANMU_YEARS = os.getenv("R2_SYNC_DANMU_YEARS", "") # comma separated years to sync to R2. e.g. "2025,2024,2023"
+ R2_SYNC_FAYAN_YEARS = os.getenv("R2_SYNC_FAYAN_YEARS", "") # comma separated years that have live streams
class PROVIDER: # default API provider
src/database.py
@@ -215,6 +215,8 @@ async def set_cf_r2(
If `skip_in_memory` is True, it will skip setting to CF-R2 if the key is already in memory cache.
"""
+ if not data and not metadata:
+ return False
if skip_in_memory and cache.get(key):
logger.trace(f"SKIP SET CF-R2: key is already in memory cache for {key}: {cache.get(key)}")
return True
src/handler.py
@@ -184,7 +184,7 @@ async def handle_social_media(
PREFIX.SUBTITLE,
PREFIX.VOICE,
PREFIX.WGET,
- PREFIX.ZIMU,
+ PREFIX.FAYAN,
]
)
info = parse_msg(message)
@@ -348,7 +348,7 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefixes: list[str] |
if permission["google"]:
msg += f"\n🔍**搜索Google**: `{PREFIX.SEARCH_GOOGLE}` + 关键词"
if permission["danmu"]:
- msg += f"\n📖**查询弹幕记录**: 发送 `{PREFIX.DANMU}`, `{PREFIX.ZIMU}` 查看详细教程"
+ msg += f"\n📖**查询直播合订本**: 发送 `{PREFIX.DANMU}`, `{PREFIX.FAYAN}` 查看详细教程"
msg += "\n\n单独发送每个命令前缀本身可查看该命令详细使用说明"
return msg
src/main.py
@@ -26,7 +26,7 @@ from handler import handle_social_media, handle_utilities
from llm.summary import daily_summary
from llm.utils import clean_gemini_files
from messages.parser import parse_msg
-from others.danmu import sync_danmu_to_r2
+from others.danmu import sync_server_to_r2
from others.podcast import summary_pods
from permission import check_permission
from price.entrypoint import match_symbol_category
@@ -128,7 +128,8 @@ async def scheduling(client: Client):
logger.info(f"Sending daily message to {chat_id}: {msg}")
await client.send_message(to_int(chat_id), msg)
await summary_pods(client)
- await sync_danmu_to_r2()
+ await sync_server_to_r2(qtype="发言")
+ await sync_server_to_r2(qtype="弹幕")
await clean_gemini_files()