main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import json
  4from pathlib import Path
  5from typing import Literal
  6
  7from bs4 import BeautifulSoup
  8from glom import Coalesce, flatten, glom
  9from loguru import logger
 10from pyrogram.client import Client
 11from pyrogram.types import Message
 12
 13from ai.utils import trim_none
 14from bridge.social import send_to_social_media_bridge
 15from config import AI, API, DOWNLOAD_DIR, PROVIDER, PROXY, TELEGRAM_UA, TOKEN, TZ
 16from database.r2 import get_cf_r2
 17from messages.database import copy_messages_from_db, save_messages
 18from messages.progress import modify_progress
 19from messages.sender import send2tg
 20from messages.utils import blockquote, summay_media
 21from multimedia import is_valid_video_or_audio, validate_img
 22from networking import download_file, download_media, hx_req
 23from preview.utils import add_summary_url
 24from summarize.summarize import summarize
 25from utils import nowstr, readable_count, true, ts_to_dt
 26
 27
 28async def preview_instagram(
 29    client: Client,
 30    message: Message,
 31    url: str = "",
 32    db_key: str = "",
 33    *,
 34    post_type: Literal["p", "story", "reel"] = "p",
 35    post_id: str = "",
 36    username: str = "",
 37    instagram_provider: str = PROVIDER.INSTAGRAM,
 38    instagram_comments: bool = True,
 39    summary_instagram: bool = False,
 40    summary_instagram_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
 41    show_author: bool = True,
 42    show_pubdate: bool = True,
 43    show_statistics: bool = True,
 44    show_description: bool = True,
 45    **kwargs,
 46):
 47    """Preview instagram link in the message.
 48
 49    Args:
 50        client (Client): The Pyrogram client.
 51        message (Message): The trigger message object.
 52        url (str, optional): Tnstagram link.
 53        db_key (str, optional): The cache key.
 54        instagram_provider (str, optional): The instagram extractor: tikhub, ddinstagram, bridge
 55        instagram_comments (bool, optional): Add instagram comments. Defaults to True.
 56    """
 57    if kv := await get_cf_r2(db_key):
 58        logger.debug(f"Instagram preview cache hit for key={db_key}")
 59        if await copy_messages_from_db(client, message, key=db_key, kv=kv, **kwargs):
 60            return
 61        logger.warning("❌从缓存中转发失败, 尝试重新解析...")
 62
 63    if kwargs.get("show_progress") and "progress" not in kwargs:
 64        res = await send2tg(client, message, texts=f"🔗正在解析Instagram链接\n{url}", **kwargs)
 65        kwargs["progress"] = res[0]
 66
 67    succ = False
 68    resp = {}
 69    if "tikhub" in instagram_provider:  # try tikhub
 70        api_url = f"{API.TIKHUB_INSTAGRAM_STORY}{username}" if post_type == "story" else f"{API.TIKHUB_INSTAGRAM}{url}"
 71        logger.info(f"Preview Instagram TikHub for {api_url}")
 72        headers = {"authorization": f"Bearer {TOKEN.TIKHUB}", "accept": "application/json"}
 73        resp = await hx_req(api_url, headers=headers, check_keys=["data"], check_kv={"code": 200})
 74        if not resp.get("hx_error"):
 75            succ = True
 76    if not succ:
 77        logger.error("❌Instagram解析失败, 使用DDInstagram预览")
 78        await preview_ddinstagram(client, message, url=url, instagram_provider=instagram_provider, **kwargs)
 79        return
 80
 81    data = resp["data"]
 82    if post_type == "story":
 83        await preview_story(client, message, data, username, post_id, db_key=db_key, **kwargs)
 84        return
 85    # parse media
 86    media = []
 87    if data.get("video_url"):  # reel
 88        media.append({"video": download_file(data.get("video_url", ""), proxy=PROXY.INSTAGRAM, **kwargs)})
 89    elif media_nodes := glom(data, "edge_sidecar_to_children.edges", default=[]):
 90        for node in media_nodes:
 91            ftype = "photo" if not glom(node, "node.is_video", default=False) else "video"
 92            media_url = glom(node, "node.display_url", default="") if ftype == "photo" else glom(node, "node.video_url", default="")
 93            media.append({ftype: download_file(media_url, proxy=PROXY.INSTAGRAM, **kwargs)})
 94    elif data.get("display_url"):
 95        media.append({"photo": download_file(data.get("display_url"), proxy=PROXY.INSTAGRAM, **kwargs)})
 96
 97    statistics = ""
 98    if like := glom(data, "edge_media_preview_like.count", default=0):
 99        statistics += f"❤️{readable_count(like)}"
