Commit 0abb768
src/danmu/sync.py
@@ -55,8 +55,8 @@ async def sync_server_to_turso(qtype: str) -> None:
api_url = DANMU.BASE_URL + "/liveChat/queryList" if qtype == "弹幕" else DANMU.BASE_URL + "/srtInfo/queryList"
# 首先, 建立 `发言` / `弹幕` 以及 `liveinfo` table, 和相关index
- await turso_create_table("liveinfo", LIVEINFO_COLUMNS, idx_cols=["date"], silent=True, **TURSO_KWARGS)
- await create_table_and_index(qtype)
+ await turso_create_table("liveinfo", LIVEINFO_COLUMNS, idx_cols=["date"], idx_prefix="idx_liveinfo_", silent=True, **TURSO_KWARGS)
+ await turso_create_table(qtype, COLUMNS[qtype], idx_cols=INDEX_NAMES[qtype], idx_prefix=f"idx_{qtype}_", fts_on_col="rowid", silent=True, **TURSO_KWARGS)
# 获取turso存储的liveinfo
resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM liveinfo;"}}], silent=True, **TURSO_KWARGS)
@@ -205,62 +205,6 @@ async def filter_records_in_turso(data: list[dict], date: str, qtype: str) -> li
return [x for x in data if f"{x['time']}-{x['user']}" not in saved]
-async def create_table_and_index(table_name: str) -> None:
- if cache.get(f"danmu-table-{table_name}"):
- return
- cache.set(f"danmu-table-{table_name}", table_name, ttl=0)
-
- await turso_create_table(table_name, COLUMNS[table_name], idx_cols=INDEX_NAMES[table_name], silent=True, **TURSO_KWARGS)
-
- # 列出所有虚拟表
- resp = await turso_exec([{"type": "execute", "stmt": {"sql": 'SELECT name FROM pragma_table_list WHERE type="virtual";'}}], silent=True, **TURSO_KWARGS)
- virtual_tables = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
-
- primary_key = "rowid"
- """创建 FTS5 虚拟表
- -- content=table_name 指明关联的原表
- -- content_rowid=primary_key 指明原表的行 ID 列是 primary_key
- -- segmented 是我们要索引的列
- -- tokenize='unicode61' 使用 unicode61 分词器, 对多种语言支持更好
- """
- statements = []
- if f"fts_{table_name}" not in virtual_tables:
- logger.debug(f"Creating FTS5 virtual table for {table_name}")
- sql = f"CREATE VIRTUAL TABLE IF NOT EXISTS 'fts_{table_name}' USING fts5(segmented, content='{table_name}', content_rowid={primary_key}, tokenize='unicode61');"
- statements.append({"type": "execute", "stmt": {"sql": sql}})
-
- """将现有数据从原表复制到 FTS 表
- 注意, 我们在这里插入的是 rowid (它会对应到 content_rowid=primary_key 指定的列) 和 content
- 从原表中选择 primary_key 和 segmented 列。primary_key 列的值会被插入到 FTS 表中对应原表 rowid (或 content_rowid) 的位置。
- """
- sql = f"INSERT INTO 'fts_{table_name}' (rowid, segmented) SELECT {primary_key}, segmented FROM '{table_name}' WHERE {primary_key} NOT IN (SELECT rowid FROM 'fts_{table_name}');"
- statements.append({"type": "execute", "stmt": {"sql": sql}})
-
- # 列出所有触发器
- resp = await turso_exec([{"type": "execute", "stmt": {"sql": 'SELECT name FROM sqlite_master WHERE type="trigger";'}}], silent=True, **TURSO_KWARGS)
- triggers = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
- """维护 FTS 表
- 为了让 FTS 表与原表保持同步, 需要在原表上创建触发器。
- 在原表插入、删除、更新时, 同步更新 FTS 表
- """
- # 创建触发器, 在原表插入数据时, 同步从 FTS 表插入
- if f"trigger_{table_name}_ai" not in triggers:
- sql = f"CREATE TRIGGER IF NOT EXISTS 'trigger_{table_name}_ai' AFTER INSERT ON '{table_name}' BEGIN INSERT INTO 'fts_{table_name}' (rowid, segmented) VALUES (NEW.{primary_key}, NEW.segmented); END;"
- statements.append({"type": "execute", "stmt": {"sql": sql}})
-
- # 创建触发器, 在原表删除数据时, 同步从 FTS 表删除
- if f"trigger_{table_name}_ad" not in triggers:
- sql = f"CREATE TRIGGER IF NOT EXISTS 'trigger_{table_name}_ad' AFTER DELETE ON '{table_name}' BEGIN DELETE FROM 'fts_{table_name}' WHERE rowid = OLD.{primary_key}; END;"
- statements.append({"type": "execute", "stmt": {"sql": sql}})
-
- # 创建触发器, 在原表更新数据时, 同步更新 FTS 表
- # FTS5 的更新通常是先删除旧的, 再插入新的
- if f"trigger_{table_name}_au" not in triggers:
- sql = f"CREATE TRIGGER IF NOT EXISTS 'trigger_{table_name}_au' AFTER UPDATE ON '{table_name}' BEGIN DELETE FROM 'fts_{table_name}' WHERE rowid = OLD.{primary_key} AND OLD.segmented <> NEW.segmented; INSERT INTO 'fts_{table_name}' (rowid, segmented) SELECT NEW.{primary_key}, NEW.segmented WHERE OLD.segmented <> NEW.segmented; END;"
- statements.append({"type": "execute", "stmt": {"sql": sql}})
- await turso_exec(statements, silent=True, **TURSO_KWARGS)
-
-
async def sync_server_to_r2(qtype: str) -> None:
concurrency = 1 # DANMU.NUM_PER_QUERY 设置很大的话只用单并发即可
prefix = DANMU.R2_PREFIX.rstrip("/") + f"/{qtype}/"
src/database/turso.py
@@ -7,6 +7,7 @@ from config import DB, PROXY, cache
from networking import hx_req
+# ruff: noqa: S608
@cache.memoize(ttl=0)
async def turso_db_url(
db_name: str = "bennybot",
@@ -37,15 +38,32 @@ async def turso_db_url(
async def turso_create_table(
    table_name: str | float,
    columns: str,
    *,
    idx_cols: list[str] | None = None,
    idx_prefix: str = "idx_",
    fts_on_col: str | None = None,
    fts_index_col: str = "segmented",
    fts_table_name: str | None = None,
    fts_tabme_name: str | None = None,
    db_name: str = "bennybot",
    username: str = DB.TURSO_USERNAME,
    api_token: str = DB.TURSO_API_TOKEN,
    group_token: str = DB.TURSO_GROUP_TOKEN,
    silent: bool = False,
) -> None:
    """Create a Turso table, plus optional secondary indexes and an FTS5 mirror.

    Args:
        table_name: Name of the table to create (created only if absent).
        columns: Raw column-definition fragment of the CREATE TABLE statement.
        idx_cols: Columns to index. Each index is named ``f"{idx_prefix}{col}"``;
            e.g. ``idx_prefix="idx_"`` with ``idx_cols=["uid", "time"]`` yields
            indexes ``idx_uid`` and ``idx_time``.
        idx_prefix: Prefix for the generated index names.
        fts_on_col: When given, create an FTS5 external-content table whose
            ``content_rowid`` is this column (enables CJK-friendly full-text search).
        fts_index_col: The column indexed by the FTS5 table.
        fts_table_name: Name of the FTS5 table; defaults to ``f"fts_{table_name}"``.
        fts_tabme_name: Deprecated misspelled alias of ``fts_table_name``; kept
            only for backward compatibility with existing callers.
        db_name: Target Turso database name.
        username: Turso account username.
        api_token: Turso platform API token.
        group_token: Turso database-group auth token.
        silent: Suppress request logging in the underlying calls.
    """
    # Backward compatibility: honour the old misspelled keyword if a caller still uses it.
    if fts_table_name is None:
        fts_table_name = fts_tabme_name

    tables = await turso_list_tables(db_name, username=username, api_token=api_token, group_token=group_token, silent=silent)
    if table_name not in tables:
        # NOTE(review): this exec call sits on a hunk boundary in the patch; the
        # surrounding shape is certain, but confirm the exact CREATE TABLE SQL
        # string against the repository.
        resp = await turso_exec(
            [{"type": "execute", "stmt": {"sql": f"CREATE TABLE IF NOT EXISTS '{table_name}' ({columns});"}}],
            db_name=db_name,
            username=username,
            api_token=api_token,
            group_token=group_token,
            silent=silent,
        )
        if glom(resp, "results.0.type", default="") == "ok":
            logger.success(f'Create Table "{table_name}" in Turso database "{db_name}"')

    # Create secondary indexes if requested.
    if idx_cols is not None:
        resp = await turso_exec(
            [{"type": "execute", "stmt": {"sql": "SELECT name FROM sqlite_master WHERE type='index';"}}],
            db_name=db_name,
            username=username,
            api_token=api_token,
            group_token=group_token,
            silent=silent,
        )
        indexs = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
        for idx_name in idx_cols:
            # Substring heuristic: `columns` is the raw definition string, so this
            # only sanity-checks that the column name appears somewhere in it.
            if idx_name not in columns:
                logger.warning(f"Index {idx_name} not in columns {columns}")
                continue
            if f"{idx_prefix}{idx_name}" not in indexs:
                resp = await turso_exec(
                    [{"type": "execute", "stmt": {"sql": f'CREATE INDEX IF NOT EXISTS "{idx_prefix}{idx_name}" ON "{table_name}"({idx_name})'}}],
                    db_name=db_name,
                    username=username,
                    api_token=api_token,
                    group_token=group_token,
                    silent=silent,
                )
                if glom(resp, "results.0.type", default="") == "ok":
                    logger.success(f'Create Index "{idx_prefix}{idx_name}" of table "{table_name}" in Turso database "{db_name}"')

    if fts_on_col is not None:
        if fts_table_name is None:
            fts_table_name = f"fts_{table_name}"
        # The pre-refactor code named triggers after the FTS table's base name
        # (e.g. fts_<cid> -> trigger_<cid>_ai), NOT after the source table.
        # Deriving the base from the FTS table name keeps the new names equal to
        # the triggers already installed on existing databases, so the membership
        # check below still matches and we don't install a duplicate trigger set
        # (which would double-insert rows into the FTS table).
        trigger_base = fts_table_name.removeprefix("fts_")

        # List existing virtual tables so the FTS table is created only once.
        resp = await turso_exec(
            [{"type": "execute", "stmt": {"sql": 'SELECT name FROM pragma_table_list WHERE type="virtual";'}}],
            db_name=db_name,
            username=username,
            api_token=api_token,
            group_token=group_token,
            silent=silent,
        )
        virtual_tables = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))

        statements = []
        # FTS5 external-content table:
        #   content=<table>      -> the associated source table
        #   content_rowid=<col>  -> source column acting as the row id
        #   tokenize='unicode61' -> tokenizer with good multilingual support
        if fts_table_name not in virtual_tables:
            logger.debug(f"Creating FTS5 virtual table for {table_name}")
            sql = f"CREATE VIRTUAL TABLE IF NOT EXISTS '{fts_table_name}' USING fts5({fts_index_col}, content='{table_name}', content_rowid={fts_on_col}, tokenize='unicode61');"
            statements.append({"type": "execute", "stmt": {"sql": sql}})

        # Backfill: copy rows not yet present in the FTS table. The value of
        # `fts_on_col` is inserted as the FTS table's rowid (= content_rowid).
        sql = f"INSERT INTO '{fts_table_name}' (rowid, {fts_index_col}) SELECT {fts_on_col}, {fts_index_col} FROM '{table_name}' WHERE {fts_on_col} NOT IN (SELECT rowid FROM '{fts_table_name}');"
        statements.append({"type": "execute", "stmt": {"sql": sql}})

        # List existing triggers so each sync trigger is installed only once.
        resp = await turso_exec(
            [{"type": "execute", "stmt": {"sql": 'SELECT name FROM sqlite_master WHERE type="trigger";'}}],
            db_name=db_name,
            username=username,
            api_token=api_token,
            group_token=group_token,
            silent=silent,
        )
        triggers = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))

        # Keep the FTS table in sync with the source table via triggers on
        # INSERT / DELETE / UPDATE.
        if f"trigger_{trigger_base}_ai" not in triggers:
            sql = f"CREATE TRIGGER IF NOT EXISTS 'trigger_{trigger_base}_ai' AFTER INSERT ON '{table_name}' BEGIN INSERT INTO '{fts_table_name}' (rowid, {fts_index_col}) VALUES (NEW.{fts_on_col}, NEW.{fts_index_col}); END;"
            statements.append({"type": "execute", "stmt": {"sql": sql}})

        if f"trigger_{trigger_base}_ad" not in triggers:
            sql = f"CREATE TRIGGER IF NOT EXISTS 'trigger_{trigger_base}_ad' AFTER DELETE ON '{table_name}' BEGIN DELETE FROM '{fts_table_name}' WHERE rowid = OLD.{fts_on_col}; END;"
            statements.append({"type": "execute", "stmt": {"sql": sql}})

        # An FTS5 update is modelled as delete-old + insert-new, and only fires
        # when the indexed column actually changed.
        if f"trigger_{trigger_base}_au" not in triggers:
            sql = f"CREATE TRIGGER IF NOT EXISTS 'trigger_{trigger_base}_au' AFTER UPDATE ON '{table_name}' BEGIN DELETE FROM '{fts_table_name}' WHERE rowid = OLD.{fts_on_col} AND OLD.{fts_index_col} <> NEW.{fts_index_col}; INSERT INTO '{fts_table_name}' (rowid, {fts_index_col}) SELECT NEW.{fts_on_col}, NEW.{fts_index_col} WHERE OLD.{fts_index_col} <> NEW.{fts_index_col}; END;"
            statements.append({"type": "execute", "stmt": {"sql": sql}})

        await turso_exec(
            statements,
            db_name=db_name,
            username=username,
            api_token=api_token,
            group_token=group_token,
            silent=silent,
        )
