main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import json
  4import re
  5from datetime import UTC, datetime
  6from pathlib import Path
  7from zoneinfo import ZoneInfo
  8
  9from glom import Coalesce, glom
 10from loguru import logger
 11from pyrogram.client import Client
 12from pyrogram.types import Message
 13
 14from ai.utils import trim_none
 15from config import AI, PROXY, TELEGRAM_UA, TOKEN, TZ
 16from database.kv import get_cf_kv, set_cf_kv
 17from messages.progress import modify_progress
 18from messages.sender import send2tg
 19from messages.utils import summay_media
 20from networking import download_file, download_media, hx_req
 21from preview.utils import add_summary_url
 22from summarize.summarize import summarize
 23from utils import nowstr, number_to_emoji
 24
 25
 26async def preview_v2ex(
 27    client: Client,
 28    message: Message,
 29    url: str = "",
 30    topic_id: str = "",
 31    *,
 32    summary_v2ex: bool = True,
 33    summary_v2ex_model: str = AI.AI_SUMMARY_MODEL_ALIAS,
 34    **kwargs,
 35):
 36    """Preview v2ex link in the message.
 37
 38    Args:
 39        client (Client): The Pyrogram client.
 40        message (Message): The trigger message object.
 41        url (str, optional): v2ex link
 42        db_key (str, optional): The cache key.
 43    """
 44    if kwargs.get("show_progress") and "progress" not in kwargs:
 45        res = await send2tg(client, message, texts=f"🔗正在解析V2ex链接\n{url}", **kwargs)
 46        kwargs["progress"] = res[0]
 47    logger.info(f"v2ex link preview for {url}")
 48    token = await refresh_v2ex_token()
 49    if not token:
 50        await modify_progress(text="❌V2EX Token已失效, 请手动创建", force_update=True, **kwargs)
 51        return
 52    headers = {"Authorization": f"Bearer {token}"}
 53    topic_api = f"https://www.v2ex.com/api/v2/topics/{topic_id}"
 54    resp = await hx_req(topic_api, proxy=PROXY.V2EX, headers=headers, check_kv={"success": True, "result.id": topic_id})
 55    if error := resp.get("error"):
 56        await modify_progress(text=f"❌v2ex链接解析失败{url}\n{error}", force_update=True, **kwargs)
 57        return
 58    author = glom(resp, "result.member.username", default="V2EX_User")
 59    author_url = f"https://www.v2ex.com/member/{author}"
 60    title = glom(resp, "result.title", default="Title")
 61    ts = glom(resp, "result.created", default=0)
 62    texts = f"💻**[{author}]({author_url})**\n"
 63    texts += f"🕒{datetime.fromtimestamp(ts, tz=UTC).astimezone(ZoneInfo(TZ)).strftime('%Y-%m-%d %H:%M:%S')}\n"
 64    texts += f"📝**[{title}]({url})**\n"
 65    content, img_urls = extract_and_remove_images_regex(glom(resp, "result.content", default=""))
 66    texts += content + "\n"
 67    if supplements := glom(resp, "result.supplements", default=[]):
 68        for idx, supp in enumerate(supplements):
 69            texts += f"\n补充留言{number_to_emoji(idx + 1)}:\n{supp.get('content', '')}\n"
 70
 71    media = await download_imgs(img_urls)
 72    if media:
 73        await modify_progress(text=f"⏬正在下载:\n{summay_media(media)}", force_update=True, **kwargs)
 74    media = await download_media(media, **kwargs)
 75    sent_messages = await send2tg(client, message, texts=texts, media=media, keep_file=True, **kwargs)
 76    await modify_progress(del_status=True, **kwargs)
 77    # Summary v2ex
 78    # find the first message that has a caption
 79    caption_msg = None
 80    for m in sent_messages:
 81        if isinstance(m, Message) and (m.caption or m.text):
 82            caption_msg = m
 83            break
 84    if summary_v2ex and caption_msg:
 85        await summarize_v2ex(caption_msg, resp.get("result", {}), media, summary_v2ex_model, url)
 86    # Clean up
 87    [Path(glom(x, Coalesce("photo", "video", "audio"))).unlink(missing_ok=True) for x in media]
 88
 89
 90def extract_and_remove_images_regex(markdown_text: str) -> tuple[str, list[str]]:
 91    """Extract images from markdown text and remove them from the text.
 92
 93    Returns:
 94        tuple[str, list[str]]: The markdown text without images and the extracted image URLs.
 95    """
 96    image_pattern = r'!\[([^\]]*)\]\((.*?)\s*(".*?")?\)'
 97
 98    image_urls = re.findall(image_pattern, markdown_text)
 99    urls = [url[1].strip() for url in image_urls]  # only need urls
