Commit c81953f
src/history/query.py
@@ -9,10 +9,10 @@ from pyrogram.client import Client
from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM, BLOCKQUOTE_EXPANDABLE_END_DELIM
from pyrogram.types import Message
-from config import PREFIX, TZ, cutter
+from config import PREFIX, TZ, cache, cutter
from database.turso import turso_exec, turso_parse_resp
-from history.turso import get_turso_chatinfo, save_chatinfo_to_turso
-from history.utils import TURSO_KWARGS, check_save_history, get_chat, get_uid_by_username, is_admin, list_chat_ids
+from history.turso import get_turso_chatinfo, get_user, save_chatinfo_to_turso
+from history.utils import TURSO_KWARGS, check_save_history, get_chat, is_admin, list_chat_ids
from llm.utils import convert_html
from messages.parser import parse_chat, parse_msg
from messages.progress import modify_progress
@@ -20,7 +20,7 @@ from messages.sender import send2tg
from messages.utils import blockquote, equal_prefix, smart_split, startswith_prefix
from others.emoji import MTYPE_EMOJI
from publish import publish_telegraph
-from utils import myself, nowstr, slim_cid, strings_list
+from utils import myself, nowstr, slim_cid, strings_list, to_int
HELP = f"""🗣**查询当前对话聊天记录**
`/hist` 使用说明:
@@ -49,7 +49,7 @@ HELP = f"""🗣**查询当前对话聊天记录**
async def query_chat_history(client: Client, message: Message, **kwargs):
- info = parse_msg(message, silent=True)
+ info = parse_msg(message, silent=True, use_cache=False)
admin_call = is_admin(info["uid"])
if not check_save_history(info["ctype"], info["cid"]) and not admin_call: # save history is disabled for this chat
return
@@ -230,3 +230,16 @@ async def query_turso(client: Client, cinfo: dict[str, str], match_time: str, us
texts += f"\n👤[{username}]({url}) {row['time']}{emoji}:\n{content}\n"
count += 1
return {"texts": texts.strip(), "full_texts": full_texts.strip(), "count": count}
+
+
+async def get_uid_by_username(client: Client, chat_id: str | int, username: str) -> int:
+ """Get Telegram user id by username.
+
+ Support formats of `username`:
+ handle (a-z, A-Z, 0-9, _)
+ """
+ if cache.get(f"get_uid_by_username-{chat_id}-{username}"):
+ return cache.get(f"get_uid_by_username-{chat_id}-{username}")
+ user = await get_user(client, to_int(username), chat_id)
+ cache.set(f"get_uid_by_username-{username}", user.id, ttl=0)
+ return user.id
src/history/turso.py
@@ -10,11 +10,12 @@ from zoneinfo import ZoneInfo
from glom import Coalesce, flatten, glom
from loguru import logger
from pyrogram.client import Client
-from pyrogram.types import Message
+from pyrogram.errors import PeerIdInvalid, UsernameNotOccupied
+from pyrogram.types import Message, User
from config import DOWNLOAD_DIR, HISTORY, TZ, cache, cutter
from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
-from history.utils import CHAT_COLUMNS, MSG_COLUMNS, MSG_INDEXES, TURSO_KWARGS, USER_COLUMNS, USER_INDEXES, check_save_history, fine_grained_check, get_chat
+from history.utils import CHAT_COLUMNS, MSG_COLUMNS, MSG_INDEXES, TURSO_KWARGS, USER_COLUMNS, USER_INDEXES, check_save_history, fine_grained_check, get_chat, get_user_from_chat
from messages.parser import parse_chat, parse_msg
from utils import i_am_bot, nowdt, slim_cid, to_int, true
@@ -137,7 +138,7 @@ async def backup_chat_history_to_turso(
resp = await turso_exec(statements, silent=True, retry=2, **TURSO_KWARGS)
num_success = sum([1 for x in glom(resp, "results.*.type", default=[]) if x == "ok"]) - 1
if sync_ids := glom(resp, "results.**.last_insert_rowid", default=[0]):
- logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}. {info['time']}")
+ logger.success(f"Synced {num_success} messages to Turso, {min(sync_ids)} -> {max(sync_ids)}. {info['time']}") # type: ignore
async def upload_exported_history_to_turso(client: Client, path: str | Path | None = None) -> None:
@@ -383,3 +384,40 @@ async def save_userinfo_to_turso(client: Client, minfo: dict) -> dict[str, str]:
cache.set(f"userinfo-{uid}-{cid}", records, ttl=0)
await turso_exec([insert_statement("userinfo", records, update_on_conflict="id")], retry=2, **TURSO_KWARGS)
return records
+
+
+async def get_user(client: Client, uid: int | str, cid: int | str = "") -> User:
+ try:
+ user = await client.get_users(to_int(uid))
+ if isinstance(user, User):
+ return user
+ except (PeerIdInvalid, UsernameNotOccupied):
+ user = await get_user_from_chat(client, uid, cid)
+ if user.id == 0: # this uid is not in this chat
+ users = await get_userinfo_by_uid(uid, cid)
+ for user_id, chat_id in users: # check if this user is still in this chat
+ user = await get_user_from_chat(client, user_id, chat_id)
+ if user.id != 0:
+ return user
+ return User(id=0)
+
+
+async def get_userinfo_by_uid(uid: int | str, cid: int | str = "") -> list[tuple[int, int]]:
+ """Get user info by uid from turso.
+
+ Returns:
+ [(uid, cid)]
+ """
+ uid = to_int(uid)
+ cond = f"uid = {uid}" if isinstance(uid, int) else f"handle = '{uid}' OR name = '{uid}'"
+ resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT cid,uid FROM userinfo WHERE {cond};"}}], retry=2, silent=True, **TURSO_KWARGS)
+ parsed = turso_parse_resp(resp)
+ if cid:
+ parsed = [x for x in parsed if slim_cid(x["cid"]) == slim_cid(cid)]
+ res = []
+ for info in parsed:
+ cid = int(info["cid"])
+ if chat := await get_turso_chatinfo(cid):
+ real_cid = int(cid) if chat["ctype"] in ["PRIVATE", "BOT"] else int(f"-100{cid}")
+ res.append((int(info["uid"]), real_cid))
+ return res
src/history/utils.py
@@ -2,12 +2,11 @@
# -*- coding: utf-8 -*-
import contextlib
import os
-import string
from loguru import logger
from pyrogram.client import Client
from pyrogram.errors import PeerIdInvalid
-from pyrogram.types import Chat, ChatMember, Message
+from pyrogram.types import Chat, Message, User
from config import DB, HISTORY, TID, cache
from database.turso import turso_exec, turso_parse_resp
@@ -137,20 +136,15 @@ def is_admin(uid: int) -> bool:
return any(slim_cid(admin) == slim_cid(uid) for admin in strings_list(TID.HISTORY_ADMIN))
-async def get_uid_by_username(client: Client, chat_id: str | int, username: str) -> int:
- """Get Telegram user id by username.
-
- Support formats of `username`:
- handle (a-z, A-Z, 0-9, _)
- """
- if cache.get(f"get_uid_by_username-{chat_id}-{username}"):
- return cache.get(f"get_uid_by_username-{chat_id}-{username}")
- if all(x in list(string.digits) + list(string.ascii_letters) for x in username):
- logger.debug(f"Getting uid by username: {username}")
- with contextlib.suppress(Exception):
- member = await client.get_chat_member(to_int(chat_id), to_int(username))
- if isinstance(member, ChatMember):
- cache.set(f"get_uid_by_username-{username}", member.user.id, ttl=0)
- return member.user.id
- cache.set(f"get_uid_by_username-{chat_id}-{username}", 0, ttl=0)
- return 0
+async def get_user_from_chat(client: Client, uid: int | str, cid: int | str) -> User:
+ user = User(id=0)
+ try: # get chat member directly
+ chat_member = await client.get_chat_member(to_int(cid), to_int(uid))
+ user = chat_member.user
+ except Exception:
+ with contextlib.suppress(Exception): # get chat member from chat members
+ async for member in client.get_chat_members(to_int(cid)): # type: ignore
+ if member.user.id == to_int(uid) or member.user.username == to_int(uid):
+ user = member.user
+ break
+ return user
src/quotly/quotly.py
@@ -10,11 +10,12 @@ from pyrogram.client import Client
from pyrogram.types import Message
from config import PREFIX
+from history.turso import get_user
from messages.sender import send2tg
from messages.utils import equal_prefix, set_reaction, startswith_prefix
from quotly.api import generate_from_api
from quotly.pillow import generate_from_pillow
-from quotly.utils import download_avatar, get_user
+from quotly.utils import download_avatar
from utils import to_int
HELP = f"""↪️**引用语录**
src/quotly/utils.py
@@ -1,6 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import contextlib
import string
from pathlib import Path
@@ -8,16 +7,10 @@ from loguru import logger
from PIL import Image, ImageDraw
from PIL.ImageFont import FreeTypeFont
from pyrogram.client import Client
-from pyrogram.errors import PeerIdInvalid, UsernameNotOccupied
-from pyrogram.types import User
from pyuegc import EGC
from config import DOWNLOAD_DIR
-from database.turso import turso_exec, turso_parse_resp
-from history.turso import get_turso_chatinfo
-from history.utils import TURSO_KWARGS
from quotly.fonts import EMOJI_CMAP, MATH_CMAP, SYMBOL_CMAP, TEXT_CMAP
-from utils import to_int
async def download_avatar(client: Client, avatar_id: str) -> str:
@@ -30,50 +23,6 @@ async def download_avatar(client: Client, avatar_id: str) -> str:
return avatar_path
-async def get_user(client: Client, uid: int | str, cid: int | str = "") -> User:
- try:
- user = await client.get_users(to_int(uid))
- if isinstance(user, User):
- return user
- except (PeerIdInvalid, UsernameNotOccupied):
- user = await get_user_from_chat(client, uid, cid)
- if user.id == 0: # this uid is not in this chat
- chat_ids = await get_cid_by_uid_from_turso(uid)
- for chat_id in chat_ids:
- user = await get_user_from_chat(client, uid, chat_id)
- if user.id != 0:
- return user
- return User(id=0)
-
-
-async def get_user_from_chat(client: Client, uid: int | str, cid: int | str) -> User:
- user = User(id=0)
- try: # get chat member directly
- chat_member = await client.get_chat_member(to_int(cid), to_int(uid))
- user = chat_member.user
- except Exception:
- with contextlib.suppress(Exception): # get chat member from chat members
- async for member in client.get_chat_members(to_int(cid)): # type: ignore
- if member.user.id == to_int(uid) or member.user.username == to_int(uid):
- user = member.user
- break
- return user
-
-
-async def get_cid_by_uid_from_turso(uid: int | str) -> list[int]:
- """Get chat id by uid from turso."""
- chat_ids = []
- uid = to_int(uid)
- match_key = "uid" if isinstance(uid, int) else "handle"
- resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT cid FROM userinfo WHERE {match_key}={uid};"}}], retry=2, silent=True, **TURSO_KWARGS)
- slim_chat_ids = [x["cid"] for x in turso_parse_resp(resp)]
- for slim_cid in slim_chat_ids:
- if chat := await get_turso_chatinfo(slim_cid):
- real_cid = int(slim_cid) if chat["ctype"] in ["PRIVATE", "BOT"] else int(f"-100{slim_cid}")
- chat_ids.append(real_cid)
- return chat_ids
-
-
def get_name_color(uid: int = 0) -> str:
color = [
"#FF8E86", # red