Commit 7fabb90
Changed files (5)
src/preview/github.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from datetime import UTC, datetime
+from zoneinfo import ZoneInfo
+
+from glom import glom
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import PROXY, TOKEN, TZ
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from networking import download_file, hx_req
+from utils import nowdt
+
+
+async def preview_github(client: Client, message: Message, url: str, gh_user: str = "", gh_repo: str = "", **kwargs):
+ """Preview github info in the message."""
+ if kwargs.get("show_progress") and "progress" not in kwargs:
+ res = await send2tg(client, message, texts=f"🔗正在解析GitHub链接\n{url}", **kwargs)
+ kwargs["progress"] = res[0]
+ headers = {"Accept": "application/vnd.github+json", "X-GitHub-Api-Version": "2022-11-28"}
+ if TOKEN.GITHUB:
+ headers["Authorization"] = f"Bearer {TOKEN.GITHUB}"
+ api = f"https://api.github.com/repos/{gh_user}/{gh_repo}"
+ resp = await hx_req(api, headers=headers, proxy=PROXY.GITHUB, check_keys=["full_name"])
+ if error := resp.get("hx_error"):
+ await modify_progress(text=f"❌GitHub解析失败: {error}", force_update=True, **kwargs)
+ return
+ full_repo = resp["full_name"]
+ url = f"https://github.com/{full_repo}"
+ gh_user, gh_repo = full_repo.split("/") # correct uppercase / lowercase
+ msg = ""
+ if desc := resp["description"]:
+ msg += f"**{desc}**\n"
+
+ msg += f"📦[{full_repo}](https://github.com/{full_repo})\n"
+ if upstream := glom(resp, "parent.full_name", default=""):
+ msg += f"⬆️上游: [{upstream}](https://github.com/{upstream})\n"
+
+ msg += f"⭐️Star: {resp['stargazers_count']}\n"
+ msg += f"🔀Fork: {resp['forks_count']}\n"
+ msg += f"👁Watch: {resp['stargazers_count']}\n"
+ msg += f"❔Issues: {resp['open_issues_count']}\n"
+
+ if lcen := glom(resp, "license.spdx_id", default=""):
+ msg += f"📄License: {lcen}\n"
+ msg += f"🕒创建日期: {convert_dt(resp['created_at']):%Y-%m-%d %H:%M:%S}\n"
+ msg += f"🔄最新推送: {delta_time(resp['pushed_at'])}\n"
+ if tags := glom(resp, "topics", default=[]):
+ msg += "🏷️Tags:"
+ for tag in tags:
+ msg += f" #{tag}"
+ media = []
+ if readme := await github_readme(gh_user, gh_repo, headers=headers):
+ media = [{"document": readme}]
+ kwargs["send_from_user"] = "" # disable @send_user
+ await send2tg(client, message, texts=msg.strip(), media=media, **kwargs)
+ await modify_progress(del_status=True, **kwargs)
+
+
+async def github_readme(user: str, repo: str, headers: dict) -> str:
+ """Returns downloaded README path."""
+ api = f"https://api.github.com/repos/{user}/{repo}/readme"
+ resp = await hx_req(api, headers=headers, proxy=PROXY.GITHUB, check_kv={"type": "file"})
+ if not resp.get("download_url"):
+ return ""
+ return await download_file(resp["download_url"], headers=headers, skip_exist=False, proxy=PROXY.GITHUB)
+
+
+def convert_dt(dt: str) -> datetime:
+ """Convert 2017-04-18T22:02:38Z to datetime object."""
+ return datetime.strptime(dt[:-1], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=UTC).astimezone(ZoneInfo(TZ))
+
+
+def delta_time(dt: str) -> str:
+ """Time difference between the current time and the specified time.
+
+ dt format: 2017-04-18T22:02:38Z
+ Returns: 2d12h
+ """
+ time = convert_dt(dt)
+ now = nowdt(TZ)
+ delta = now - time
+ res = ""
+ if delta.days:
+ res += f"{delta.days}天"
+ minutes, seconds = divmod(delta.seconds, 60)
+ hours, minutes = divmod(minutes, 60)
+ if hours:
+ res += f"{hours}小时"
+ if minutes:
+ res += f"{minutes}分钟"
+ return res + "前" if res else "刚刚"
src/config.py
@@ -54,6 +54,7 @@ class ENABLE: # see fine-grained permission in `src/permission.py`
WECHAT = os.getenv("ENABLE_WECHAT", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
REDDIT = os.getenv("ENABLE_REDDIT", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
WGET = os.getenv("ENABLE_WGET", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+ GITHUB = os.getenv("ENABLE_GITHUB", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
XHS = os.getenv("ENABLE_XHS", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
YTDLP = os.getenv("ENABLE_YTDLP", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
RAW_IMG_CONVERT = os.getenv("ENABLE_RAW_IMG_CONVERT", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
@@ -155,6 +156,7 @@ class TOKEN:
NEOCITIES = os.getenv("NEOCITIES_USERPASS", "") # in "user,pass" format
NEOCITIES_IV_HASH = os.getenv("NEOCITIES_INSTANTVIEW_HASH", "")
R2_IV_HASH = os.getenv("R2_INSTANTVIEW_HASH", "")
+ GITHUB = os.getenv("GITHUB_TOKEN", "")
class PROXY: # format: socks5://127.0.0.1:7890
@@ -177,6 +179,7 @@ class PROXY: # format: socks5://127.0.0.1:7890
DOWNLOAD = os.getenv("DOWNLOAD_PROXY", None)
WEIBO = os.getenv("WEIBO_PROXY", None)
REDDIT = os.getenv("REDDIT_PROXY", None)
+ GITHUB = os.getenv("GITHUB_PROXY", None)
YTDLP = os.getenv("YTDLP_PROXY", None) # general proxy for ytdlp
YTDLP_FALLBACK = os.getenv("YTDLP_PROXY_FALLBACK", None) # fallback proxy for ytdlp
# for ytdlp proxy of specific sites (Like Bilibili), use this format: YTDLP_PROXY_BILIBILI
src/handler.py
@@ -28,6 +28,7 @@ from others.search_ytb import search_youtube
from permission import check_service
from preview.bilibili import preview_bilibili
from preview.douyin import preview_douyin
+from preview.github import preview_github
from preview.instagram import preview_instagram
from preview.reddit import preview_reddit
from preview.twitter import preview_twitter
@@ -153,6 +154,7 @@ async def handle_social_media(
twitter: bool = True,
weibo: bool = True,
reddit: bool = True,
+ github: bool = True,
xhs: bool = True,
ytdlp: bool = True,
show_progress: bool = True,
@@ -255,6 +257,8 @@ async def handle_social_media(
return await preview_xhs(client, message, **kwargs)
if xhs and matched["platform"] == "wechat":
return await preview_wechat(client, message, **kwargs)
+ if github and matched["platform"] == "github":
+ return await preview_github(client, message, **kwargs)
if reddit and matched["platform"] == "reddit":
return await preview_reddit(client, message, **kwargs)
if matched["platform"].startswith("bilibili-"): # this is not bilibili video, for videos, use yt-dlp
@@ -338,6 +342,8 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefix: str):
msg += "\n🎈Reddit"
if permission["wechat"]:
msg += "\n🟢微信文章"
+ if permission["github"]:
+ msg += "\n📦GitHub"
if permission["ytdlp"]:
msg += "\n🔴油管"
msg += "\n🅱️哔哩哔哩"
src/networking.py
@@ -354,6 +354,13 @@ async def match_social_media_link(text: str, *, flatten_first: bool = True) -> d
url = f"https://www.bilibili.com/opus/{post_id}"
return {"url": url, "db_key": url, "post_id": post_id, "platform": "bilibili-opus"}
+ # https://github.com/user-name/repo
+ if matched := re.search(r"(https?://)?github\.com/([a-zA-Z0-9]+(-[a-zA-Z0-9]+)*)/([a-zA-Z0-9_-]+)", text):
+ gh_user = matched.group(2)
+ gh_repo = matched.group(4)
+ url = f"https://github.com/{gh_user}/{gh_repo}"
+ return {"url": url, "db_key": bare_url(url), "gh_user": gh_user, "gh_repo": gh_repo, "platform": "github"}
+
# https://www.youtube.com/watch?v=D6aE2E0RHTc
if matched := re.search(r"(https?://)?(:?m\.|www\.)?youtube\.com/watch([^,,.。\s]+)", text):
queries = parse_qs(urlparse(matched.group(0)).query)
src/permission.py
@@ -116,6 +116,7 @@ def check_service(cid: int | str, ctype: str) -> dict:
"twitter": True,
"weibo": True,
"xhs": True,
+ "github": True,
"wechat": True,
"reddit": True,
"ytdlp": True,
@@ -136,6 +137,8 @@ def check_service(cid: int | str, ctype: str) -> dict:
permission["weibo"] = False
if not ENABLE.XHS:
permission["xhs"] = False
+ if not ENABLE.GITHUB:
+ permission["github"] = False
if not ENABLE.DOUYIN:
permission["douyin"] = False
if not ENABLE.TIKTOK: