main
  1#!/venv/bin/python
  2# -*- coding: utf-8 -*-
  3import argparse
  4import asyncio
  5import json
  6import logging
  7import os
  8import platform
  9import random
 10import sys
 11from json import JSONDecodeError
 12from urllib.parse import urlparse
 13
 14from apscheduler.schedulers.asyncio import AsyncIOScheduler
 15from loguru import logger
 16from pyrogram import filters
 17from pyrogram.client import Client
 18from pyrogram.sync import idle
 19from pyrogram.types import LinkPreviewOptions, Message
 20
 21from ai.chat_summary import daily_summary
 22from ai.utils import clean_anthropic_files, clean_gemini_files
 23from bridge.chartimg import forward_chartimg_results
 24from bridge.ocr import forward_ocr_results
 25from bridge.social import forward_social_media_results
 26from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TOKEN, TZ, cache
 27from custom.ai_news import daily_ainews
 28from custom.config import ACCOUNT_NAME
 29from custom.cyf_greeting import cyf_greeting
 30from custom.cyf_quote import fafa_quote
 31from custom.cyf_twitter_rss import fafa_twitter_rss
 32from custom.d1_daily_backup_msg import daily_backup_history_to_d1
 33from custom.del_msg import del_unwanted_message
 34from custom.dnkt_attendance import dnkt_attendance
 35from custom.dnkt_email import dnkt_email
 36from custom.email2md import eml2md
 37from custom.events import lottery
 38from custom.history_alias import tg_history_alias
 39from custom.lilaoshi import handle_lilaoshi, preview_lilaoshi_history_message
 40from custom.link_extract import link_extract
 41from custom.msg_backup import message_backup
 42from custom.readhub import readhub
 43from custom.restart import restart_bot
 44from custom.rss import update_rss
 45from custom.summary_video import summary_videos
 46from custom.sync_youtube import sync_youtube
 47from custom.tempmail import tempmail
 48from danmu.sync import sync_livechats
 49from database.r2 import clean_r2_expired
 50from emby.checkin import checkin_emby, daily_checkin_emby
 51from emby.keepalive import keepalive_emby
 52from emby.main import emby_entrypoint
 53from emby.register import emby_register
 54from history.sync import backup_chat_history, sync_chat_history
 55from messages.main import process_message
 56from messages.modify import message_modify
 57from messages.parser import parse_msg
 58from messages.utils import delete_message, startswith_prefix
 59from permission import check_permission
 60from podcast.main import summary_pods
 61from price.entrypoint import match_symbol_category
 62from quotly.quotly import quote_message
 63from utils import cleanup_old_files, to_int
 64
 65
 66async def main():
 67    app = Client(
 68        "bot",
 69        session_string=session_string,
 70        in_memory=True,
 71        proxy=proxy,
 72        device_model=DEVICE_NAME,  # A friendly name can be viewed in "Active Sessions" in Telegram settings
 73        app_version=f"{Client.APP_VERSION}, Python {platform.python_version()}",
 74        skip_updates=False,  # handle messages while client is offline
 75        fetch_replies=-1,  # fetch all replies
 76        link_preview_options=LinkPreviewOptions(is_disabled=True),
 77        max_concurrent_transmissions=2,
 78        max_business_user_connection_cache_size=100,  # reduce memory usage
 79        max_message_cache_size=100,  # reduce memory usage
 80    )
 81
 82    @app.on_message(filters.group)
 83    async def groups(client: Client, message: Message):
 84        permission = await check_permission(client, message)
 85        if permission["disabled"]:
 86            return
 87        await process_message(client, message, **permission)
 88
 89    @app.on_message(filters.channel)
 90    async def channels(client: Client, message: Message):
 91        permission = await check_permission(client, message)
 92        if permission["disabled"]:
 93            return
 94        await process_message(client, message, **permission)
 95
 96    @app.on_message(filters.bot)
 97    async def bots(client: Client, message: Message):
 98        permission = await check_permission(client, message)
 99        if permission["disabled"]:
100            return
101        parse_msg(message, verbose=True)
102        await forward_social_media_results(client, message)
103        await forward_ocr_results(client, message)
104        await forward_chartimg_results(client, message)
105        await process_message(client, message, **permission)
106
107    # filters.private = {user chats + bot chats}
108    # so the private handler should be placed after the bot handler
109    @app.on_message(filters.private)
110    async def private(client: Client, message: Message):
111        ctype = message.chat.type.name if message.chat and message.chat.type else ""
112        if ctype == "BOT":
113            await bots(client, message)  # handle bot messages
114            return
115        permission = await check_permission(client, message)
116        if permission["disabled"]:
117            return
118        parse_msg(message, verbose=True)
119        await process_message(client, message, **permission)
120
121    @app.on_message(group=1)
122    @app.on_edited_message(group=1)
123    @app.on_deleted_messages(group=1)
124    async def on_changed(client: Client, message: Message | list[Message]):
125        await sync_chat_history(client, message)
126        if isinstance(message, Message):
127            await checkin_emby(client, message)
128
129    @app.on_message(group=2)
130    async def custom(client: Client, message: Message):
131        message = message_modify(message)
132        if ACCOUNT_NAME == "benny":
133            await quote_message(client, message)
134            await emby_entrypoint(message)
135            await fafa_quote(client, message)
136            if startswith_prefix(message.content, ["/save"]):
137                await delete_message(message)
138            await tempmail(client, message)
139
140        if ACCOUNT_NAME == "xiaohao":
141            await cyf_greeting(client, message)
142            await handle_lilaoshi(client, message)  # 李老师不是你老师
143            # await chenyifa_social_rss(client, message)  # CYF社媒追踪
144
145        await restart_bot(message)
146        await lottery(client, message)
147        await emby_register(client, message)
148        await tg_history_alias(client, message)
149        await summary_videos(client, message)  # 自动总结视频
150        await message_backup(client, message)
151        await del_unwanted_message(client, message)
152        await eml2md(client, message)
153        await link_extract(client, message)
154
155    if ENABLE.CRONTAB:
156        scheduler = AsyncIOScheduler(timezone=TZ)
157        # scheduler.add_job(cron_secondly, "interval", args=[app], seconds=1)
158        scheduler.add_job(cron_minutely, "interval", args=[app], minutes=1)
159        scheduler.add_job(cron_hourly, "cron", args=[app], minute=0)
160        scheduler.add_job(cron_daily, "cron", args=[app], hour=6, minute=40, jitter=4800)
161        logging.getLogger("apscheduler.scheduler").setLevel(logging.ERROR)
162        scheduler.start()
163
164    await app.start()
165    await idle()
166    await app.stop()
167
168
169# async def cron_secondly(client: Client):
170#     pass
171
172
173async def cron_minutely(client: Client):
174    cache.evict()  # delete expired cache
175    cleanup_old_files()
176    await backup_chat_history(client)
177    await daily_backup_history_to_d1(client)
178    if ACCOUNT_NAME == "benny":
179        pass
180    elif ACCOUNT_NAME == "xiaohao":
181        await dnkt_attendance(client)
182        await fafa_twitter_rss(client)
183    elif ACCOUNT_NAME == "bot":
184        await dnkt_email(client)
185        await readhub()
186        await sync_youtube(client)
187        await update_rss(client)
188        await keepalive_emby()
189
190
191async def cron_hourly(client: Client):
192    await daily_summary(client)
193    await sync_livechats()
194    if ENABLE.CACHE_PRICE_SYMBOLS:
195        await match_symbol_category()  # to cache all supported symbols
196    await summary_pods(client)
197    if ACCOUNT_NAME == "xiaohao":
198        await preview_lilaoshi_history_message(client)  # 解析李老师遗漏的历史消息
199        await clean_r2_expired()
200        await clean_anthropic_files()
201        await clean_gemini_files()
202    if ACCOUNT_NAME == "bot":
203        await daily_ainews()
204
205
async def cron_daily(client: Client):
    """Run the daily Emby check-in, then send the configured daily messages.

    ``DAILY_MESSAGES`` is parsed as a JSON object whose keys are chat ids and
    whose values are the messages to send. A value that is not valid JSON, or
    is valid JSON but not an object, is reported once and nothing is sent.

    Each message is sent independently: a failure for one chat is logged and
    the remaining chats are still attempted.

    Args:
        client: The Telegram client used to send the messages.
    """
    await daily_checkin_emby(client)

    # send daily messages
    # Only the parsing is guarded here, so a TypeError raised while sending is
    # not misreported as a configuration problem.
    try:
        daily = json.loads(DAILY_MESSAGES)
    except (JSONDecodeError, TypeError):
        daily = None
    if not isinstance(daily, dict):
        logger.warning(f"Invalid DAILY_MESSAGES: {DAILY_MESSAGES}")
        return

    for chat_id, msg in daily.items():
        try:
            logger.info(f"Sending daily message to {chat_id}: {msg}")
            await client.send_message(to_int(chat_id), msg)
        except Exception as e:
            # Best effort: keep going so one bad chat does not block the rest.
            logger.warning(f"Error sending daily message: {e}")
        # Random pause between sends.
        await asyncio.sleep(random.randint(3, 8))
219
220
if __name__ == "__main__":
    # Script entry point: parse CLI options, configure logging, resolve the
    # session string and proxy (both read by ``main()`` as module globals),
    # then run the bot.
    from urllib.parse import unquote  # only needed here, for proxy credentials

    parser = argparse.ArgumentParser()
    parser.add_argument("--log-level", type=str, help="Log level")
    parser.add_argument("--session-str", type=str, help="Telegram SESSION STRING")
    parser.add_argument("--proxy", type=str, help="Telegram proxy (e.g. socks5://127.0.0.1:7890)")
    args = parser.parse_args()

    logger.remove()  # Remove default handler.
    logger.add(
        sys.stderr,
        # CLI flag wins over the LOG_LEVEL environment variable; default is TRACE.
        level=(args.log_level or os.getenv("LOG_LEVEL", "TRACE")).upper(),
        colorize=True,
        backtrace=True,
        diagnose=True,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green>| <level>{level: <7}</level> |<cyan>{name: <12}</cyan>:<cyan>{function: ^20}</cyan>:<cyan>{line: >4}</cyan> - <level>{message}</level>",
    )
    # settings
    session_string = args.session_str or TOKEN.SESSION_STRING
    if not session_string:
        logger.error("No session string, you should run python scripts/auth.py first")
        os._exit(1)  # exits immediately, without interpreter cleanup

    # CLI flag wins over the configured proxy.
    proxy_url = args.proxy or PROXY.TELEGRAM
    if proxy_url:
        info = urlparse(proxy_url)
        proxy = {"scheme": info.scheme, "hostname": info.hostname, "port": info.port}
        # Log before credentials are attached so the password never reaches the log.
        logger.warning(f"Using proxy: {proxy}")
        # Forward credentials from URLs like socks5://user:pass@host:port;
        # urlparse leaves them percent-encoded, so decode them first.
        if info.username:
            proxy["username"] = unquote(info.username)
        if info.password:
            proxy["password"] = unquote(info.password)
    else:
        proxy = {}

    asyncio.run(main())
250    asyncio.run(main())