Commit 34906b1

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-02-08 07:55:05
refactor(bridge): save metadata into cache instead of message body
1 parent 55ac784
src/bridge/chartimg.py
@@ -1,15 +1,16 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
+import contextlib
+
 from loguru import logger
 from pyrogram.client import Client
-from pyrogram.types import Message
+from pyrogram.types import Message, ReplyParameters
 
-from bridge.utils import forward_bot_message
 from config import cache
 from messages.parser import parse_msg
 from utils import i_am_bot, to_int
 
-CHART_BOT = "chartImgOpnBot"
+CHART_BOT = "chartImgBot"
 
 
 @cache.memoize(ttl=10)
@@ -19,8 +20,8 @@ async def send_to_chartimg_bridge(client: Client, message: Message, symbol: str,
     Args:
         target_chat (int | str, optional): Send result to this telegram target chat. If not set, send to the trigger message's chat.
         reply_msg_id (int, optional): If set to integer > 0, the result is sent as a reply message to this message_id.
-                                             If set to 0, reply to the trigger message itself.
-                                             If set to -1, do not send as a reply message.
+                                      If set to 0, reply to the trigger message itself.
+                                      If set to -1, do not send as a reply message.
     """
     if await i_am_bot(client):  # bot can't send message to other bots
         return
@@ -32,10 +33,10 @@ async def send_to_chartimg_bridge(client: Client, message: Message, symbol: str,
         target_mid = None
     else:
         target_mid = to_int(reply_msg_id)
-    metadata = {"target_cid": target_cid, "target_mid": target_mid, "src": f"{symbol} {interval}"}
-    cache.set(f"bridge-{symbol} {interval}", metadata, ttl=60)  # save metadata to cache
-    logger.warning(f"Trying chartimg bridge (@{CHART_BOT}): {symbol} {interval}")
-    await client.send_message(chat_id=f"@{CHART_BOT}", text=f"/chart {symbol} {interval}")
+    params = {"target_cid": target_cid, "target_mid": target_mid, "text": f"{symbol} {interval}"}
+    cache.set(f"bridge-{params['text']}", params, ttl=60)  # save params to cache
+    logger.warning(f"Trying chartimg bridge (@{CHART_BOT}): {params['text']}")
+    await client.send_message(chat_id=f"@{CHART_BOT}", text=f"/chart {params['text']}")
 
 
 @cache.memoize(ttl=10)
@@ -44,8 +45,21 @@ async def forward_chartimg_results(client: Client, message: Message):
     if message.from_user.username != CHART_BOT or not message.photo:
         return
 
+    info = parse_msg(message)
     # got a photo message, format:
     # [kline chart]\n{symbol} {interval}
-    info = parse_msg(message)
-    if metadata := cache.get(f"bridge-{info['text']}"):
-        await forward_bot_message(client, message, metadata)
+    if not cache.get(f"bridge-{info['text']}"):
+        return
+    params = cache.get(f"bridge-{info['text']}")
+
+    logger.info(f"Forwarding tradingview chart @{info['handle']} -> chat={params['target_cid']}, id={params['target_mid']}")
+    await client.copy_message(
+        chat_id=params["target_cid"],
+        from_chat_id=message.chat.id,
+        message_id=message.id,
+        reply_parameters=ReplyParameters(message_id=params["target_mid"]),  # type: ignore
+    )
+    cache.delete(f"bridge-{params['text']}")
+    with contextlib.suppress(Exception):
+        if params.get("prog_cid") and params.get("prog_mid"):
+            await client.delete_messages(chat_id=params["prog_cid"], message_ids=params["prog_mid"])
src/bridge/ocr.py
@@ -21,6 +21,12 @@ async def send_to_ocr_bridge(client: Client, message: Message, **kwargs):
     """Send photo to miaomiao bot for OCR.
 
     See docs in `bridge/README.md` for details.
+
+    kwargs:
+    target_chat (int | str, optional): Send result to this telegram target chat. If not set, send to the trigger message's chat.
+    reply_msg_id (int, optional): If set to integer > 0, the result is sent as a reply message to this message_id.
+                                            If set to 0, reply to the trigger message itself.
+                                            If set to -1, do not send as a reply message.
     """
     if not ENABLE.OCR:
         return
@@ -46,8 +52,8 @@ async def send_to_ocr_bridge(client: Client, message: Message, **kwargs):
     file_id = info["file_id"]
 
     cid = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id  # MSG-A's cid
-    mid = kwargs.get("reply_msg_id", message.id)  # MSG-A's mid
-    msg += f" \n#ID=({cid},{mid})".replace("None", "0")
+    mid = kwargs["reply_msg_id"] if kwargs.get("reply_msg_id") else message.id  # MSG-A's mid
+    msg += f" \n#ID=({cid},{mid})".replace("None", "-1")
     logger.warning(f"OCR via 妙妙小工具 (@{OCR_BOT}): {msg!r}")
     await client.send_photo(chat_id=f"@{OCR_BOT}", photo=file_id, caption=msg)
 
@@ -70,7 +76,7 @@ async def forward_ocr_results(client: Client, message: Message):
 
     if matched := re.search(r"#ID=\((-?\d+),(-?\d+)\)", reply_info["text"]):
         target_cid = matched.group(1)  # MSG-A's cid
-        target_mid = int(matched.group(2)) if int(matched.group(2)) != 0 else None  # MSG-A's mid
+        target_mid = int(matched.group(2)) if int(matched.group(2)) != -1 else None  # MSG-A's mid
         cid = message.chat.id  # result's cid
         mid = message.id  # result's mid
         logger.info(f"Forwarding chat=@{OCR_BOT}, id={mid} -> chat={target_cid}, id={target_mid}")
src/bridge/README.md
@@ -14,41 +14,39 @@ mid: message_id
 
 2. We first reply to MSG-A with a progress updating message, named MSG-UPDATE. (Like "Processing your request...")
 
-3. Then we construct a special message (named MSG-B) which can trigger the task of the third-party bot (bridgebot).
+3. Then we save necessary info into cache.
 
-   We log the `cid` and `mid` of MSG-A into MSG-B in this format: #ID=(MSG-A's cid,MSG-A's mid)
+   - log the `cid` and `mid` of MSG-A
+   - log the `cid` and `mid` of MSG-UPDATE
 
-   And log the `cid` and `mid` of MSG-UPDATE into MSG-B in this format: #PROGRESS=(MSG-UPDATE's cid,MSG-UPDATE's mid)
+   More info can also be logged like `url`, ...
 
-   More info can also be logged into MSG-B, like #URL, ...
-
-4. Send MSG-B to bridgebot to finish the task.
+4. Send a message that can trigger the task of the third-party bot (bridgebot).
 
 5. After the task is finished, the bridgebot will send us a new message containing the task results, named MSG-RES.
 
-   So, we can retrieve the last message sent from us (MSG-B) to parse the #ID (MSG-A) and #PROGRESS (MSG-UPDATE) info, and then forward MSG-RES to MSG-A.
+6. We retrieve the logged info from cache and forward MSG-RES to MSG-A.
 
-6. Finally, we delete MSG-UPDATE.
+7. Finally, we delete MSG-UPDATE.
 
 ## Example
 
-For example, a user send us a douyin link which need to previewed. MSG-A `cid=-1001234455, mid=2385`
+For example, a user send us "/price AAPL @15m" to get tradingview chart. (MSG-A)
+
+We want to use @chartImgBot to finish this task:
 
-We want to use @ParsehubBot to finish this task.
+- (Optional) We reply to MSG-A with a progress updating message. (MSG-UPDATE)
 
-First, we reply to MSG-A with a progress updating message. MSG-UPDATE `cid=-1001234455, mid=2386`
+- We save the info into cache:
 
-Then we construct a special message (MSG-B) which can trigger the function of @ParsehubBot:
+  - log the `cid` and `mid` of MSG-A
+  - log the `cid` and `mid` of MSG-UPDATE
+  - log the `symbol` and `interval`
 
-```txt
-#URL=( https://v.douyin.com/helloworld )
-#ID=(-1001234455,2385)
-#PROGRESS=(-1001234455,2386)
-```
+- Send a message that can trigger the function of @chartImgBot: `/chart NASDAQ:AAPL 15m`
 
-Send MSG-B to @ParsehubBot to finish the task.
+After the task is finished, @chartImgBot will send us the chart image, named MSG-RES
 
-After the task is finished, @ParsehubBot will send us the douyin video, named MSG-RES
-We retrieve the chat history of @ParsehubBot to get last message sent from us (MSG-B) and parse it to get `cid=-1001234455, mid=2385`, and then forward MSG-RES to MSG-A.
+We retrieve the logged info from cache and check if this photo is the symbol and interval we want. If so, forward MSG-RES to MSG-A.
 
-Finally, we delete MSG-UPDATE `cid=-1001234455, mid=2386`.
+Finally, we delete MSG-UPDATE.
src/bridge/social.py
@@ -1,12 +1,14 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
+import asyncio
+import contextlib
+import random
 
 from loguru import logger
 from pyrogram.client import Client
-from pyrogram.types import Message
+from pyrogram.types import Message, ReplyParameters
 
-from bridge.utils import extract_forwarding_params, forward_bot_message, get_recent_msg_from_me
 from config import cache
 from messages.parser import parse_msg
 from utils import i_am_bot
@@ -27,43 +29,65 @@ async def send_to_social_media_bridge(client: Client, message: Message, url: str
     """See docs in `bridge/README.md` for details."""
     if await i_am_bot(client):  # bot can't send message to other bots
         return
-    cid = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id  # MSG-A's cid
-    # set MSG-A's mid
-    if not kwargs.get("reply_msg_id") or str(kwargs.get("reply_msg_id")) == "0":
-        mid = message.id
-    elif str(kwargs.get("reply_msg_id")) == "-1":
-        mid = None
-    else:
-        mid = kwargs["reply_msg_id"]
-    msg = f"#SRC=( {url} ) \n#ID=({cid},{mid})".replace("None", "0")
+
+    params = {
+        "target_cid": kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id,  # MSG-A's cid
+        "target_mid": None,  # disable reply, because the reply message may be sent as a series of messages.
+        "url": url,
+    }
 
     # add progress message
-    if prog := kwargs.get("progress"):
-        msg += f"\n#PROGRESS=({prog.chat.id},{prog.id})"
-    cache.set(f"bridge-{url}", "pending", ttl=1200)  # save to cache
+    if (prog := kwargs.get("progress")) and isinstance(prog, Message):
+        params["prog_cid"] = prog.chat.id
+        params["prog_mid"] = prog.id
+
+    # save a global flag to cache, if any bot is processing this url, set this the the bot name
+    # ! Warning: currently we can only handle single url at a time
+    cache.set("social-global", "bot_name", ttl=120)
+
     for bot, platforms in SOCIAL_BOTS.items():
         if platform in platforms:
-            logger.warning(f"Trying {platform} bridge (@{bot}): {msg!r}")
-            await client.send_message(chat_id=f"@{bot}", text=msg)
+            logger.warning(f"Trying {platform} bridge (@{bot}): {url}")
+            cache.set(f"social-{bot}", params, ttl=120)  # save params to cache for each bot
+            await client.send_message(chat_id=f"@{bot}", text=url)
 
 
 @cache.memoize(ttl=10)
 async def forward_social_media_results(client: Client, message: Message):
-    """See docs in `bridge/README.md` for details."""
+    """See docs in `bridge/README.md` for details.
+
+    Note, we may receive a series of messages from different bots
+    First, we need to check the global flag in the cache. Format: social-global-{url}
+    Then, we need to check the cache for each bot. Format: social-individual-{bot}-{url}
+    """
     if message.from_user.username not in SOCIAL_BOTS or not message.media:
         return
     #  got a media message
     info = parse_msg(message)
 
-    # Process reply-to messages
-    if message.reply_to_message:
-        params = extract_forwarding_params(str(message.reply_to_message.text))
-        if params and cache.get(f"bridge-{params['src']}"):
-            await forward_bot_message(client, message, params)
-            return
-
-    # Process messages not in reply context
-    my_msg = await get_recent_msg_from_me(client, info["handle"], info["uid"])
-    params = extract_forwarding_params(my_msg)
-    if params and cache.get(f"bridge-{params['src']}"):
-        await forward_bot_message(client, message, params)
+    bot_name = cache.get("social-global", default="")
+    if bot_name == "bot_name":  # this bot is the first one to get the results
+        logger.success(f"Bridge @{info['handle']} is the first bot to get a {info['mtype']}")
+        cache.set("social-global", info["handle"], ttl=120)
+    elif bot_name != info["handle"]:  # already processed by other bot
+        return
+
+    params = cache.get(f"social-{info['handle']}")
+    if not params:
+        return
+
+    if not message.media_group_id:  # single media, send diectly
+        logger.info(f"Forwarding {info['mtype']} from @{bot_name} -> chat={params['target_cid']}, id={params['target_mid']}")
+        await client.copy_message(chat_id=params["target_cid"], from_chat_id=info["cid"], message_id=info["mid"], reply_parameters=ReplyParameters(message_id=params["target_mid"]))
+
+    # Media group. We will receive multiple messages at nearly the same time
+    # Send media_group only once
+    if not cache.get(f"social-{bot_name}-{message.media_group_id}"):
+        cache.set(f"social-{bot_name}-{message.media_group_id}", info["mid"], ttl=120)  # this may be overwritten by the next message of same media_group
+        await asyncio.sleep(1 + random.random())  # wait for other messages
+        if mid := cache.get(f"social-{bot_name}-{message.media_group_id}"):
+            logger.info(f"Forwarding media_group from @{bot_name} -> chat={params['target_cid']}, id={params['target_mid']}")
+            await client.copy_media_group(chat_id=params["target_cid"], from_chat_id=info["cid"], message_id=mid, reply_parameters=ReplyParameters(message_id=params["target_mid"]))
+    with contextlib.suppress(Exception):
+        if params.get("prog_cid") and params.get("prog_mid"):
+            await client.delete_messages(chat_id=params["prog_cid"], message_ids=params["prog_mid"])
src/bridge/utils.py
@@ -1,11 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-import contextlib
-import re
 
-from loguru import logger
 from pyrogram.client import Client
-from pyrogram.types import Message, ReplyParameters
 
 from config import cache
 from utils import to_int
@@ -28,45 +24,3 @@ async def get_recent_msg_from_me(client: Client, chat_id: int | str, opponent_id
             if hit == idx:
                 return message.text or message.caption or ""
     return ""
-
-
-def extract_forwarding_params(msg_text: str) -> dict:
-    """Extract target chat ID, message ID, and SRC from message text."""
-    params = {}
-    id_match = re.search(r"#ID=\((-?\d+),(\d+)\)", msg_text)
-    src_match = re.search(r"#SRC=\( (.*?) \)", msg_text)
-    if id_match and src_match:
-        params = {
-            "target_cid": id_match.group(1),
-            "target_mid": int(id_match.group(2)) if int(id_match.group(2)) != 0 else None,
-            "src": src_match.group(1),
-        }
-    if prog_match := re.search(r"#PROGRESS=\((-?\d+),(\d+)\)", msg_text):
-        params["prog_cid"] = int(prog_match.group(1))
-        params["prog_mid"] = int(prog_match.group(2)) if int(prog_match.group(2)) != 0 else None
-    return params
-
-
-async def forward_bot_message(client: Client, message: Message, params: dict):
-    """Forward the message to the target chat and delete the pending cache."""
-    logger.info(f"Forwarding chat=@{message.from_user.username}, id={message.id} -> chat={params['target_cid']}, id={params['target_mid']}")
-    if message.media_group_id and not cache.get(f"bridge-{params['src']}-{message.media_group_id}"):
-        # send media_group only once
-        cache.set(f"bridge-{params['src']}-{message.media_group_id}", "1", ttl=120)
-        await client.copy_media_group(
-            chat_id=params["target_cid"],
-            from_chat_id=message.chat.id,
-            message_id=message.id,
-            reply_parameters=ReplyParameters(message_id=params["target_mid"]),  # type: ignore
-        )
-    elif cache.get(f"bridge-{params['src']}"):
-        await client.copy_message(
-            chat_id=params["target_cid"],
-            from_chat_id=message.chat.id,
-            message_id=message.id,
-            reply_parameters=ReplyParameters(message_id=params["target_mid"]),  # type: ignore
-        )
-    cache.delete(f"bridge-{params['src']}")
-    with contextlib.suppress(Exception):
-        if params.get("prog_cid") and params.get("prog_mid"):
-            await client.delete_messages(chat_id=params["prog_cid"], message_ids=params["prog_mid"])