100    if comment := glom(data, "edge_media_to_parent_comment.count", default=0):
101        statistics += f"💬{readable_count(comment)}"
102
103    texts = ""
104    if true(show_author) and (fullname := glom(data, "owner.full_name", default="")):
105        texts += f"🏞**[{fullname}]({url})**\n"
106
107    if metadata_node := glom(data, "edge_media_to_caption.edges.0", default=None):
108        if true(show_pubdate) and (ts := glom(metadata_node, "node.created_at", default=0)):
109            create_time = f"{ts_to_dt(ts):%Y-%m-%d %H:%M:%S}"
110            texts += f"🕒{create_time}\n"
111        if true(show_statistics) and statistics:
112            texts += f"{statistics}\n"
113        if true(show_description) and (description := glom(metadata_node, "node.text", default="")):
114            texts += f"{description}\n"
115    # parse comments
116    comments = ""
117    if true(instagram_comments):
118        comment_nodes = glom(data, "edge_media_to_parent_comment.edges", default=[])
119        comment_nodes = sorted(comment_nodes, key=lambda x: glom(x, "node.created_at", default=0))
120        for idx, node in enumerate(comment_nodes):
121            if idx == 0:
122                comments += "💬**点击展开评论**:"
123            author = glom(node, "node.owner.username", default="user")
124            cmt = glom(node, "node.text", default="")
125            comments += f"\n💬**[{author}](https://www.instagram.com/{author})**: {cmt}"
126        comments = blockquote(comments)
127
128    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
129    media = await download_media(media, **kwargs)
130    sent_messages = await send2tg(client, message, texts=texts.strip() + comments, media=media, keep_file=True, **kwargs)
131    await modify_progress(del_status=True, **kwargs)
132    # Summary instagram
133    # find the first message that has a caption
134    caption_msg = None
135    index = -1
136    for idx, m in enumerate(sent_messages):
137        if isinstance(m, Message) and (m.caption or m.text):
138            caption_msg = m
139            index = idx
140            break
141    if summary_instagram and caption_msg:
142        edited_msg = await summarize_instagram(caption_msg, data, media, summary_instagram_model, url)
143        sent_messages[index] = edited_msg
144    await save_messages(messages=sent_messages, key=db_key)
145    # Clean up
146    [Path(glom(x, Coalesce("photo", "video", "audio"))).unlink(missing_ok=True) for x in data.get("media", [])]
147
148
149async def preview_story(client: Client, message: Message, data: dict, username: str, post_id: str, db_key: str, **kwargs):
150    items = flatten(glom(data, "reels_media.*.items", default=[]))
151    item = next((x for x in items if glom(x, "pk", default="") == post_id), None)
152    url = f"https://www.instagram.com/stories/{username}/{post_id}"
153    if not item:
154        await modify_progress(text=f"❌Instagram解析失败, 请访问{url}查看原始内容", force_update=True, **kwargs)
155        return
156
157    create_ts = glom(item, "taken_at", default=0)
158    expiring_ts = glom(item, "expiring_at", default=0)
159    texts = ""
160    fullname = glom(item, "story_music_stickers.0.display_artist", default=username)
161    media = []
162    if img_url := glom(item, "image_versions2.candidates.0.url", default=""):
163        media.append({"photo": download_file(img_url, proxy=PROXY.INSTAGRAM, **kwargs)})
164    if video_url := glom(item, "video_versions.0.url", default=""):
165        media.append({"video": download_file(video_url, proxy=PROXY.INSTAGRAM, **kwargs)})
166
167    texts += f"🏞**[{fullname}]({url})**"
168    if create_ts:
169        texts += f"\n🕒{ts_to_dt(create_ts):%Y-%m-%d %H:%M:%S}"
170    if expiring_ts:
171        texts += f"\n🔥{ts_to_dt(expiring_ts):%Y-%m-%d %H:%M:%S}"
172    await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
173    media = await download_media(media, **kwargs)
174    sent_messages = await send2tg(client, message, texts=texts.strip(), media=media, **kwargs)
175    await modify_progress(del_status=True, **kwargs)
176    await save_messages(messages=sent_messages, key=db_key)
177
178
179async def preview_ddinstagram(client: Client, message: Message, url: str, post_type: str, post_id: str, *, instagram_provider: str, **kwargs):
180    """Preview instagram link in the message via DDInstagram.
181
182    https://ddinstagram.com/
183
184    Args:
185        client (Client): The Pyrogram client.
186        message (Message): The trigger message object.
187        url (str, optional): Tnstagram link.
188        post_type (str): post type: "p" or "reel"
189        post_id (str): post id.
190        fallback (bool, optional): Fallback to other bots. Defaults to True.
191    """
192    kwargs |= {"target_mid": message.id}
193    if "ddinstagram" not in instagram_provider:
194        if "bridge" in instagram_provider:
195            await send_to_social_media_bridge(client, message, url, **kwargs)
196        return
197    api_url = f"{API.DDINSTAGRAM}/{post_type}/{post_id}"
198    logger.info(f"Instagram link preview for {api_url}")
199    headers = {"user-agent": TELEGRAM_UA}
200    resp = await hx_req(api_url, headers=headers, rformat="text")
201    if not resp.get("text"):
202        if "bridge" in instagram_provider:
203            await send_to_social_media_bridge(client, message, url, **kwargs)
204        return
205    soup = BeautifulSoup(resp["text"], "html.parser")
206    logger.trace(soup.prettify())
207
208    texts = ""
209    media = {}
210    if tag := soup.find("meta", attrs={"property": "twitter:title"}):
211        author = tag.get("content", "Unknown")
212        texts += f"🏞**[{author}]({url})\n"
213    if tag := soup.find("meta", attrs={"property": "og:description"}):
214        texts += str(tag.get("content", ""))
215    if (tag := soup.find("meta", attrs={"property": "twitter:image"})) and (img_url := tag.get("content")):
216        raw_url = f"{API.DDINSTAGRAM}{img_url}"
217        media["photo"] = await download_file(raw_url, path=f"{DOWNLOAD_DIR}/{post_id}.jpg", proxy=PROXY.INSTAGRAM, **kwargs)
218        if not bool(validate_img(media["photo"])):
219            await send_to_social_media_bridge(client, message, text=url, **kwargs)
220            return
221
222    if tag := soup.find("meta", attrs={"property": "og:video"}):
223        video_url = tag.get("content", "")
224        if video_url:
225            raw_url = f"{API.DDINSTAGRAM}{video_url}"
226            media["video"] = await download_file(raw_url, path=f"{DOWNLOAD_DIR}/{post_id}.mp4", proxy=PROXY.INSTAGRAM, **kwargs)
227            if not await is_valid_video_or_audio(media["video"]):
228                await send_to_social_media_bridge(client, message, text=url, **kwargs)
229                return
230
231    await send2tg(client, message, texts=texts, media=[media], **kwargs)
232    await modify_progress(del_status=True, **kwargs)
233
234
235async def summarize_instagram(message: Message, info: dict, media_list: list[dict], model: str, url: str) -> Message:
236    """Generate source for AI summary."""
237    data = {
238        "platform": "Instagram",
239        "author_name": glom(info, "owner.full_name", default=None),
240        "url": url,
241        "description": glom(info, "edge_media_to_caption.edges.0.node.text", default=None),
242    }
243    if ts := glom(data, "edge_media_to_caption.edges.0.node.created_at", default=0):
244        data["created_at"] = f"{ts_to_dt(ts):%Y-%m-%d %H:%M:%S}"
245    data = trim_none(data)
246    sources = []
247    min_text_length = 1000  # skip short tweets
248    min_video_duration = None
249    for media in media_list:
250        if media.get("photo"):
251            sources.append({"type": "image", "path": media["photo"]})
252        if media.get("video"):
253            min_text_length = None  # always summarize video
254            min_video_duration = 120
255            sources.append({"type": "video", "path": media["video"]})
256    sources.append({"type": "text", "text": json.dumps(data, ensure_ascii=False)})
257    author_name = data.get("author_name", "Anonymous")
258    summary = await summarize(
259        sources=sources,
260        model=model,
261        title=f"🏞{author_name} - Instagram",
262        author=author_name,
263        url=url,
264        date=data.get("created_at") or nowstr(TZ),
265        min_text_length=min_text_length,
266        min_video_duration=min_video_duration,
267        max_video_duration=3600,  # skip long videos more than 1 hour
268    )
269    telegraph_url = summary.get("telegraph_url")
270    if not telegraph_url:
271        return message
272    return await add_summary_url(telegraph_url, message) or message