main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4from datetime import datetime
5from zoneinfo import ZoneInfo
6
7from bs4 import BeautifulSoup
8from glom import glom
9from loguru import logger
10from pyrogram.client import Client
11from pyrogram.types import Message
12
13from bridge.social import send_to_social_media_bridge
14from config import API, DB, DOWNLOAD_DIR, PROVIDER, PROXY, TELEGRAM_UA, TOKEN, TZ
15from database.database import get_db
16from messages.database import copy_messages_from_db, save_messages
17from messages.progress import modify_progress
18from messages.sender import send2tg
19from messages.utils import blockquote, summay_media
20from multimedia import is_valid_video_or_audio, validate_img
21from networking import download_file, download_media, hx_req
22from utils import readable_count, true
23
24
async def preview_instagram(
    client: Client,
    message: Message,
    url: str = "",
    db_key: str = "",
    *,
    instagram_provider: str = PROVIDER.INSTAGRAM,
    instagram_comments: bool = True,
    show_author: bool = True,
    show_pubdate: bool = True,
    show_statistics: bool = True,
    show_description: bool = True,
    **kwargs,
):
    """Preview an Instagram link in the message.

    The cached copy stored under ``db_key`` is forwarded when available.
    Otherwise the post is resolved through TikHub (when "tikhub" is part of
    ``instagram_provider``); if that is not possible the request is handed
    over to :func:`preview_ddinstagram`.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str, optional): Instagram link.
        db_key (str, optional): The cache key.
        instagram_provider (str, optional): The instagram extractor: tikhub, ddinstagram, bridge
        instagram_comments (bool, optional): Add instagram comments. Defaults to True.
        show_author (bool, optional): Add the author line. Defaults to True.
        show_pubdate (bool, optional): Add the publication time. Defaults to True.
        show_statistics (bool, optional): Add like/comment counters. Defaults to True.
        show_description (bool, optional): Add the post caption. Defaults to True.
        **kwargs: Forwarded unchanged to the sending / progress / download helpers.
            ``show_progress`` and ``progress`` are also read here.
    """
    # Post a progress message once, unless the caller already supplied one.
    if kwargs.get("show_progress") and "progress" not in kwargs:
        res = await send2tg(client, message, texts=f"🔗正在解析Instagram链接\n{url}", **kwargs)
        kwargs["progress"] = res[0]

    # Fast path: forward the previously sent messages from the cache.
    if kv := await get_db(db_key):
        logger.debug(f"Instagram preview {DB.ENGINE} cache hit for key={db_key}")
        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
            return
        await modify_progress(text=f"❌从{DB.ENGINE}缓存中转发失败, 尝试重新解析...", **kwargs)

    resp = {}
    succ = False
    if "tikhub" in instagram_provider:  # try tikhub
        api_url = API.TIKHUB_INSTAGRAM + url
        logger.info(f"Preview Instagram TikHub for {api_url}")
        headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
        resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200})
        succ = not resp.get("hx_error")
    if not succ:
        logger.error("❌Instagram解析失败, 使用DDInstagram预览")
        # NOTE(review): preview_ddinstagram requires ``post_type`` and ``post_id``;
        # they are not passed explicitly, so they must already be in kwargs —
        # confirm against the callers.
        await preview_ddinstagram(client, message, url=url, instagram_provider=instagram_provider, **kwargs)
        return

    data = resp["data"]

    # parse media: a single video, a multi-item sidecar, or a single photo.
    # download_file() is intentionally not awaited here; the pending downloads
    # are handed to download_media() below.
    media = []
    if data.get("video_url"):  # reel
        media.append({"video": download_file(data.get("video_url", ""), proxy=PROXY.INSTAGRAM, **kwargs)})
    elif media_nodes := glom(data, "edge_sidecar_to_children.edges", default=[]):
        for node in media_nodes:
            is_video = glom(node, "node.is_video", default=False)
            ftype = "video" if is_video else "photo"
            url_path = "node.video_url" if is_video else "node.display_url"
            media_url = glom(node, url_path, default="")
            media.append({ftype: download_file(media_url, proxy=PROXY.INSTAGRAM, **kwargs)})
    elif data.get("display_url"):
        media.append({"photo": download_file(data.get("display_url"), proxy=PROXY.INSTAGRAM, **kwargs)})

    statistics = ""
    if like_count := glom(data, "edge_media_preview_like.count", default=0):
        statistics += f"❤️{readable_count(like_count)}"
    if comment_count := glom(data, "edge_media_to_parent_comment.count", default=0):
        statistics += f"💬{readable_count(comment_count)}"

    texts = ""
    if true(show_author) and (fullname := glom(data, "owner.full_name", default="")):
        texts += f"🏞**[{fullname}]({url})**\n"

    # The first caption edge carries both the creation timestamp and the caption text.
    caption_node = glom(data, "edge_media_to_caption.edges.0", default=None)
    if caption_node and true(show_pubdate) and (ts := glom(caption_node, "node.created_at", default=0)):
        # Build the aware datetime directly in the configured zone instead of
        # going through a naive local-time value first.
        dt = datetime.fromtimestamp(float(ts), tz=ZoneInfo(TZ))
        texts += f"🕒{dt:%Y-%m-%d %H:%M:%S}\n"
    # Like/comment counters do not come from the caption, so they are shown
    # even when the post has no caption node.
    if true(show_statistics) and statistics:
        texts += f"{statistics}\n"
    if caption_node and true(show_description) and (description := glom(caption_node, "node.text", default="")):
        texts += f"{description}\n"

    # parse comments (oldest first), each rendered through blockquote()
    comments = ""
    if true(instagram_comments):
        comment_nodes = glom(data, "edge_media_to_parent_comment.edges", default=[])
        comment_nodes = sorted(comment_nodes, key=lambda x: glom(x, "node.created_at", default=0))
        if comment_nodes:
            comments += f"\n{blockquote('💬**点此展开评论区**:')}"
        for node in comment_nodes:
            author = glom(node, "node.owner.username", default="user")
            cmt = glom(node, "node.text", default="")
            comment_line = f"💬**[{author}](https://www.instagram.com/{author})**: {cmt}"
            comments += f"\n{blockquote(comment_line)}"

    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
    media = await download_media(media, **kwargs)
    sent_messages = await send2tg(client, message, texts=texts.strip() + comments, media=media, **kwargs)
    await modify_progress(del_status=True, **kwargs)
    # Cache what was sent so the next request for this key can be forwarded.
    await save_messages(messages=sent_messages, key=db_key)
