Commit 739bcc2
Changed files (8)
src/preview/douyin.py
@@ -9,11 +9,12 @@ from pyrogram.client import Client
from pyrogram.types import Message
from bridge.social import send_to_social_media_bridge
-from config import API, DB, TOKEN, TZ, cache
+from config import API, DB, PROVIDER, TOKEN, TZ, cache
from database import get_db
from message_utils import copy_messages_from_db, modify_progress, save_messages, send2tg, summay_media
from networking import download_first_success_urls, download_media, hx_req
from others.emoji import emojify
+from utils import true
@cache.memoize(ttl=10)
@@ -23,10 +24,9 @@ async def preview_douyin(
url: str = "",
db_key: str = "",
platform: str = "douyin",
- douyin_extractor: str | None = None,
- douyin_comments_extractor: str | None = None,
+ douyin_provider: str = PROVIDER.DOUYIN,
+ douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS,
*,
- fetch_douyin_comments: bool = True,
fallback: bool = True,
**kwargs,
):
@@ -38,9 +38,8 @@ async def preview_douyin(
url (str, optional): The douyin or tiktok link.
db_key (str, optional): The cache key.
platform(str, optional): The platform name. Defaults to "douyin".
- douyin_extractor (str, optional): The douyin extractor: "free" or "tikhub". Defaults to "free".
- douyin_comments_extractor (str, optional): The douyin comments extractor: "free" or "tikhub". Defaults to "free".
- fetch_douyin_comments (bool, optional): Fetch douyin comments. Defaults to True.
+ douyin_provider (str, optional): The douyin extractor: "free" or "tikhub".
+ douyin_comments_provider (str, optional): The douyin comments extractor: "free" or "tikhub".
fallback (bool, optional): Fallback to other bots. Defaults to True.
"""
if kwargs.get("show_progress") and "progress" not in kwargs:
@@ -54,9 +53,9 @@ async def preview_douyin(
logger.info(f"{platform} link preview for {url}")
succ = False
- if douyin_extractor is None or douyin_extractor == "free": # try free first
+ if douyin_provider == "free": # try free first
api_url = f"{API.TIKHUB_FREE}/api/hybrid/video_data?url={url}"
- headers = {"accept": "application/json"} if douyin_extractor == "tikhub" else {}
+ headers = {"accept": "application/json"} if douyin_provider == "tikhub" else {}
try:
resp = await hx_req(api_url, headers=headers, check_has_kv=["data"], check_kv={"code": 200})
data = resp.json()["data"]
@@ -97,7 +96,7 @@ async def preview_douyin(
texts += f"\n{decs}"
comments = []
- if fetch_douyin_comments and (comments_list := await get_comments(aweme_id, platform, douyin_comments_extractor)):
+ if comments_list := await get_comments(aweme_id, platform, douyin_comments_provider):
comments.append("\n**> 💬**点此展开评论区**:")
for idx, cmt in enumerate(comments_list):
cmt_text = cmt["text"].replace("\n", "\n> ")
@@ -111,17 +110,19 @@ async def preview_douyin(
await save_messages(messages=sent_messages, key=db_key)
-async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comments_extractor: str | None = None) -> list[dict]:
+async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comments_provider: str = PROVIDER.DOUYIN_COMMENTS) -> list[dict]:
"""Fetch douyin or tiktok comments.
Args:
aweme_id (str, optional): post id.
platform (str, optional): douyin or tiktok. Defaults to "douyin".
- douyin_comments_extractor (str | None, optional): The douyin comments extractor: "free" or "tikhub". Defaults to "free".
+ douyin_comments_provider (str, optional): The douyin comments extractor: "free" or "tikhub".
Returns:
list[dict]: comments list.
"""
+ if not true(douyin_comments_provider):
+ return []
comments = []
api_urls = {
"douyin_tikhub": f"{API.TIKHUB}/api/v1/douyin/web/fetch_video_comments?aweme_id={aweme_id}",
@@ -130,7 +131,7 @@ async def get_comments(aweme_id: str = "", platform: str = "douyin", douyin_comm
"tiktok_free": f"{API.TIKHUB_FREE}/api/tiktok/web/fetch_post_comment?aweme_id={aweme_id}",
}
succ = False
- if douyin_comments_extractor is None or douyin_comments_extractor == "free": # try free first
+ if douyin_comments_provider == "free": # try free first
api_url = api_urls.get(f"{platform}_free")
headers = {"accept": "application/json"}
try:
src/preview/instagram.py
@@ -10,15 +10,25 @@ from pyrogram.client import Client
from pyrogram.types import Message
from bridge.social import send_to_social_media_bridge
-from config import API, DB, DOWNLOAD_DIR, TOKEN, TZ, UA, cache
+from config import API, DB, DOWNLOAD_DIR, PROVIDER, TOKEN, TZ, UA, cache
from database import get_db
from message_utils import copy_messages_from_db, modify_progress, save_messages, send2tg, summay_media
from multimedia import is_valid_video, validate_img
from networking import download_file, download_media, hx_req
+from utils import true
@cache.memoize(ttl=10)
-async def preview_instagram(client: Client, message: Message, url: str = "", db_key: str = "", *, fallback: bool = True, **kwargs):
+async def preview_instagram(
+ client: Client,
+ message: Message,
+ url: str = "",
+ db_key: str = "",
+ *,
+ instagram_comments_provider: str = PROVIDER.INSTAGRAM_COMMENTS,
+ fallback: bool = True,
+ **kwargs,
+):
"""Preview instagram link in the message.
Args:
@@ -26,6 +36,7 @@ async def preview_instagram(client: Client, message: Message, url: str = "", db_
message (Message): The trigger message object.
url (str, optional): Tnstagram link.
db_key (str, optional): The cache key.
+ instagram_comments_provider (str, optional): The instagram comments extractor: "tikhub" or "false".
fallback (bool, optional): Fallback to other bots. Defaults to True.
"""
if kwargs.get("show_progress") and "progress" not in kwargs:
@@ -74,19 +85,20 @@ async def preview_instagram(client: Client, message: Message, url: str = "", db_
texts += f"{description}\n"
# parse comments
- comment_nodes = data.get("edge_media_to_parent_comment", {}).get("edges", [])
- comment_nodes = sorted(comment_nodes, key=lambda x: x.get("node", {}).get("created_at", 0))
- comment_list = [{"author": node.get("node", {}).get("owner", {}).get("username", "user"), "text": node.get("node", {}).get("text", "")} for node in comment_nodes]
- comment_list = [x for x in comment_list if x["text"]]
comments: list[str] = []
- for idx, cmt in enumerate(comment_list):
- cmt_text = cmt["text"].replace("\n", "\n> ")
- if idx == 0:
- comments.append("\n**> 💬**点此展开评论区**:")
- if idx == len(comment_list) - 1: # last cmt
- comments.append(f"\n> 💬**{cmt['author']}**: {cmt_text}||")
- else:
- comments.append(f"\n> 💬**{cmt['author']}**: {cmt_text}")
+ if true(instagram_comments_provider):
+ comment_nodes = data.get("edge_media_to_parent_comment", {}).get("edges", [])
+ comment_nodes = sorted(comment_nodes, key=lambda x: x.get("node", {}).get("created_at", 0))
+ comment_list = [{"author": node.get("node", {}).get("owner", {}).get("username", "user"), "text": node.get("node", {}).get("text", "")} for node in comment_nodes]
+ comment_list = [x for x in comment_list if x["text"]]
+ for idx, cmt in enumerate(comment_list):
+ cmt_text = cmt["text"].replace("\n", "\n> ")
+ if idx == 0:
+ comments.append("\n**> 💬**点此展开评论区**:")
+ if idx == len(comment_list) - 1: # last cmt
+ comments.append(f"\n> 💬**{cmt['author']}**: {cmt_text}||")
+ else:
+ comments.append(f"\n> 💬**{cmt['author']}**: {cmt_text}")
await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
media = await download_media(media, **kwargs)
src/preview/twitter.py
@@ -10,7 +10,7 @@ from pyrogram.client import Client
from pyrogram.types import Message
from bridge.social import send_to_social_media_bridge
-from config import API, DB, TOKEN, TZ, UA, cache
+from config import API, DB, PROVIDER, TOKEN, TZ, UA, cache
from database import get_db
from message_utils import copy_messages_from_db, modify_progress, save_messages, send2tg, summay_media
from networking import download_file, download_media, flatten_rediercts, hx_req
@@ -24,7 +24,8 @@ async def preview_twitter(
url: str = "",
db_key: str = "",
platform: str = "",
- twitter_extractor: str | None = None,
+ twitter_provider: str = PROVIDER.TWITTER,
+ twitter_comments_provider: str = PROVIDER.TWITTER_COMMENTS,
*,
fallback: bool = True,
**kwargs,
@@ -37,7 +38,8 @@ async def preview_twitter(
url (str, optional): The twitter link.
db_key (str, optional): The cache key.
platform (str): The domain of the link: twitter, x, fxtwitter, fixupx
- twitter_extractor (str): The extractor to use: fxtwitter or tikhub. Defaults to "tikhub".
+ twitter_provider (str): The extractor to use: fxtwitter or tikhub. Defaults to "tikhub".
+ twitter_comments_provider (str, optional): The twitter comments extractor: "tikhub" or "false".
fallback (bool, optional): Fallback to other bots. Defaults to True.
If skip_fxtwitter is set to True, and the domain is fxtwitter or fixupx, this function is skipped.
@@ -55,7 +57,7 @@ async def preview_twitter(
return
await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)
succ = False
- if twitter_extractor is None or twitter_extractor == "tikhub": # try tikhub first
+ if twitter_provider == "tikhub": # try tikhub first
try:
this_info = await get_tweet_info_via_tikhub(url=url, **kwargs)
if not this_info:
@@ -140,7 +142,7 @@ async def preview_twitter(
msg += f"\n🏞{part_strs['first']}属于主推"
if texts := master_info.get("texts"):
msg += f"\n{texts}"
- if comments := master_info.get("comments"):
+ if true(twitter_comments_provider) and (comments := master_info.get("comments")):
msg += "\n**> 💬**点此展开评论区**:"
for idx, cmt in enumerate(comments):
if str(cmt["post_id"]) == str(this_info["post_id"]):
@@ -173,7 +175,7 @@ async def preview_twitter(
if texts := this_info.get("texts"):
msg += f"\n{texts}"
- if comments := this_info.get("comments"):
+ if true(twitter_comments_provider) and (comments := this_info.get("comments")):
msg += "\n**> 💬**点此展开评论区**:"
for idx, cmt in enumerate(comments):
cmt_texts = cmt["text"].strip().removeprefix(f"@{master_handle}").strip().replace("\n", "\n> ") # 有时回推的comment前会附带被回推的handle, 这里去掉
src/preview/weibo.py
@@ -13,17 +13,27 @@ from pyrogram.client import Client
from pyrogram.types import Message
from bridge.social import send_to_social_media_bridge
-from config import API, DB, DOWNLOAD_DIR, TOKEN, TZ, cache
+from config import API, DB, DOWNLOAD_DIR, PROVIDER, TOKEN, TZ, cache
from cookies import get_weibo_cookies
from database import get_db
from message_utils import copy_messages_from_db, modify_progress, save_messages, send2tg, summay_media
from networking import download_file, download_first_success_urls, download_media, hx_req
from others.emoji import emojify
-from utils import https_url, rand_string, soup_to_text, split_parts, ts_to_dt
+from utils import https_url, rand_string, soup_to_text, split_parts, true, ts_to_dt
@cache.memoize(ttl=10)
-async def preview_weibo(client: Client, message: Message, url: str, db_key: str = "", post_id: str = "", *, fetch_weibo_comments: bool = True, fallback: bool = True, **kwargs):
+async def preview_weibo(
+ client: Client,
+ message: Message,
+ url: str,
+ db_key: str = "",
+ post_id: str = "",
+ *,
+ weibo_comments_provider: str = PROVIDER.WEIBO_COMMENTS,
+ fallback: bool = True,
+ **kwargs,
+):
"""Preview weibo link in the message.
Args:
@@ -32,11 +42,11 @@ async def preview_weibo(client: Client, message: Message, url: str, db_key: str
url (str, optional): Weibo link.
db_key (str, optional): The cache key.
post_id (str, optional): Weibo post ID
- fetch_weibo_comments (bool, optional): Fetch weibo comments. Defaults to True.
+ weibo_comments_provider (str, optional): The weibo comments extractor: "free" or "false".
fallback (bool, optional): Fallback to other bots. Defaults to True.
"""
if post_id.startswith("weibovideo"): # disable comments for weibo video
- fetch_weibo_comments = False
+ weibo_comments_provider = "0"
if kwargs.get("show_progress") and "progress" not in kwargs:
res = await send2tg(client, message, texts=f"🔗正在解析微博链接\n{url}", **kwargs)
kwargs["progress"] = res[0]
@@ -104,7 +114,7 @@ async def preview_weibo(client: Client, message: Message, url: str, db_key: str
msg += f"\n{texts}"
media.extend(quote_info["media"])
- comments = await parse_weibo_comments(post_id) if fetch_weibo_comments else []
+ comments = await parse_weibo_comments(post_id) if true(weibo_comments_provider) else []
sent_messages = await send2tg(client, message, texts=emojify(msg.strip()), media=media, comments=comments, **kwargs)
await modify_progress(del_status=True, **kwargs)
await save_messages(messages=sent_messages, key=db_key)
src/preview/ytdlp.py
@@ -15,7 +15,7 @@ from pyrogram.types import Message, ReplyParameters
from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadError, ExtractorError, YoutubeDLError
-from config import API, CAPTION_LENGTH, DB, DOWNLOAD_DIR, MAX_FILE_BYTES, PROXY, TID, TOKEN, cache
+from config import API, CAPTION_LENGTH, DB, DOWNLOAD_DIR, MAX_FILE_BYTES, PROVIDER, PROXY, TID, TOKEN, cache
from database import get_db
from message_utils import copy_messages_from_db, modify_progress, preprocess_media, save_messages, send2tg, telegram_uploading
from multimedia import generate_cover
@@ -38,8 +38,8 @@ async def preview_ytdlp(
ytdlp_audio_only: bool = False,
ytdlp_send_video: bool = True,
ytdlp_send_audio: bool = False,
- bilibili_comments: bool = True,
- youtube_comments: bool = True,
+ bilibili_comments_provider: str = PROVIDER.BILIBILI_COMMENTS,
+ youtube_comments_provider: str = PROVIDER.YOUTUBE_COMMENTS,
proxy: str | None = None,
**kwargs,
):
@@ -53,8 +53,8 @@ async def preview_ytdlp(
ytdlp_audio_only (bool, optional): Download audio only. Defaults to True.
ytdlp_send_video (bool, optional): Send video. Defaults to True.
ytdlp_send_audio (bool, optional): Send audio. Defaults to False.
- bilibili_comments (bool, optional): Get bilibili comments. Defaults to True.
- youtube_comments (bool, optional): Get youtube comments. Defaults to True.
+ bilibili_comments_provider (str, optional): The bilibili comments extractor: "free", "tikhub" or "false"
+ youtube_comments_provider (str, optional): The youtube comments extractor: "free" or "false".
proxy (str, optional): Proxy to use. Defaults to None.
"""
logger.trace(f"url: {url} kwargs: {kwargs}")
@@ -180,10 +180,10 @@ async def preview_ytdlp(
texts += f"\n{soup_to_text(soup)}"
# comments
comments = []
- if bilibili_comments and platform == "bilibili":
- comments = await get_bilibili_comments(kwargs.get("bvid"))
- if youtube_comments and platform == "youtube":
- comments = await get_youtube_comments(kwargs.get("vid"))
+ if platform == "bilibili":
+ comments = await get_bilibili_comments(kwargs.get("bvid"), bilibili_comments_provider)
+ if platform == "youtube":
+ comments = await get_youtube_comments(kwargs.get("vid"), youtube_comments_provider)
for comment in comments:
if len(f"{texts}{comment}") < CAPTION_LENGTH:
@@ -401,17 +401,32 @@ async def download_video_async(url: str, ydl_opts: dict) -> tuple[str, dict]:
@cache.memoize(ttl=60)
-async def get_bilibili_comments(bvid: str | None) -> list[str]:
- if not bvid:
+async def get_bilibili_comments(bvid: str | None, provider: str = PROVIDER.BILIBILI_COMMENTS) -> list[str]:
+ if not bvid or not true(provider):
return []
+
+ succ = False
+ if provider == "free":
+ try:
+ api = f"{API.TIKHUB_FREE}/api/bilibili/web/fetch_video_comments?bv_id={bvid}"
+ headers = {"accept": "application/json"}
+ resp = await hx_req(api, headers={"accept": "application/json"}, check_has_kv=["data.data"], check_kv={"code": 200})
+ data = resp.json()["data"]["data"].get("replies", [])
+ succ = True
+ except Exception:
+ logger.warning(f"Bilibili comments API [free] failed: {resp}")
+ if not succ: # try tikhub
+ api_url = f"{API.TIKHUB}/api/v1/bilibili/web/fetch_video_comments?bv_id={bvid}"
+ headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
+ try:
+ resp = await hx_req(api_url, headers=headers, check_has_kv=["data.data"], check_kv={"code": 200})
+ data = resp.json()["data"]["data"].get("replies", [])
+ except Exception:
+ logger.warning(f"Bilibili comments API [tikhub] failed: {resp}")
+ return []
+
comments = []
try:
- api = f"{API.TIKHUB_FREE}/api/bilibili/web/fetch_video_comments?bv_id={bvid}"
- resp = await hx_req(api, check_has_kv=["data.data"], check_kv={"code": 200})
- if resp.status_code != 200:
- logger.warning(f"Bilibili Comments API failed: {resp}")
- return []
- data = resp.json()["data"]["data"].get("replies", [])
for idx, x in enumerate(data):
name = x.get("member", {}).get("uname", "匿名")
if cmt := x.get("content", {}).get("message"):
@@ -419,7 +434,8 @@ async def get_bilibili_comments(bvid: str | None) -> list[str]:
if idx == 0:
comments.append("\n**> 💬**点此展开评论区**:")
comments.append(f"\n> 💬**{name}**: {emojify(cmt)}")
- if replies := x.get("replies"):
+ # replies of comments, free api only got 3 comments, so we add replies here
+ if provider == "free" and (replies := x.get("replies")):
for r in replies:
name = r.get("member", {}).get("uname", "匿名")
if cmt := r.get("content", {}).get("message"):
@@ -434,8 +450,8 @@ async def get_bilibili_comments(bvid: str | None) -> list[str]:
@cache.memoize(ttl=60)
-async def get_youtube_comments(vid: str | None) -> list[str]:
- if not vid:
+async def get_youtube_comments(vid: str | None, provider: str = PROVIDER.YOUTUBE_COMMENTS) -> list[str]:
+ if not vid or not true(provider):
return []
api = "https://www.googleapis.com/youtube/v3/commentThreads"
params = {"key": TOKEN.YOUTUBE_API_KEY, "maxResults": 100, "textFormat": "plainText", "part": "snippet", "videoId": vid}
src/config.py
@@ -72,6 +72,17 @@ class API:
TIKHUB_WEIBO_VIDEO = os.getenv("TIKHUB_WEIBO_VIDEO_API", "https://api.tikhub.io/api/v1/weibo/web/fetch_short_video_data?share_text=")
+class PROVIDER: # default API provider
+ DOUYIN = os.getenv("DOUYIN_PROVIDER", "free").lower() # free or tikhub
+ DOUYIN_COMMENTS = os.getenv("DOUYIN_COMMENTS_PROVIDER", "free").lower() # free or tikhub or a false value (0, false, none, null, etc.)
+ TWITTER = os.getenv("TWITTER_PROVIDER", "tikhub").lower() # tikhub or fxtwitter
+ TWITTER_COMMENTS = os.getenv("TWITTER_COMMENTS_PROVIDER", "tikhub").lower() # tikhub or a false value (0, false, none, null, etc.)
+ INSTAGRAM_COMMENTS = os.getenv("INSTAGRAM_COMMENTS_PROVIDER", "tikhub").lower() # tikhub or a false value (0, false, none, null, etc.)
+ WEIBO_COMMENTS = os.getenv("WEIBO_COMMENTS_PROVIDER", "free").lower() # free or a false value (0, false, none, null, etc.)
+ BILIBILI_COMMENTS = os.getenv("BILIBILI_COMMENTS_PROVIDER", "free").lower() # free or tikhub or a false value (0, false, none, null, etc.)
+ YOUTUBE_COMMENTS = os.getenv("YOUTUBE_COMMENTS_PROVIDER", "free").lower() # free or a false value (0, false, none, null, etc.)
+
+
class TOKEN:
SESSION_STRING = os.getenv("SESSION_STRING", "")
TIKHUB = os.getenv("TIKHUB_TOKEN", "")
src/handler.py
@@ -194,7 +194,7 @@ def params_from_msg_text(texts: str | None = None) -> dict:
2. #with_xx: kwargs["xx"] = True
3. #set_xx=var: kwargs["xx"] = var
- Example text: #no_fetch_douyin_comments #set_douyin_extractor=tikhub
+ Example text: #no_ytdlp_send_video #set_douyin_provider=tikhub
Args:
texts (str | None, optional): The message text.
@@ -263,4 +263,4 @@ def get_social_media_help(cmd_prefix: list[str] | None = None, ignore_prefix: li
if __name__ == "__main__":
- params_from_msg_text("#with_1 #WITH_x #NO_1 #no_2 #set_yy=3, #no_fetch_douyin_comments #set_douyin_extractor=tikhub #set_reply_msg_id=None")
+ params_from_msg_text("#with_1 #WITH_x #NO_1 #no_2 #set_yy=3, #no_fetch_douyin_comments #set_douyin_provider=tikhub #set_reply_msg_id=None")
src/utils.py
@@ -6,6 +6,7 @@ import random
import string
from datetime import UTC, datetime
from pathlib import Path
+from typing import Any
from zoneinfo import ZoneInfo
from bs4 import PageElement
@@ -82,8 +83,12 @@ def rand_number(length: int = 8) -> int:
return int("".join(random.choices(string.digits, k=length)))
-def true(value: str | int | bool | None) -> bool:
- return str(value).lower() in ["1", "y", "yes", "t", "true", "on"]
+def true(value: Any) -> bool:
+ if not value:
+ return False
+ if isinstance(value, str):
+ return str(value).lower() not in {"0", "n", "na", "n/a", "no", "not", "f", "false", "off", "none", "null", "disable", "disabled"}
+ return True
def remove_none_values(d: dict | list) -> dict: