main
1#!/venv/bin/python
2# -*- coding: utf-8 -*-
3import argparse
4import asyncio
5import json
6import logging
7import os
8import platform
9import random
10import sys
11from json import JSONDecodeError
12from urllib.parse import urlparse
13
14from apscheduler.schedulers.asyncio import AsyncIOScheduler
15from loguru import logger
16from pyrogram import filters
17from pyrogram.client import Client
18from pyrogram.sync import idle
19from pyrogram.types import LinkPreviewOptions, Message
20
21from ai.chat_summary import daily_summary
22from ai.utils import clean_anthropic_files, clean_gemini_files
23from bridge.chartimg import forward_chartimg_results
24from bridge.ocr import forward_ocr_results
25from bridge.social import forward_social_media_results
26from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TOKEN, TZ, cache
27from custom.ai_news import daily_ainews
28from custom.config import ACCOUNT_NAME
29from custom.cyf_greeting import cyf_greeting
30from custom.cyf_quote import fafa_quote
31from custom.cyf_twitter_rss import fafa_twitter_rss
32from custom.d1_daily_backup_msg import daily_backup_history_to_d1
33from custom.del_msg import del_unwanted_message
34from custom.dnkt_attendance import dnkt_attendance
35from custom.dnkt_email import dnkt_email
36from custom.email2md import eml2md
37from custom.events import lottery
38from custom.history_alias import tg_history_alias
39from custom.lilaoshi import handle_lilaoshi, preview_lilaoshi_history_message
40from custom.link_extract import link_extract
41from custom.msg_backup import message_backup
42from custom.readhub import readhub
43from custom.restart import restart_bot
44from custom.rss import update_rss
45from custom.summary_video import summary_videos
46from custom.sync_youtube import sync_youtube
47from custom.tempmail import tempmail
48from danmu.sync import sync_livechats
49from database.r2 import clean_r2_expired
50from emby.checkin import checkin_emby, daily_checkin_emby
51from emby.keepalive import keepalive_emby
52from emby.main import emby_entrypoint
53from emby.register import emby_register
54from history.sync import backup_chat_history, sync_chat_history
55from messages.main import process_message
56from messages.modify import message_modify
57from messages.parser import parse_msg
58from messages.utils import delete_message, startswith_prefix
59from permission import check_permission
60from podcast.main import summary_pods
61from price.entrypoint import match_symbol_category
62from quotly.quotly import quote_message
63from utils import cleanup_old_files, to_int
64
65
66async def main():
67 app = Client(
68 "bot",
69 session_string=session_string,
70 in_memory=True,
71 proxy=proxy,
72 device_model=DEVICE_NAME, # A friendly name can be viewed in "Active Sessions" in Telegram settings
73 app_version=f"{Client.APP_VERSION}, Python {platform.python_version()}",
74 skip_updates=False, # handle messages while client is offline
75 fetch_replies=-1, # fetch all replies
76 link_preview_options=LinkPreviewOptions(is_disabled=True),
77 max_concurrent_transmissions=2,
78 max_business_user_connection_cache_size=100, # reduce memory usage
79 max_message_cache_size=100, # reduce memory usage
80 )
81
82 @app.on_message(filters.group)
83 async def groups(client: Client, message: Message):
84 permission = await check_permission(client, message)
85 if permission["disabled"]:
86 return
87 await process_message(client, message, **permission)
88
89 @app.on_message(filters.channel)
90 async def channels(client: Client, message: Message):
91 permission = await check_permission(client, message)
92 if permission["disabled"]:
93 return
94 await process_message(client, message, **permission)
95
96 @app.on_message(filters.bot)
97 async def bots(client: Client, message: Message):
98 permission = await check_permission(client, message)
99 if permission["disabled"]:
100 return
101 parse_msg(message, verbose=True)
102 await forward_social_media_results(client, message)
103 await forward_ocr_results(client, message)
104 await forward_chartimg_results(client, message)
105 await process_message(client, message, **permission)
106
107 # filters.private = {user chats + bot chats}
108 # so the private handler should be placed after the bot handler
109 @app.on_message(filters.private)
110 async def private(client: Client, message: Message):
111 ctype = message.chat.type.name if message.chat and message.chat.type else ""
112 if ctype == "BOT":
113 await bots(client, message) # handle bot messages
114 return
115 permission = await check_permission(client, message)
116 if permission["disabled"]:
117 return
118 parse_msg(message, verbose=True)
119 await process_message(client, message, **permission)
120
121 @app.on_message(group=1)
122 @app.on_edited_message(group=1)
123 @app.on_deleted_messages(group=1)
124 async def on_changed(client: Client, message: Message | list[Message]):
125 await sync_chat_history(client, message)
126 if isinstance(message, Message):
127 await checkin_emby(client, message)
128
129 @app.on_message(group=2)
130 async def custom(client: Client, message: Message):
131 message = message_modify(message)
132 if ACCOUNT_NAME == "benny":
133 await quote_message(client, message)
134 await emby_entrypoint(message)
135 await fafa_quote(client, message)
136 if startswith_prefix(message.content, ["/save"]):
137 await delete_message(message)
138 await tempmail(client, message)
139
140 if ACCOUNT_NAME == "xiaohao":
141 await cyf_greeting(client, message)
142 await handle_lilaoshi(client, message) # 李老师不是你老师
143 # await chenyifa_social_rss(client, message) # CYF社媒追踪
144
145 await restart_bot(message)
146 await lottery(client, message)
147 await emby_register(client, message)
148 await tg_history_alias(client, message)
149 await summary_videos(client, message) # 自动总结视频
150 await message_backup(client, message)
151 await del_unwanted_message(client, message)
152 await eml2md(client, message)
153 await link_extract(client, message)
154
155 if ENABLE.CRONTAB:
156 scheduler = AsyncIOScheduler(timezone=TZ)
157 # scheduler.add_job(cron_secondly, "interval", args=[app], seconds=1)
158 scheduler.add_job(cron_minutely, "interval", args=[app], minutes=1)
159 scheduler.add_job(cron_hourly, "cron", args=[app], minute=0)
160 scheduler.add_job(cron_daily, "cron", args=[app], hour=6, minute=40, jitter=4800)
161 logging.getLogger("apscheduler.scheduler").setLevel(logging.ERROR)
162 scheduler.start()
163
164 await app.start()
165 await idle()
166 await app.stop()
167
168
169# async def cron_secondly(client: Client):
170# pass
171
172
173async def cron_minutely(client: Client):
174 cache.evict() # delete expired cache
175 cleanup_old_files()
176 await backup_chat_history(client)
177 await daily_backup_history_to_d1(client)
178 if ACCOUNT_NAME == "benny":
179 pass
180 elif ACCOUNT_NAME == "xiaohao":
181 await dnkt_attendance(client)
182 await fafa_twitter_rss(client)
183 elif ACCOUNT_NAME == "bot":
184 await dnkt_email(client)
185 await readhub()
186 await sync_youtube(client)
187 await update_rss(client)
188 await keepalive_emby()
189
190
191async def cron_hourly(client: Client):
192 await daily_summary(client)
193 await sync_livechats()
194 if ENABLE.CACHE_PRICE_SYMBOLS:
195 await match_symbol_category() # to cache all supported symbols
196 await summary_pods(client)
197 if ACCOUNT_NAME == "xiaohao":
198 await preview_lilaoshi_history_message(client) # 解析李老师遗漏的历史消息
199 await clean_r2_expired()
200 await clean_anthropic_files()
201 await clean_gemini_files()
202 if ACCOUNT_NAME == "bot":
203 await daily_ainews()
204
205
206async def cron_daily(client: Client):
207 # send daliy messages
208 await daily_checkin_emby(client)
209 try:
210 daliy = json.loads(DAILY_MESSAGES)
211 for chat_id, msg in daliy.items():
212 logger.info(f"Sending daily message to {chat_id}: {msg}")
213 await client.send_message(to_int(chat_id), msg)
214 await asyncio.sleep(random.randint(3, 8))
215 except (JSONDecodeError, TypeError):
216 logger.warning(f"Invalid DAILY_MESSAGES: {DAILY_MESSAGES}")
217 except Exception as e:
218 logger.warning(f"Error sending daily message: {e}")
219
220
221if __name__ == "__main__":
222 parser = argparse.ArgumentParser()
223 parser.add_argument("--log-level", type=str, help="Log level")
224 parser.add_argument("--session-str", type=str, help="Telegram SESSION STRING")
225 parser.add_argument("--proxy", type=str, help="Telegram proxy (e.g. socks5://127.0.0.1:7890)")
226 args = parser.parse_args()
227
228 logger.remove() # Remove default handler.
229 logger.add(
230 sys.stderr,
231 level=args.log_level.upper() if args.log_level else os.getenv("LOG_LEVEL", "TRACE").upper(),
232 colorize=True,
233 backtrace=True,
234 diagnose=True,
235 format="<green>{time:YYYY-MM-DD HH:mm:ss}</green>| <level>{level: <7}</level> |<cyan>{name: <12}</cyan>:<cyan>{function: ^20}</cyan>:<cyan>{line: >4}</cyan> - <level>{message}</level>",
236 )
237 # settings
238 session_string = args.session_str or TOKEN.SESSION_STRING
239 if not session_string:
240 logger.error("No session string, you should run python scripts/auth.py first")
241 os._exit(1)
242
243 if args.proxy or PROXY.TELEGRAM:
244 info = urlparse(args.proxy) if args.proxy else urlparse(PROXY.TELEGRAM)
245 proxy = {"scheme": info.scheme, "hostname": info.hostname, "port": info.port}
246 logger.warning(f"Using proxy: {proxy}")
247 else:
248 proxy = {}
249
250 asyncio.run(main())