main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import json
  5import re
  6import shutil
  7import zipfile
  8from datetime import datetime
  9from io import BytesIO
 10from pathlib import Path
 11from zoneinfo import ZoneInfo
 12
 13import anyio
 14from bs4 import BeautifulSoup
 15from glom import Coalesce, glom
 16from pyrogram.client import Client
 17from pyrogram.types import InputMediaDocument, Message
 18
 19from config import API, DOWNLOAD_DIR, PROXY, TOKEN, TZ
 20from custom.config import ACCOUNT_NAME
 21from database.kv import set_cf_kv
 22from messages.sender import send2tg
 23from messages.utils import remove_img_tag, set_reaction, smart_split
 24from networking import download_file, hx_req, match_social_media_link
 25from preview.twitter import parse_article
 26from summarize.summarize import summarize
 27from utils import convert2md, match_urls, rand_string
 28
 29
 30async def link_extract(client: Client, message: Message):
 31    if ACCOUNT_NAME != "bot":
 32        return
 33    if message.chat.id != -1002653997796:
 34        return
 35    if message.message_thread_id != 1111:
 36        return
 37    article = ""
 38    matched = await match_social_media_link(message.content)
 39    if matched["platform"] == "x":
 40        await set_reaction(client, message, "🕊")
 41        await parse_twitter(client, message, matched["post_id"])
 42    elif matched["platform"] == "wechat":
 43        await set_reaction(client, message, "🎄")
 44        await parse_wechat(client, message, f"{matched['url']}")
 45    elif matched["platform"] == "github":
 46        await set_reaction(client, message, "🆒")
 47        await parse_skills(message, matched["gh_user"], matched["gh_repo"])
 48    elif match_urls(message.content):
 49        await set_reaction(client, message, "👌")
 50        article = await parse_via_jina(message, match_urls(message.content)[0])
 51    if article:
 52        summary = await summarize([{"type": "text", "text": article}], model="general", description=article, url=matched.get("url"))
 53        if not summary.get("texts"):
 54            return
 55        texts = summary["texts"]
 56        await send2tg(client, message, texts=texts)
 57
 58
 59async def parse_twitter(client: Client, message: Message, pid: str):
 60    api_url = f"{API.FXTWITTER}/2/status/{pid}?lang=zh-cn"
 61    resp = await hx_req(api_url, proxy=PROXY.TWITTER, check_kv={"status.id": pid})
 62    if glom(resp, "status.article", default=None):
 63        await parse_twitter_article(client, message, resp)
 64        return
 65    url = glom(resp, "status.url", default="")
 66    author = glom(resp, "status.author.name", default="Anonymous")
 67    body = glom(resp, Coalesce("status.translation.text", "status.text"), default="")
 68    texts = f"🕊[{author}]({url})\n{body}"
 69    await send2tg(client, message, texts=texts)
 70
 71
 72async def parse_twitter_article(client: Client, message: Message, resp: dict):
 73    url = glom(resp, "status.url", default="")
 74    author = glom(resp, "status.author.name", default="Anonymous")
 75    article = await parse_article(resp["status"]["article"], author, url)
 76    title = glom(resp, "status.article.title", default="Unknown")
 77    date = glom(resp, "status.article.created_at", default="1970-01-01T00:00:00.000Z")
 78    dt = datetime.fromisoformat(date).astimezone(ZoneInfo(TZ))
 79    date_str = dt.strftime("%Y-%m-%d %H:%M:%S")
 80    markdown = convert2md(html=article["html"])
 81    posts = f"---\ntitle: {title}\n\ndate: {date_str}\n\nauthor: {author}\n\nurl: {url}\n\n---\n\n{markdown}"
 82    caption = f"🕊[{author}]({url})\n🕒{date_str}\n📝**[{title}]({url})**"
 83    post_path = Path(DOWNLOAD_DIR) / f"{sanitize_filename(title[:64])}.txt"
 84    async with await anyio.open_file(post_path, "w") as f:
 85        await f.write(posts)
 86    # download images
 87    tasks = [download_file(url, proxy=PROXY.TWITTER) for url in article["image_urls"] + article["video_urls"]]
 88    images = await asyncio.gather(*tasks, return_exceptions=True)
 89    images = [img for img in images if isinstance(img, str)]
 90    zip_path = post_path.with_name(f"{rand_string()}.zip")
 91    create_zip([post_path, *images], zip_name=zip_path)
 92    with BytesIO(posts.encode("utf-8")) as f:
 93        await client.send_media_group(
 94            message.chat.id,
 95            message_thread_id=message.message_thread_id,
 96            media=[
 97                InputMediaDocument(f, file_name=f"{title[:64]}.txt"),
 98                InputMediaDocument(zip_path.as_posix(), caption=caption, file_name=f"{title[:64]}.zip"),
 99            ],
100        )
101    zip_path.unlink(missing_ok=True)
102    post_path.unlink(missing_ok=True)
103
104
async def parse_wechat(client: Client, message: Message, url: str):
    """Export a WeChat article as Markdown plus a zip of its images.

    The page is fetched with a WeChat in-app User-Agent, then title,
    description and author are read from ``<meta>`` tags and the publish time
    from the inline ``createTime = '...'`` script assignment. The page is
    converted to Markdown, prefixed with a front-matter header, and sent to the
    originating chat/thread as a media group of two documents: the Markdown
    text, and a zip holding the text plus every image that downloaded
    successfully.

    Args:
        client: Pyrogram client used to send the media group.
        message: The triggering message; supplies the chat id and thread id.
        url: URL of the WeChat article.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.69(0x18004539) NetType/4G Language/zh_CN"
    }
    resp = await hx_req(url, headers=headers, mobile=True, proxy=PROXY.WECHAT, rformat="content")
    html = resp["content"].decode("utf-8")
    soup = BeautifulSoup(html, "html.parser")

    def meta_content(tag, default: str = "") -> str:
        """Return the tag's non-empty ``content`` attribute, else *default*."""
        if tag and tag.get("content"):
            return str(tag["content"])
        return default

    title = meta_content(soup.find("meta", property="og:title"), "Unknown")
    desc = meta_content(soup.find("meta", attrs={"name": "description"}))
    author = meta_content(soup.find("meta", attrs={"name": "author"}))

    date = ""
    # Stop at the first closing quote: a greedy ".*" would run on to the last
    # quote of the line when the script is minified onto a single line.
    if match_date := re.search(r"createTime = '([^']*)'", html):
        date = match_date.group(1)

    markdown = convert2md(html=str(soup))
    posts = f"---\ntitle: {title}\n\ndate: {date}\n\nauthor: {author}\n\nurl: {url}\n\ndescription: {desc}\n\n---\n\n{markdown}"
    caption = f"🕊[{author}]({url})\n🕒{date}\n📝**[{title}]({url})**"

    post_path = Path(DOWNLOAD_DIR) / f"{sanitize_filename(title[:64])}.txt"
    zip_path = post_path.with_name(f"{rand_string()}.zip")
    try:
        # Write as UTF-8 explicitly: the same text is sent UTF-8 encoded below,
        # and the platform default encoding may not cover CJK text.
        async with await anyio.open_file(post_path, "w", encoding="utf-8") as f:
            await f.write(posts)

        # Download every absolute image URL referenced by the Markdown.
        _, image_urls = remove_img_tag(markdown)
        tasks = []
        for img_url in image_urls:
            if not img_url.startswith("http"):
                continue
            # The extension is guessed from markers in the URL itself.
            suffix = ".jpg" if "mmbiz_jpg" in img_url or "wx_fmt=jpeg" in img_url else ".png"
            path = Path(DOWNLOAD_DIR) / f"{rand_string(16)}{suffix}"
            tasks.append(download_file(img_url, path=path, headers=headers, proxy=PROXY.WECHAT))
        # Failed downloads come back as exception objects and are filtered out.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        images = [item for item in results if isinstance(item, str)]

        # create_zip() deletes each source file once it has been archived.
        create_zip([post_path, *images], zip_name=zip_path)
        with BytesIO(posts.encode("utf-8")) as f:
            await client.send_media_group(
                message.chat.id,
                message_thread_id=message.message_thread_id,
                media=[
                    InputMediaDocument(f, file_name=f"{title[:64]}.txt"),
                    InputMediaDocument(zip_path.as_posix(), caption=caption, file_name=f"{title[:64]}.zip"),
                ],
            )
    finally:
        # Remove the temporary files even when a step above raised.
        zip_path.unlink(missing_ok=True)
        post_path.unlink(missing_ok=True)