122
123
async def preview_ddinstagram(client: Client, message: Message, url: str, post_type: str, post_id: str, *, instagram_provider: str, **kwargs):
    """Preview instagram link in the message via DDInstagram.

    https://ddinstagram.com/

    The DDInstagram page for the post is fetched and its ``<meta>`` tags are
    used to build the caption and to locate the photo / video to download.

    Args:
        client (Client): The Pyrogram client.
        message (Message): The trigger message object.
        url (str): Instagram link.
        post_type (str): post type: "p" or "reel"
        post_id (str): post id.
        instagram_provider (str): Enabled extractors. DDInstagram is only used
            when it contains "ddinstagram"; when it contains "bridge" the link
            is passed to the social media bridge if DDInstagram is disabled or
            returns no page.
        **kwargs: Forwarded to the request / download / sending helpers.
            ``target_mid`` is overwritten with the trigger message id.
    """
    kwargs |= {"target_mid": message.id}
    if "ddinstagram" not in instagram_provider:
        if "bridge" in instagram_provider:
            await send_to_social_media_bridge(client, message, url, **kwargs)
        return

    api_url = f"{API.DDINSTAGRAM}/{post_type}/{post_id}"
    logger.info(f"Instagram link preview for {api_url}")
    headers = {"user-agent": TELEGRAM_UA}
    resp = await hx_req(api_url, headers=headers, rformat="text")
    if not resp.get("text"):
        # Nothing to parse: hand over to the bridge if it is enabled, then stop.
        if "bridge" in instagram_provider:
            await send_to_social_media_bridge(client, message, url, **kwargs)
        return

    soup = BeautifulSoup(resp["text"], "html.parser")
    logger.trace(soup.prettify())

    texts = ""
    media = {}
    if tag := soup.find("meta", attrs={"property": "twitter:title"}):
        author = tag.get("content", "Unknown")
        # Bold markers must be balanced, matching the author line produced by
        # preview_instagram.
        texts += f"🏞**[{author}]({url})**\n"
    if tag := soup.find("meta", attrs={"property": "og:description"}):
        texts += str(tag.get("content", ""))

    # The meta tags hold paths that are appended to the DDInstagram base URL.
    if (tag := soup.find("meta", attrs={"property": "twitter:image"})) and (img_url := tag.get("content")):
        raw_url = f"{API.DDINSTAGRAM}{img_url}"
        media["photo"] = await download_file(raw_url, path=f"{DOWNLOAD_DIR}/{post_id}.jpg", proxy=PROXY.INSTAGRAM, **kwargs)
        if not bool(validate_img(media["photo"])):
            # NOTE(review): this fallback does not check for "bridge" in
            # instagram_provider, unlike the checks above — confirm intended.
            await send_to_social_media_bridge(client, message, text=url, **kwargs)
            return

    if tag := soup.find("meta", attrs={"property": "og:video"}):
        video_url = tag.get("content", "")
        if video_url:
            raw_url = f"{API.DDINSTAGRAM}{video_url}"
            media["video"] = await download_file(raw_url, path=f"{DOWNLOAD_DIR}/{post_id}.mp4", proxy=PROXY.INSTAGRAM, **kwargs)
            if not await is_valid_video_or_audio(media["video"]):
                await send_to_social_media_bridge(client, message, text=url, **kwargs)
                return

    await send2tg(client, message, texts=texts, media=[media], **kwargs)
    await modify_progress(del_status=True, **kwargs)