main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import json
  5import re
  6import shutil
  7import zipfile
  8from datetime import datetime
  9from io import BytesIO
 10from pathlib import Path
 11from zoneinfo import ZoneInfo
 12
 13import anyio
 14from bs4 import BeautifulSoup
 15from glom import glom
 16from pyrogram.client import Client
 17from pyrogram.types import InputMediaDocument, Message
 18
 19from config import API, DOWNLOAD_DIR, PROXY, TOKEN, TZ
 20from custom.config import ACCOUNT_NAME
 21from database.kv import set_cf_kv
 22from messages.utils import remove_img_tag
 23from networking import download_file, hx_req, match_social_media_link
 24from preview.twitter import parse_article
 25from publish import publish_telegraph
 26from utils import convert_md, rand_string
 27
 28
 29async def link_extract(client: Client, message: Message):
 30    if ACCOUNT_NAME != "xiaohao":
 31        return
 32    if message.chat.id != -1002653997796:
 33        return
 34    if message.message_thread_id != 1111:
 35        return
 36
 37    matched = await match_social_media_link(message.content)
 38    if matched["platform"] == "x":
 39        await parse_twitter_post(client, message, matched["url"], matched["post_id"])
 40    elif matched["platform"] == "wechat":
 41        await parse_wechat(client, message, f"{matched['url']}")
 42    elif matched["platform"] == "github":
 43        await parse_skills(message, matched["gh_user"], matched["gh_repo"])
 44
 45
 46async def parse_twitter_post(client: Client, message: Message, url: str, pid: str):
 47    api_url = f"{API.FXTWITTER}/status/{pid}"
 48    resp = await hx_req(api_url, proxy=PROXY.TWITTER, check_kv={"tweet.id": pid})
 49    if not glom(resp, "tweet.article", default=None):
 50        await message.reply_text("❌εͺζ”―ζŒTwitter Posts")
 51        return
 52    article = parse_article(resp["tweet"]["article"])
 53    author = glom(resp, "tweet.author.name", default="Unknown")
 54    title = glom(resp, "tweet.article.title", default="Unknown")
 55    date = glom(resp, "tweet.article.created_at", default="1970-01-01T00:00:00.000Z")
 56    dt = datetime.fromisoformat(date).astimezone(ZoneInfo(TZ))
 57    date_str = dt.strftime("%Y-%m-%d %H:%M:%S")
 58    posts = f"---\ntitle: {title}\n\ndate: {date_str}\n\nauthor: {author}\n\nurl: {url}\n\n---\n\n{article['markdown']}"
 59    telegraph_url = await publish_telegraph(title=title, texts=article["markdown"], author=author, url=url) or url
 60    caption = f"πŸ•Š[{author}]({url})\nπŸ•’{date_str}\nπŸ“**[{title}]({telegraph_url})**"
 61    post_path = Path(DOWNLOAD_DIR) / f"{sanitize_filename(title[:64])}.txt"
 62    async with await anyio.open_file(post_path, "w") as f:
 63        await f.write(posts)
 64    # download images
 65    tasks = [download_file(url, proxy=PROXY.TWITTER) for url in article["image_urls"]]
 66    images = await asyncio.gather(*tasks, return_exceptions=True)
 67    images = [img for img in images if isinstance(img, str)]
 68    zip_path = post_path.with_name(f"{rand_string()}.zip")
 69    create_zip([post_path, *images], zip_name=zip_path)
 70    with BytesIO(posts.encode("utf-8")) as f:
 71        await client.send_media_group(
 72            message.chat.id,
 73            message_thread_id=message.message_thread_id,
 74            media=[
 75                InputMediaDocument(f, file_name=f"{title[:64]}.txt"),
 76                InputMediaDocument(zip_path.as_posix(), caption=caption, file_name=f"{title[:64]}.zip"),
 77            ],
 78        )
 79    zip_path.unlink(missing_ok=True)
 80    post_path.unlink(missing_ok=True)
 81
 82
 83async def parse_wechat(client: Client, message: Message, url: str):
 84    headers = {
 85        "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 18_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.69(0x18004539) NetType/4G Language/zh_CN"
 86    }
 87    resp = await hx_req(url, headers=headers, mobile=True, proxy=PROXY.WECHAT, rformat="content")
 88    html = resp["content"].decode("utf-8")
 89    soup = BeautifulSoup(html, "html.parser")
 90    title_tag = soup.find("meta", property="og:title")
 91    title = "Unknown"
 92    if title_tag and title_tag.get("content"):
 93        title = str(title_tag["content"])
 94
 95    desc = ""
 96    desc_tag = soup.find("meta", attrs={"name": "description"})
 97    if desc_tag and desc_tag.get("content"):
 98        desc = desc_tag["content"]
 99
