Commit fec7ee9

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-07 03:37:48
perf(history): optimize D1 and Turso history backup speed
convert `saved_mids` / `saved_ids` from list to set (casting the D1 ids to int as well) to speed up membership checks
1 parent 469d9d0
Changed files (2)
src
src/history/d1.py
@@ -66,6 +66,7 @@ async def backup_chat_history_to_d1(client: Client, chat_id: str | int, hours: f
     sql = f'SELECT mid FROM "{table_name}" WHERE time >= "{begin_time}" AND time <= "{end_time}";'
     resp = await query_d1(sql, db_name=HISTORY.D1_DATABASE, silent=True)
     saved_mids = glom(resp, "result.0.results.*.mid", default=[])
+    saved_mids = {int(x) for x in saved_mids}
     logger.info(f"Found {len(saved_mids)} messages in D1. Rows read: {glom(resp, 'result.0.meta.rows_read', default=1)}")
     concurrency = 200
     tasks = []
@@ -148,11 +149,10 @@ async def upload_exported_history_to_d1(client: Client, path: str | Path | None
     sql = f'SELECT mid FROM "{table_name}" ORDER BY mid;'
     resp = await query_d1(sql, db_name=HISTORY.D1_DATABASE, silent=True)
     saved_ids = glom(resp, "result.0.results.*.mid", default=[])
+    saved_ids = {int(x) for x in saved_ids}
     concurrency = 200
     tasks = []
-    for info in data["messages"]:  # type: ignore
-        if info["id"] in saved_ids:
-            continue
+    for info in [msg for msg in data["messages"] if msg["id"] not in saved_ids]:  # type: ignore
         if info["type"] != "message":
             continue
         if info["date_unixtime"] == "0":
src/history/turso.py
@@ -69,7 +69,7 @@ async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours
     sql = f'SELECT mid FROM "{table_name}" WHERE time >= "{begin_time}" AND time <= "{end_time}";'
     resp = await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], db_name=HISTORY.TURSO_DATABASE, silent=True)
     saved_mids = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
-    saved_mids = [int(x) for x in saved_mids]
+    saved_mids = {int(x) for x in saved_mids}
     logger.info(f"Found {len(saved_mids)} messages in Turso. Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}")
     concurrency = 200
     statements = []
@@ -157,15 +157,13 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
         silent=True,
     )
     saved_ids = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
-    saved_ids = [int(x) for x in saved_ids]
+    saved_ids = {int(x) for x in saved_ids}
     logger.info(f"Found {len(saved_ids)} messages in Turso. Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}")
     last_id = max(saved_ids, default=0)
     logger.info(f"Found last message at {last_id}")
     concurrency = 5000
     statements = []
-    for info in data["messages"]:  # type: ignore
-        if info["id"] in saved_ids:
-            continue
+    for info in [msg for msg in data["messages"] if msg["id"] not in saved_ids]:  # type: ignore
         if info["type"] != "message":
             continue
         if info["date_unixtime"] == "0":