Commit fe0d29d
Changed files (1)
src
history
src/history/sync.py
@@ -19,7 +19,7 @@ from utils import i_am_bot
# ruff: noqa: S608
-DB_COLUMNS = "mid INTEGER PRIMARY KEY, day TEXT, time TEXT, user TEXT, content TEXT, mtype TEXT, uid INTEGER, handle TEXT, filename TEXT, mime TEXT"
+DB_COLUMNS = "mid INTEGER PRIMARY KEY, time TEXT NOT NULL, user TEXT, content TEXT, mtype TEXT, uid INTEGER, filename TEXT, mime TEXT, urls TEXT, reply INTEGER"
async def save_history_to_d1(client: Client, message: Message) -> None:
@@ -29,15 +29,15 @@ async def save_history_to_d1(client: Client, message: Message) -> None:
table_name = await get_table_name(client, info["cid"])
records = {
"mid": info["mid"],
- "day": info["time"].split(" ")[0],
- "time": info["time"].split(" ")[-1],
+ "time": info["time"],
"user": info["full_name"],
"content": info["text"],
"mtype": info["mtype"],
"uid": info["uid"],
- "handle": info["handle"],
"filename": info["file_name"],
"mime": info["mime_type"],
+ "urls": "~~~".join(info["entity_urls"]),
+ "reply": message.reply_to_message_id,
}
keys = ", ".join(records)
values = ", ".join(["?" for _ in range(len(records))])
@@ -69,15 +69,15 @@ async def sync_chat_history_to_d1(client: Client, chat_id: str | int) -> None:
mids.add(info["mid"])
records = {
"mid": info["mid"],
- "day": info["time"].split(" ")[0],
- "time": info["time"].split(" ")[-1],
+ "time": info["time"],
"user": info["full_name"],
"content": info["text"],
"mtype": info["mtype"],
"uid": info["uid"],
- "handle": info["handle"],
"filename": info["file_name"],
"mime": info["mime_type"],
+ "urls": "~~~".join(info["entity_urls"]),
+ "reply": message.reply_to_message_id,
}
logger.trace(f"Syncing message {table_name}: {info['mid']}")
keys = ", ".join(records)
@@ -111,11 +111,17 @@ async def sync_export_history_to_d1(client: Client, path: str | Path | None = No
return
def parse_text(texts: list) -> str:
+ if isinstance(texts, str):
+ return texts
text = ""
for x in texts:
text += x if isinstance(x, str) else x.get("text", "")
return text
+ def parse_urls(entities: list) -> str:
+ urls = [x.get("text", "") for x in entities if x.get("type", "") == "link"]
+ return "~~~".join(urls)
+
with path.open("r") as f: # noqa: ASYNC230
data = json.load(f)
mtypes = {
@@ -154,19 +160,18 @@ async def sync_export_history_to_d1(client: Client, path: str | Path | None = No
if user == data["name"] and data["type"] in ["public_channel", "private_channel"]: # user is not shown
user = ""
uid = 1
- chat = await chat_info(client, uid)
records = {
"mid": info["id"],
- "day": dt.strftime("%Y-%m-%d"),
- "time": dt.strftime("%H:%M:%S"),
+ "time": dt.strftime("%Y-%m-%d %H:%M:%S"),
"user": user,
- "content": parse_text(info.get("text", "")),
+ "content": parse_text(info.get("text", [])),
"mtype": mtypes[info.get("media_type", "text")],
"uid": uid,
- "handle": chat.username or "",
"filename": info.get("file_name", ""),
"mime": info.get("mime_type", ""),
+ "parse_urls": parse_urls(info.get("text_entities", [])),
+ "reply": info.get("reply_to_message_id"),
}
logger.trace(f"Syncing message {table_name}: {info['id']}")
keys = ", ".join(records)
@@ -218,16 +223,37 @@ async def get_table_name(client: Client, chat_id: str | int) -> str:
slim_cid = str(chat_id).removeprefix("-100")
default_name = f"{slim_cid}-{chat_title}".replace(" ", "")
- # get D1 tables
+ # get database id, and create database if not exists
database_id = await create_d1_database(name=HISTORY.D1_DATABASE)
if not database_id:
await create_d1_table(default_name, DB_COLUMNS, HISTORY.D1_DATABASE)
+ # create index on time
+ sql = f'CREATE INDEX IF NOT EXISTS idx_{slim_cid}_time ON "{default_name}"(time)'
+ resp = await query_d1(sql, database_id)
return default_name
+
+ # list all table names
sql = "SELECT name FROM sqlite_master WHERE type='table';"
- resp = await query_d1(sql, database_id, silent=True)
+ resp = await query_d1(sql, database_id)
table_names = flatten(glom(resp, "result.*.results.*.name", default=[]))
table_mapping = {x.split("-")[0]: x for x in table_names}
+
+ # find the table name based on chat id
table_name = table_mapping.get(str(slim_cid), default_name)
cache.set(f"tablename-{chat_id}", table_name, ttl=0)
+
+ # create table
await create_d1_table(table_name, DB_COLUMNS, HISTORY.D1_DATABASE)
+
+ # list all index
+ sql = "SELECT name FROM sqlite_master WHERE type='index';"
+ resp = await query_d1(sql, database_id)
+ indexs = flatten(glom(resp, "result.*.results.*.name", default=[]))
+
+ # create index on time if not exists
+ if f"idx_{slim_cid}_time" not in indexs:
+ logger.debug(f"Creating index on {table_name} for time")
+ sql = f'CREATE INDEX IF NOT EXISTS idx_{slim_cid}_time ON "{table_name}"(time)'
+ resp = await query_d1(sql, database_id, silent=True)
+
return table_name