Commit 49ae41e

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-09-11 09:16:13
fix(danmu): remove deduplicate messages
1 parent 63f13c3
src/danmu/entrypoint.py
@@ -156,6 +156,8 @@ def parse_queries(texts: str, qtype: str) -> tuple[str, str, str, str]:
     # 2025
     elif matched := re.match(rf"^{PREFIX.DANMU}" + r"\s+(\d{4})(\s+)?", texts):
         match_time = matched.group(1)
+    if not match_time.startswith("20"):
+        match_time = ""
     # remove prefix + date
     texts = re.sub(rf"^{PREFIX.DANMU}\s+{match_time}", "", texts).lstrip()
     # @张三 你好
src/danmu/r2.py
@@ -73,9 +73,16 @@ async def parse_from_r2(data: dict[str, list[dict]], user: str, keyword: str, su
             items = [x for x in items if keyword in x.get("m", "")]
         if user and qtype == "弹幕":
             items = [x for x in items if x.get("u", "") == user.replace(" ", "")]
+
+        # deduplicate
+        added = set()
+        deduplicated = []
+        for x in items:
+            if f"{x.get('s')}{x.get('u')}{x.get('m')}{x.get('t')}" not in added:
+                deduplicated.append(x)
+                added.add(f"{x.get('s')}{x.get('u')}{x.get('m')}{x.get('t')}")
         sort_key = "s" if qtype == "弹幕" else "t"
-        items = sorted(items, key=lambda x: x[sort_key])  # 数据从旧到新
-        for idx, x in enumerate(items):
+        for idx, x in enumerate(sorted(deduplicated, key=lambda x: x[sort_key])):  # 数据从旧到新
             # only show the day once
             day = f"\n开播日期: {await live_date(date)}\n" if idx == 0 else ""
             if qtype == "发言":
src/danmu/server.py
@@ -71,6 +71,15 @@ async def parse_from_server(data: list[dict], user: str, keyword: str, super_cha
     text_key = "message" if qtype == "弹幕" else "content"
     if keyword:
         data = [x for x in data if keyword in x.get(text_key, "")]
+    # deduplicate
+    added = set()
+    deduplicated = []
+    for x in data:
+        if f"{x.get('message')}{x.get('content')}{x.get('liveDate')}{x.get('startTime')}{x.get('authorName')}" not in added:
+            deduplicated.append(x)
+            added.add(f"{x.get('message')}{x.get('content')}{x.get('liveDate')}{x.get('startTime')}{x.get('authorName')}")
+    data = deduplicated
+
     logged_date = []
     if qtype == "发言":
         data = [x for x in data if x.get("liveDate") and x.get("startTime")]
src/danmu/sync.py
@@ -108,6 +108,7 @@ async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str)
 
     # 标准化数据格式
     normed_data = []
+    added = set()
     if qtype == "发言":
         for x in data:
             if not all([x.get("startTime"), x.get("content")]):
@@ -115,7 +116,10 @@ async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str)
             time = x["startTime"].split(",")[0]  # 3:53:23
             if len(time.split(":")[0]) == 1:
                 time = f"0{time}"
+            if f"{date}{time}{x['content']}" in added:
+                continue
             normed_data.append({"time": f"{date} {time}", "content": x["content"], "segmented": " ".join(cutter.cutword(x["content"]))})
+            added.add(f"{date}{time}{x['content']}")
     else:
         for x in sorted(data, key=lambda x: x.get("timestamp", 0)):
             # time, user, fullname, content, uid, superchat, currency, segmented
@@ -134,7 +138,10 @@ async def save_livechats_to_turso(live_info: dict, data: list[dict], qtype: str)
             if x.get("message"):
                 item["content"] = x["message"]
                 item["segmented"] = " ".join(cutter.cutword(x["message"]))
+            if f"{item['time']}{item['user']}{item.get('content', '')}" in added:
+                continue
             normed_data.append(item)
+            added.add(f"{item['time']}{item['user']}{item.get('content', '')}")
 
     # 过滤掉获取已保存在turso的记录
     data = await filter_records_in_turso(normed_data, date, qtype)
src/danmu/turso.py
@@ -79,19 +79,17 @@ async def parse_from_turso(data: list[dict], user: str, keyword: str, super_chat
     """解析从Turso获取的记录.
 
     日期从新到旧, 数据从旧到新
-    字段含义详见 `simplify_json` 函数
-
-    发言: {"t": "00:04:27", "m": "你好"}
-    弹幕: {"u": "User", "s": 1640799880, "m": "你好", "p": "USD 100"}
-
 
+    COLUMNS = {
+    "发言": "time TEXT, content TEXT, segmented TEXT",
+    "弹幕": "time TEXT, fullname TEXT, content TEXT, superchat TEXT,user TEXT, uid TEXT, segmented TEXT",
+    }
     """
     # ruff: noqa: PLW2901
     # group by dates
     grouped_data = defaultdict(list)  # {date: list[dict]}
     for x in data:
         grouped_data[x["time"][:10]].append(x)
-
     texts = ""
     count = 0
     for date, items in sorted(grouped_data.items(), reverse=True):  # 日期从新到旧
@@ -99,8 +97,15 @@ async def parse_from_turso(data: list[dict], user: str, keyword: str, super_chat
             items = [x for x in items if keyword in x.get("content", "")]
         if user and qtype == "弹幕":
             items = [x for x in items if x.get("fullname", "") == user]
-        items = sorted(items, key=lambda x: x["time"])  # 数据从旧到新
-        for idx, x in enumerate(items):
+
+        # deduplicate
+        added = set()
+        deduplicated = []
+        for x in items:
+            if f"{x['time']}{x['content']}{x.get('user')}" not in added:
+                deduplicated.append(x)
+                added.add(f"{x['time']}{x['content']}{x.get('user')}")
+        for idx, x in enumerate(sorted(deduplicated, key=lambda x: x["time"])):  # 数据从旧到新
             # only show the day once
             day = f"\n开播日期: {await live_date(date)}\n" if idx == 0 else ""
             if qtype == "发言":
src/danmu/utils.py
@@ -101,19 +101,29 @@ def simplify_json(data: list[dict], qtype: str) -> dict:
             if x.get("content"):
                 item["m"] = x["content"]
             res[x["liveDate"][:10]].append(item)
-        return res
-
-    for x in data:
-        item = {}
-        if not all([x.get("timestamp"), x.get("authorName")]):
-            continue
-        item["s"] = round(x["timestamp"] / 1000000)
-        item["u"] = x["authorName"].replace(" ", "")  # UserName
-        if x.get("scAmount"):
-            item["p"] = x["scAmount"]
-        if x.get("message"):
-            item["m"] = x["message"]
-        res[x["liveDate"][:10]].append(item)
+
+    else:  # 弹幕
+        for x in data:
+            item = {}
+            if not all([x.get("timestamp"), x.get("authorName")]):
+                continue
+            item["s"] = round(x["timestamp"] / 1000000)
+            item["u"] = x["authorName"].replace(" ", "")  # UserName
+            if x.get("scAmount"):
+                item["p"] = x["scAmount"]
+            if x.get("message"):
+                item["m"] = x["message"]
+            res[x["liveDate"][:10]].append(item)
+
+    # deduplicate
+    for date, items in res.items():
+        added = set()
+        deduplicated = []
+        for x in items:
+            if f"{x.get('s')}{x.get('u')}{x.get('m')}{x.get('t')}" not in added:
+                deduplicated.append(x)
+                added.add(f"{x.get('s')}{x.get('u')}{x.get('m')}{x.get('t')}")
+        res[date] = deduplicated
     return res