100
101    text_without_images = re.sub(image_pattern, "", markdown_text)
102
103    return text_without_images, urls
104
105
106async def download_imgs(img_urls: list[str]) -> list[dict]:
107    """Download images from img_urls."""
108    media = []
109    for img_url in img_urls:
110        # handle imgur.com
111        if img_url.startswith("https://i.imgur.com/"):
112            referer_url = f"https://imgur.com/{Path(img_url).stem}"
113            headers = {"Referer": referer_url, "User-Agent": TELEGRAM_UA}
114            media.append({"photo": download_file(img_url, proxy=PROXY.WARP, headers=headers)})
115        else:
116            media.append({"photo": download_file(img_url, proxy=PROXY.V2EX)})
117    return media
118
119
120async def refresh_v2ex_token() -> str:
121    """Refresh v2ex token.
122
123    V2EX API token expires after 180 days.
124    """
125
126    async def check_token(token: str) -> tuple[bool, int]:
127        resp = await hx_req(
128            "https://www.v2ex.com/api/v2/token",
129            proxy=PROXY.V2EX,
130            headers={"Authorization": f"Bearer {token}"},
131            check_kv={"success": True},
132            max_retry=0,
133        )
134        return bool(glom(resp, "result.token", default=None)), glom(resp, "result.expiration", default=0)
135
136    async def create_token(token: str) -> str:
137        resp = await hx_req(
138            "https://www.v2ex.com/api/v2/tokens",
139            method="POST",
140            proxy=PROXY.V2EX,
141            json_data={"scope": "everything", "expiration": 15552000},
142            headers={"Authorization": f"Bearer {token}"},
143            max_retry=0,
144            check_kv={"success": True},
145        )
146        return glom(resp, "result.token", default="")
147
148    if TOKEN.V2EX:
149        valid, ttl = await check_token(TOKEN.V2EX)
150        if valid:
151            if ttl < 86400 * 3:  # 3天之内
152                new_token = await create_token(TOKEN.V2EX)
153                logger.warning("V2EX Token即将失效, 正在重新创建...")
154                await set_cf_kv("v2ex_token", {"token": new_token})
155            return TOKEN.V2EX
156        logger.warning("V2EX Token已失效, 从KV获取...")
157    token = (await get_cf_kv("v2ex_token")).get("token", "")
158    valid, ttl = await check_token(token)
159    if valid:
160        if ttl < 86400 * 3:  # 3天之内
161            new_token = await create_token(token)
162            logger.warning("V2EX Token即将失效, 正在重新创建...")
163            await set_cf_kv("v2ex_token", {"token": new_token})
164        return token
165    return ""
166
167
168async def summarize_v2ex(message: Message, v2ex: dict, media_list: list[dict], model: str, url: str) -> Message:
169    """Generate source for AI summary."""
170
171    def date_str(ts: int) -> str | None:
172        if not ts:
173            return None
174        return f"{datetime.fromtimestamp(ts, tz=UTC).astimezone(ZoneInfo(TZ)).strftime('%Y-%m-%d %H:%M:%S')}"
175
176    data = {
177        "platform": "V2EX",
178        "title": glom(v2ex, "title", default=""),
179        "author_name": glom(v2ex, "member.username", default="Anonymous"),
180        "url": url,
181        "content": glom(v2ex, Coalesce("content_rendered", "content"), default=None),
182    }
183
184    if ts := glom(v2ex, "created", default=0):
185        data["created_at"] = date_str(ts)
186    supplements = [
187        {
188            "created_at": date_str(supp.get("created", 0)),
189            "content": glom(supp, Coalesce("content_rendered", "content"), default=None),
190        }
191        for supp in glom(v2ex, "supplements", default=[])
192    ]
193    if supplements:
194        data["supplements"] = supplements
195    data = trim_none(data)
196    sources = []
197    min_text_length = 1000  # skip short tweets
198    min_video_duration = None
199    for media in media_list:
200        if media.get("photo"):
201            sources.append({"type": "image", "path": media["photo"]})
202        if media.get("video"):
203            min_text_length = None  # always summarize video
204            min_video_duration = 120
205            sources.append({"type": "video", "path": media["video"]})
206    sources.append({"type": "text", "text": json.dumps(data, ensure_ascii=False)})
207    summary = await summarize(
208        sources=sources,
209        model=model,
210        title=data["title"] or url,
211        author=data["author_name"],
212        url=url,
213        date=data.get("created_at") or nowstr(TZ),
214        min_text_length=min_text_length,
215        min_video_duration=min_video_duration,
216        max_video_duration=3600,  # skip long videos more than 1 hour
217    )
218    telegraph_url = summary.get("telegraph_url")
219    if not telegraph_url:
220        return message
221    return await add_summary_url(telegraph_url, message) or message