async def turso_list_tables(
@@ -157,10 +245,10 @@ def insert_statement(table_name: str, records: dict, update_on_conflict: str = "
values = ", ".join(["?" for _ in range(len(records))])
args = [{"type": SQL_TYPES[type(x).__name__.lower()], "value": str(x) if isinstance(x, (int, float)) else x} for x in records.values()]
- sql = f"INSERT INTO '{table_name}' ({keys}) VALUES ({values});" # noqa: S608
+ sql = f"INSERT INTO '{table_name}' ({keys}) VALUES ({values});"
if update_on_conflict:
updates = ", ".join([f"{k} = EXCLUDED.{k}" for k in records if k != update_on_conflict])
- sql = f"INSERT INTO '{table_name}' ({keys}) VALUES ({values}) ON CONFLICT ({update_on_conflict}) DO UPDATE SET {updates};" # noqa: S608
+ sql = f"INSERT INTO '{table_name}' ({keys}) VALUES ({values}) ON CONFLICT ({update_on_conflict}) DO UPDATE SET {updates};"
stmt = {"sql": sql}
if args:
stmt |= {"args": args}
src/history/turso.py
@@ -216,66 +216,13 @@ async def get_table_name(client: Client, chat_id: str | int) -> str:
cache.set(f"tablename-{chat_id}", table_name, ttl=0)
# create table and index
- await create_table_and_index(slim_cid, table_name)
+ await turso_create_table(
+ table_name,
+ DB_COLUMNS,
+ idx_cols=INDEX_NAMES,
+ idx_prefix=f"idx_{slim_cid}_",
+ fts_on_col="mid",
+ fts_tabme_name=f"fts_{slim_cid}",
+ **TURSO_KWARGS,
+ )
return table_name
-
-
-async def create_table_and_index(slim_cid: str, table_name: str) -> None:
- # create table
- await turso_create_table(table_name, DB_COLUMNS, **TURSO_KWARGS)
-
- # get all index names
- resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT name FROM sqlite_master WHERE type='index';"}}], silent=True, **TURSO_KWARGS)
- indexs = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
-
- # create standard index
- for idx_name in [x for x in INDEX_NAMES if f"idx_{slim_cid}_{x}" not in indexs]:
- logger.debug(f"Creating index on {table_name} for {idx_name}")
- resp = await turso_exec([{"type": "execute", "stmt": {"sql": f'CREATE INDEX IF NOT EXISTS "idx_{slim_cid}_{idx_name}" ON "{table_name}"({idx_name})'}}], silent=True, **TURSO_KWARGS)
-
- # 列出所有虚拟表
- resp = await turso_exec([{"type": "execute", "stmt": {"sql": 'SELECT name FROM pragma_table_list WHERE type="virtual";'}}], silent=True, **TURSO_KWARGS)
- virtual_tables = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
-
- statements = []
- primary_key = "mid"
- """创建 FTS5 虚拟表
- -- content=table_name 指明关联的原表
- -- content_rowid=mid 指明原表的行 ID 列是 mid
- -- segmented 是我们要索引的列
- -- tokenize='unicode61' 使用 unicode61 分词器
- """
- if f"fts_{slim_cid}" not in virtual_tables:
- sql = f"""CREATE VIRTUAL TABLE IF NOT EXISTS fts_{slim_cid} USING fts5(segmented, content="{table_name}", content_rowid={primary_key}, tokenize="unicode61");"""
- statements.append({"type": "execute", "stmt": {"sql": sql}})
-
- """将现有数据从原表复制到 FTS 表
- 注意, 我们在这里插入的是 rowid (它会对应到 content_rowid=mid 指定的列) 和 segmented
- 从原表中选择 mid 和 segmented 列。mid 列的值会被插入到 FTS 表中对应原表 rowid (或 content_rowid) 的位置。
- """
- sql = f"INSERT INTO fts_{slim_cid} (rowid, segmented) SELECT {primary_key}, segmented FROM '{table_name}' WHERE {primary_key} NOT IN (SELECT rowid FROM fts_{slim_cid});"
- statements.append({"type": "execute", "stmt": {"sql": sql}})
-
- # 列出所有触发器
- resp = await turso_exec([{"type": "execute", "stmt": {"sql": 'SELECT name FROM sqlite_master WHERE type="trigger";'}}], silent=True, **TURSO_KWARGS)
- triggers = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
- """维护 FTS 表
- 为了让 FTS 表与原表保持同步, 需要在原表上创建触发器。
- 在原表插入、删除、更新时, 同步更新 FTS 表
- """
- # 创建触发器, 在原表插入数据时, 同步从 FTS 表插入
- if f"trigger_{slim_cid}_ai" not in triggers:
- sql = f"CREATE TRIGGER IF NOT EXISTS trigger_{slim_cid}_ai AFTER INSERT ON '{table_name}' BEGIN INSERT INTO fts_{slim_cid} (rowid, segmented) VALUES (NEW.{primary_key}, NEW.segmented); END;"
- statements.append({"type": "execute", "stmt": {"sql": sql}})
-
- # 创建触发器, 在原表删除数据时, 同步从 FTS 表删除
- if f"trigger_{slim_cid}_ad" not in triggers:
- sql = f"CREATE TRIGGER IF NOT EXISTS trigger_{slim_cid}_ad AFTER DELETE ON '{table_name}' BEGIN DELETE FROM fts_{slim_cid} WHERE rowid = OLD.{primary_key}; END;"
- statements.append({"type": "execute", "stmt": {"sql": sql}})
-
- # 创建触发器, 在原表更新数据时, 同步更新 FTS 表
- # FTS5 的更新通常是先删除旧的, 再插入新的
- if f"trigger_{slim_cid}_au" not in triggers:
- sql = f"CREATE TRIGGER IF NOT EXISTS trigger_{slim_cid}_au AFTER UPDATE ON '{table_name}' BEGIN DELETE FROM fts_{slim_cid} WHERE rowid = OLD.{primary_key} AND OLD.segmented <> NEW.segmented; INSERT INTO fts_{slim_cid} (rowid, segmented) SELECT NEW.{primary_key}, NEW.segmented WHERE OLD.segmented <> NEW.segmented; END;"
- statements.append({"type": "execute", "stmt": {"sql": sql}})
- await turso_exec(statements, silent=True, **TURSO_KWARGS)