161
162
async def parse_skills(message: Message, user: str, repo: str) -> str:
    """Collect the text files of a GitHub repository and publish them as JSON.

    The repository's default branch is looked up through the GitHub API, the
    branch archive is downloaded and extracted, and every ``.md`` / ``.txt``
    file whose name does not contain ``license``, ``readme`` or ``copyright``
    (case-insensitive) is read. The resulting ``{file name: content}`` mapping
    is sent back as ``<repo>.json`` and stored with ``set_cf_kv`` under the key
    ``SKILLS-<repo>``.

    Args:
        message: The triggering message; replies are quoted to it.
        user: GitHub owner of the repository.
        repo: Repository name.

    Returns:
        Always an empty string. Failures to resolve the branch or download the
        archive are reported to the chat as replies.
    """
    # get default branch
    resp = await hx_req(
        f"https://api.github.com/repos/{user}/{repo}",
        headers={"Authorization": f"Bearer {TOKEN.GITHUB}", "Accept": "application/vnd.github.v3+json"},
        check_kv={"name": repo},
    )
    branch = resp.get("default_branch")
    if not branch:
        await message.reply("Failed to get default branch", quote=True)
        return ""

    # download zip
    zip_url = f"https://github.com/{user}/{repo}/archive/refs/heads/{branch}.zip"
    zip_root = Path(DOWNLOAD_DIR) / f"skills/{user}/{repo}"
    shutil.rmtree(zip_root, ignore_errors=True)  # start from a clean directory
    zip_path = zip_root / f"{repo}.zip"
    try:
        await download_file(zip_url, headers={"Authorization": f"Bearer {TOKEN.GITHUB}"}, path=zip_path, stream=True, proxy=PROXY.GITHUB)
        if not zip_path.is_file():
            await message.reply("Failed to download zip file", quote=True)
            return ""

        # extract zip
        with zipfile.ZipFile(zip_path) as zipf:
            zipf.extractall(zip_root)
        zip_path.unlink(missing_ok=True)

        # iter all files include subdirectories
        skipped_words = ("license", "readme", "copyright")
        skills = {}
        for file in zip_root.rglob("*"):
            if not file.is_file():
                continue
            if file.suffix.lower() not in {".md", ".txt"}:
                continue
            lowered_name = file.name.lower()
            if any(word in lowered_name for word in skipped_words):
                continue
            # NOTE(review): keyed by bare file name, so two files with the same
            # name in different directories overwrite each other — confirm
            # whether the KV consumers need path-qualified keys.
            # Decode as UTF-8 regardless of the platform default, and replace
            # undecodable bytes so one odd file cannot abort the whole export.
            skills[file.name] = file.read_text(encoding="utf-8", errors="replace")

        skills_str = json.dumps(skills, ensure_ascii=False, indent=2)
        with BytesIO(skills_str.encode("utf-8")) as f:
            await message.reply_document(f, file_name=f"{repo}.json", quote=True)
    finally:
        # Always remove the working directory, including on failure paths.
        shutil.rmtree(zip_root, ignore_errors=True)
    await set_cf_kv(f"SKILLS-{repo}", skills)
    return ""
