Commit c3095fb
Changed files (1)
src
history
src/history/turso.py
@@ -20,7 +20,8 @@ from utils import i_am_bot, nowdt, slim_cid
CHAT_COLUMNS = "cid INTEGER PRIMARY KEY, ctype TEXT, ctitle TEXT, chandle TEXT, tablename TEXT, tags TEXT"
USER_COLUMNS = "uid INTEGER PRIMARY KEY, full_name TEXT, handle TEXT, tags TEXT"
-MSG_COLUMNS = "mid INTEGER PRIMARY KEY, mtype TEXT, time TEXT NOT NULL, fullname TEXT, content TEXT, filename TEXT, urls TEXT, reply INTEGER, mime TEXT, user TEXT, uid INTEGER, segmented TEXT"
+MSG_COLUMNS = "mid INTEGER PRIMARY KEY, mtype TEXT, time TEXT NOT NULL, fullname TEXT, content TEXT, filename TEXT, urls TEXT, reply INTEGER, mime TEXT, user TEXT, uid INTEGER, gid INTEGER, segmented TEXT" # fmt: off
+
INDEX_NAMES = ["time", "user", "uid"]
@@ -51,6 +52,7 @@ async def sync_history_to_turso(message: Message) -> None:
"mime": info["mime_type"],
"user": info["full_name"].replace(" ", ""),
"uid": info["uid"],
+ "gid": info["media_group_id"],
"segmented": " ".join(cutter.cutword(message.content)),
}
await turso_exec([insert_statement(chatinfo["tablename"], records, update_on_conflict="mid")], silent=True, retry=2, **TURSO_KWARGS)
@@ -103,6 +105,7 @@ async def backup_chat_history_to_turso(client: Client, chat_id: str | int, hours
"mime": info["mime_type"],
"user": info["full_name"].replace(" ", ""),
"uid": info["uid"],
+ "gid": info["media_group_id"],
"segmented": " ".join(cutter.cutword(info["text"])),
}
logger.trace(f"Syncing {table_name}: {info['mid']}")
@@ -145,6 +148,18 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
with path.open("r") as f: # noqa: ASYNC230
data = json.load(f)
logger.info(f"Found {len(data['messages'])} messages in json file")
+ """The exported history does not contain media_group_id,
+ so we first walk through all messages and assign one ourselves.
+ If two consecutive messages share the same `from_id` and `date_unixtime`,
+ and the later message has a `photo` or `thumbnail` key (i.e. it looks like a photo or video), both are treated as one media group whose id is the earlier message's `media_group_id`, or its `id` if it has none.
+ """
+ last_msg = {}
+ for idx, msg in enumerate(data["messages"]):
+ if all(msg.get(key) == last_msg.get(key) for key in ["from_id", "date_unixtime"]) and any(key in msg for key in ["photo", "thumbnail"]):
+ data["messages"][idx - 1]["media_group_id"] = glom(data["messages"][idx - 1], Coalesce("media_group_id", "id"))
+ data["messages"][idx]["media_group_id"] = glom(data["messages"][idx - 1], Coalesce("media_group_id", "id"))
+ last_msg = msg
+
mtypes = {
"audio_file": "audio",
"voice_message": "voice",
@@ -161,9 +176,6 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
resp = await turso_exec([{"type": "execute", "stmt": {"sql": f'SELECT mid FROM "{table_name}";'}}], silent=True, **TURSO_KWARGS)
saved_ids = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
saved_ids = {int(x) for x in saved_ids}
- logger.info(f"Found {len(saved_ids)} messages in Turso. Rows read: {glom(resp, 'results.0.response.result.rows_read', default=1)}")
- last_id = max(saved_ids, default=0)
- logger.info(f"Found last message at {last_id}")
concurrency = 5000
statements = []
for info in [msg for msg in data["messages"] if msg["id"] not in saved_ids]: # type: ignore
@@ -201,6 +213,7 @@ async def upload_exported_history_to_turso(client: Client, path: str | Path | No
"mime": info.get("mime_type", ""),
"user": user.replace(" ", ""),
"uid": uid,
+ "gid": info.get("media_group_id", 0),
"segmented": " ".join(cutter.cutword(content)),
}
# logger.debug(f"Syncing message {table_name}: {info['id']}")