100    date = ""
101    if match_date := re.search(r"createTime = '(.*)'", html):
102        date = match_date.group(1)
103
104    author = ""
105    author_tag = soup.find("meta", attrs={"name": "author"})
106    if author_tag and author_tag.get("content"):
107        author = str(author_tag["content"])
108    markdown = convert_md(html=str(soup))
109    posts = f"---\ntitle: {title}\n\ndate: {date}\n\nauthor: {author}\n\nurl: {url}\n\ndescription:{desc}\n\n---\n\n{markdown}"
110    telegraph_url = await publish_telegraph(title=title, texts=markdown, author=author, url=url) or url
111    caption = f"πŸ•Š[{author}]({url})\nπŸ•’{date}\nπŸ“**[{title}]({telegraph_url})**"
112
113    post_path = Path(DOWNLOAD_DIR) / f"{sanitize_filename(title[:64])}.txt"
114    async with await anyio.open_file(post_path, "w") as f:
115        await f.write(posts)
116
117    # download images
118    _, image_urls = remove_img_tag(markdown)
119    image_urls = [url for url in image_urls if url.startswith("http")]
120    tasks = []
121    for img in image_urls:
122        suffix = ".jpg" if "mmbiz_jpg" in img or "wx_fmt=jpeg" in img else ".png"
123        path = Path(DOWNLOAD_DIR) / f"{rand_string(16)}{suffix}"
124        tasks.append(download_file(img, path=path, headers=headers, proxy=PROXY.WECHAT))
125    images = await asyncio.gather(*tasks, return_exceptions=True)
126    images = [img for img in images if isinstance(img, str)]
127    zip_path = post_path.with_name(f"{rand_string()}.zip")
128    create_zip([post_path, *images], zip_name=zip_path)
129    with BytesIO(posts.encode("utf-8")) as f:
130        await client.send_media_group(
131            message.chat.id,
132            message_thread_id=message.message_thread_id,
133            media=[
134                InputMediaDocument(f, file_name=f"{title[:64]}.txt"),
135                InputMediaDocument(zip_path.as_posix(), caption=caption, file_name=f"{title[:64]}.zip"),
136            ],
137        )
138    zip_path.unlink(missing_ok=True)
139    post_path.unlink(missing_ok=True)
140
141
142async def parse_skills(message: Message, user: str, repo: str):
143    # get default branch
144    resp = await hx_req(
145        f"https://api.github.com/repos/{user}/{repo}",
146        headers={"Authorization": f"Bearer {TOKEN.GITHUB}", "Accept": "application/vnd.github.v3+json"},
147        check_kv={"name": repo},
148    )
149    if not resp.get("default_branch"):
150        return await message.reply("Failed to get default branch", quote=True)
151
152    # download zip
153    zip_url = f"https://github.com/{user}/{repo}/archive/refs/heads/{resp['default_branch']}.zip"
154    zip_root = Path(DOWNLOAD_DIR) / f"skills/{user}/{repo}"
155    shutil.rmtree(zip_root, ignore_errors=True)
156    zip_path = zip_root / f"{repo}.zip"
157    await download_file(zip_url, headers={"Authorization": f"Bearer {TOKEN.GITHUB}"}, path=zip_path, stream=True, proxy=PROXY.GITHUB)
158    if not zip_path.is_file():
159        return await message.reply("Failed to download zip file", quote=True)
160
161    # extract zip
162    with zipfile.ZipFile(zip_path) as zipf:
163        for file in zipf.filelist:
164            zipf.extract(file, path=zip_root)
165    zip_path.unlink(missing_ok=True)
166    # iter all files include subdirectories
167    skills = {}
168    for file in zip_root.rglob("*"):
169        if not file.is_file():
170            continue
171        if file.suffix.lower() not in {".md", ".txt"}:
172            continue
173        if any(x.lower() in file.name.lower() for x in ["license", "readme", "copyright"]):
174            continue
175        skills[file.name] = file.read_text()
176    skills_str = json.dumps(skills, ensure_ascii=False, indent=2)
177    with BytesIO(skills_str.encode("utf-8")) as f:
178        await message.reply_document(f, file_name=f"{repo}.json", quote=True)
179    shutil.rmtree(zip_root, ignore_errors=True)
180    await set_cf_kv(f"SKILLS-{repo}", skills)
181    return skills
182
183
184def create_zip(files: list[str | Path], zip_name: str | Path):
185    with zipfile.ZipFile(zip_name, "w") as zipf:
186        for file in files:
187            if Path(file).exists():
188                zipf.write(str(file), arcname=Path(file).name)
189                Path(file).unlink(missing_ok=True)
190
191
192def sanitize_filename(filename: str) -> str:
193    illegal_chars = r'[\\/:*?"<>|]'
194    sanitized = re.sub(illegal_chars, "_", filename)
195    return sanitized.strip()