205
206
207def create_zip(files: list[str | Path], zip_name: str | Path):
208    with zipfile.ZipFile(zip_name, "w") as zipf:
209        for file in files:
210            if Path(file).exists():
211                zipf.write(str(file), arcname=Path(file).name)
212                Path(file).unlink(missing_ok=True)
213
214
def sanitize_filename(filename: str) -> str:
    """Return *filename* with characters unsafe in file names replaced by ``_``.

    Replaced characters are ``\\ / : * ? " < > |`` and the ASCII control
    characters (``\\x00``-``\\x1f``); a NUL byte in particular makes ``open()``
    raise. Leading and trailing whitespace is removed.

    Args:
        filename: Candidate file name (without directory part).

    Returns:
        The cleaned name; may be empty when the input is empty or blank.
    """
    # Strip before substituting so surrounding whitespace (e.g. a trailing
    # newline) is dropped rather than turned into "_".
    unsafe_chars = r'[\\/:*?"<>|\x00-\x1f]'
    return re.sub(unsafe_chars, "_", filename.strip())
219
220
async def parse_via_jina(message: Message, url: str) -> str:
    """Fetch *url* as Markdown through the Jina reader and reply with the file.

    On success the reply is a ``.md`` document named after the page title
    (``链接解析`` when the page has no usable title) whose body is an optional
    ``Title:`` line, a ``URL:`` line and the page content. On failure
    (``hx_error`` present in the response) the first chunk of the error text is
    sent as a reply instead.

    Args:
        message: The triggering message; replies are quoted to it.
        url: Address of the page to convert.

    Returns:
        The text that was sent as a document, or ``""`` when the request failed.
    """
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "X-Engine": "browser",
        "X-Keep-Img-Data-Url": "true",
        "X-Md-Bullet-List-Marker": "-",
        "X-Md-Em-Delimiter": "*",
        "X-Return-Format": "markdown",
    }
    resp = await hx_req(
        "https://r.jina.ai",
        method="POST",
        headers=headers,
        json_data={"url": url},
        check_kv={"code": 200},
        max_retry=3,
    )

    if "hx_error" in resp:
        error = await smart_split(str(resp["hx_error"]))
        await message.reply(error[0], quote=True)
        return ""

    title = glom(resp, "data.title", default="")
    content = glom(resp, "data.content", default="")

    parts = []
    if title:
        parts.append(f"Title: {title}\n\n")
    parts.append(f"URL: {url}\n\n")
    if content:
        parts.append(content)
    texts = "".join(parts)

    # glom's default only covers a missing key; fall back for an empty or null
    # title too, so the document is never named ".md" / "None.md".
    filename = title or "链接解析"
    with BytesIO(texts.encode("utf-8")) as f:
        await message.reply_document(f, caption=f"[{filename}]({url})", file_name=f"{filename}.md", quote=True)
    return texts
255    return texts