Commit 3ad2a13

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-08-08 03:13:53
feat(spotify): add support for Spotify links
1 parent 8d53fc7
src/bridge/social.py
@@ -20,7 +20,7 @@ SOCIAL_BOTS = {
     "KyDownloaderBot": ["bilibili", "douyin", "tiktok", "instagram", "instagram", "weibo", "x", "xiaohongshu", "youtube"],
     "DouYintg_bot": ["bilibili", "douyin", "tiktok", "instagram", "instagram", "weibo", "x", "xiaohongshu", "youtube"],
     "MultiSaverXbot": ["bilibili", "douyin", "tiktok", "instagram", "instagram", "weibo", "x", "xiaohongshu", "youtube"],
-    "Music163bot": ["music163"],
+    "Music163bot": ["music163", "spotify"],
 }
 
 CAPTIONS = {"MultiSaverXbot": ""}
@@ -36,6 +36,7 @@ async def send_to_social_media_bridge(client: Client, message: Message, url: str
         "target_cid": kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id,  # MSG-A's cid
         "target_mid": kwargs.get("target_mid"),  # disable reply, because the reply message may be sent as a series of messages.
         "url": url,
+        "caption": kwargs.get("caption"),
     }
 
     # add progress message
@@ -47,12 +48,16 @@ async def send_to_social_media_bridge(client: Client, message: Message, url: str
     # ! Warning: currently we can only handle single url at a time
     cache.set("social-global", "waiting-for-bots", ttl=120)
 
-    for bot, platforms in SOCIAL_BOTS.items():
+    for bot_name, platforms in SOCIAL_BOTS.items():
         if platform in platforms:
-            logger.warning(f"Trying {platform} bridge (@{bot}): {url}")
-            cache.set(f"social-{bot}", params, ttl=120)  # save params to cache for each bot
+            cache.set(f"social-{bot_name}", params, ttl=120)  # save params to cache for each bot
+            # determine the text to send
+            text = url
+            if kwargs.get(f"prefix-{bot_name}"):  # override the prefix
+                text = f"{kwargs[f'prefix-{bot_name}']}{url}"
             with contextlib.suppress(Exception):
-                await client.send_message(chat_id=f"@{bot}", text=url)
+                logger.warning(f"Trying {platform} bridge (@{bot_name}): {text}")
+                await client.send_message(chat_id=f"@{bot_name}", text=text)
 
 
 async def forward_social_media_results(client: Client, message: Message):
@@ -92,11 +97,15 @@ async def forward_social_media_results(client: Client, message: Message):
         return
 
     logger.info(f"Forwarding {info['mtype']} from @{info['handle']} -> chat={params['target_cid']}, id={params['target_mid']}")
+    # determine caption
+    caption = CAPTIONS.get(info["handle"], "")  # set globally for this bot
+    if params.get("caption"):  # set for this message
+        caption = params["caption"]
     await client.copy_message(
         chat_id=params["target_cid"],
         from_chat_id=info["cid"],
         message_id=info["mid"],
-        caption=CAPTIONS.get(info["handle"]),  # type: ignore
+        caption=caption,
         reply_parameters=ReplyParameters(message_id=params["target_mid"]),
     )
     # ! Because `cache` is not shared between different processes, we can't safely use it to store media_group_id
src/preview/spotify.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from typing import Literal
+
+from glom import glom
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from bridge.social import send_to_social_media_bridge
+from config import PREFIX, PROXY, TOKEN, cache
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from networking import download_file, hx_req
+from utils import seconds_to_time, zhcn
+
+
+async def preview_spotify(
+    client: Client,
+    message: Message,
+    resource: Literal["track", "album", "artist", "playlist"],
+    spotify_id: str,
+    url: str,
+    **kwargs,
+):
+    """Preview spotify info in the message."""
+    if kwargs.get("show_progress") and "progress" not in kwargs:
+        res = await send2tg(client, message, texts=f"🔗正在解析Spotify链接\n{url}", **kwargs)
+        kwargs["progress"] = res[0]
+    if resource == "track":
+        await preview_track(client, message, url, spotify_id, **kwargs)
+    elif resource == "album":
+        await preview_album(client, message, url, spotify_id, **kwargs)
+
+
+async def preview_track(client: Client, message: Message, url: str, spotify_id: str, **kwargs):
+    api = f"https://api.spotify.com/v1/tracks/{spotify_id}"
+    access_token = await get_access_token()
+    headers = {"Authorization": f"Bearer {access_token}"}
+    resp = await hx_req(api, headers=headers, proxy=PROXY.SPOTIFY, check_kv={"id": spotify_id})
+
+    status = f"🎧歌曲: [{resp['name']}]({url})\n"
+    if artists := resp.get("artists", []):
+        for artist in artists:
+            artist_url = glom(artist, "external_urls.spotify", default=url)
+            status += f"🎙歌手: [{artist['name']}]({artist_url})\n"
+    if duration_ms := resp.get("duration_ms"):
+        status += f"⏱时长: {seconds_to_time(duration_ms / 1000)}\n"
+    if album_name := glom(resp, "album.name", default=None):
+        album_url = glom(resp, "album.external_urls.spotify", default=url)
+        status += f"💿专辑: [{album_name}]({album_url})\n"
+    if release_date := glom(resp, "album.release_date", default=None):
+        status += f"🗓日期: {release_date}\n"
+
+    texts_to_other_bot = resp["name"]
+    if artist := glom(resp, "artists.0.name", default=None):
+        texts_to_other_bot = f"{artist} - {resp['name']}"
+    texts_to_other_bot = zhcn(texts_to_other_bot)
+    # add warning
+    status_warning = "\n⚠️我无法直接下载歌曲\n正在发送歌曲信息给第三方Bot:\n@Music163bot"
+    caption_warning = f"\n⚠️本歌曲由 @Music163bot 通过以下关键词搜索获得\n`{texts_to_other_bot}`\n"
+    caption_warning += "如不准确可私聊以下Bot自行搜索:\n @Music163bot @VmomoVBot\n"
+    caption_warning += f"您也发送以下命令在YouTube上查找:\n`{PREFIX.SEARCH_YOUTUBE} {texts_to_other_bot}`"
+    await modify_progress(text=status + status_warning, **kwargs)
+
+    kwargs |= {
+        "send_from_user": "",  # disable @send_user
+        "caption": status + caption_warning,
+        "prefix-Music163bot": "/music ",
+        "target_mid": message.id,  # record the trigger msg_id as target_mid in kwargs
+    }
+    await send_to_social_media_bridge(client, message, texts_to_other_bot, **kwargs)
+
+
+async def preview_album(client: Client, message: Message, url: str, spotify_id: str, **kwargs):
+    api = f"https://api.spotify.com/v1/albums/{spotify_id}"
+    access_token = await get_access_token()
+    headers = {"Authorization": f"Bearer {access_token}"}
+    resp = await hx_req(api, headers=headers, proxy=PROXY.SPOTIFY, check_kv={"id": spotify_id})
+    status = f"💿专辑: [{resp['name']}]({url})\n"
+    if artists := resp.get("artists", []):
+        for artist in artists:
+            artist_url = glom(artist, "external_urls.spotify", default=url)
+            status += f"🎙歌手: [{artist['name']}]({artist_url})\n"
+    if release_date := resp.get("release_date"):
+        status += f"🗓日期: {release_date}\n"
+    if tracks := glom(resp, "tracks.items", default=[]):
+        status += "🎧歌曲:\n"
+        for idx, track in enumerate(tracks):
+            track_url = glom(track, "external_urls.spotify", default=url)
+            status += f"{idx + 1}. [{track['name']}]({track_url}) ({seconds_to_time(track['duration_ms'] / 1000)})\n"
+    cover_url = glom(resp, "images.0.url", default="")
+    media = [{"photo": await download_file(cover_url, suffix=".jpg", proxy=PROXY.SPOTIFY)}] if cover_url else []
+    kwargs["send_from_user"] = ""  # disable @send_user
+    await send2tg(client, message, texts=status, media=media, **kwargs)
+    await modify_progress(del_status=True, **kwargs)
+
+
+@cache.memoize(ttl=3600)
+async def get_access_token():
+    api = "https://accounts.spotify.com/api/token"
+    headers = {"Content-Type": "application/x-www-form-urlencoded"}
+    data = f"grant_type=client_credentials&client_id={TOKEN.SPOTIFY_CLIENT_ID}&client_secret={TOKEN.SPOTIFY_CLIENT_SECRET}"
+    resp = await hx_req(api, method="POST", proxy=PROXY.SPOTIFY, content_data=data, headers=headers, check_keys=["access_token"])
+    return resp["access_token"]
src/config.py
@@ -56,6 +56,7 @@ class ENABLE:  # see fine-grained permission in `src/permission.py`
     WGET = os.getenv("ENABLE_WGET", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     GITHUB = os.getenv("ENABLE_GITHUB", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     MUSIC163 = os.getenv("ENABLE_MUSIC163", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+    SPOTIFY = os.getenv("ENABLE_SPOTIFY", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     XHS = os.getenv("ENABLE_XHS", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     YTDLP = os.getenv("ENABLE_YTDLP", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     RAW_IMG_CONVERT = os.getenv("ENABLE_RAW_IMG_CONVERT", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
@@ -158,6 +159,8 @@ class TOKEN:
     NEOCITIES_IV_HASH = os.getenv("NEOCITIES_INSTANTVIEW_HASH", "")
     R2_IV_HASH = os.getenv("R2_INSTANTVIEW_HASH", "")
     GITHUB = os.getenv("GITHUB_TOKEN", "")
+    SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "")
+    SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "")
 
 
 class PROXY:  # format: socks5://127.0.0.1:7890
@@ -172,6 +175,7 @@ class PROXY:  # format: socks5://127.0.0.1:7890
     TIKTOK = os.getenv("TIKTOK_PROXY", None)
     INSTAGRAM = os.getenv("INSTAGRAM_PROXY", None)
     TWITTER = os.getenv("TWITTER_PROXY", None)
+    SPOTIFY = os.getenv("SPOTIFY_PROXY", None)
     SUBTITLE = os.getenv("SUBTITLE_PROXY", None)
     YOUTUBE_SEARCH = os.getenv("YOUTUBE_SEARCH_PROXY", None)
     CRYPTO = os.getenv("CRYPTO_PROXY", None)
src/handler.py
@@ -32,6 +32,7 @@ from preview.github import preview_github
 from preview.instagram import preview_instagram
 from preview.netease import preview_music163
 from preview.reddit import preview_reddit
+from preview.spotify import preview_spotify
 from preview.twitter import preview_twitter
 from preview.wechat import preview_wechat
 from preview.weibo import preview_weibo
@@ -158,6 +159,7 @@ async def handle_social_media(
     github: bool = True,
     xhs: bool = True,
     music163: bool = True,
+    spotify: bool = True,
     ytdlp: bool = True,
     show_progress: bool = True,
     detail_progress: bool = False,
@@ -265,6 +267,8 @@ async def handle_social_media(
             return await preview_reddit(client, message, **kwargs)
         if music163 and matched["platform"] == "music163":
             return await preview_music163(client, message, **kwargs)
+        if spotify and matched["platform"] == "spotify":
+            return await preview_spotify(client, message, **kwargs)
         if matched["platform"].startswith("bilibili-"):  # this is not bilibili video, for videos, use yt-dlp
             return await preview_bilibili(client, message, **kwargs)
         sent_messages = []
@@ -344,6 +348,8 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefix: str):
         msg += "\n🏞Instagram"
     if permission["music163"]:
         msg += "\n🎧网易云音乐"
+    if permission["spotify"]:
+        msg += "\n🎧Spotify"
     if permission["reddit"]:
         msg += "\n🎈Reddit"
     if permission["wechat"]:
src/networking.py
@@ -357,6 +357,13 @@ async def match_social_media_link(text: str, *, flatten_first: bool = True) -> d
         url = f"https://github.com/{gh_user}/{gh_repo}"
         return {"url": url, "db_key": bare_url(url), "gh_user": gh_user, "gh_repo": gh_repo, "platform": "github"}
 
+    # https://open.spotify.com/track/0cOMncRq4cmDLO4tPQnkBF
+    if matched := re.search(r"(https?://)?open\.spotify\.com/(:?track|album|artist|playlist)/([a-zA-Z0-9]+)", text):
+        resource = matched.group(2)
+        spotify_id = matched.group(3)
+        url = matched.group(0)
+        return {"url": url, "db_key": bare_url(url), "resource": resource, "spotify_id": spotify_id, "platform": "spotify"}
+
     # https://music.163.com/song?id=2021343740
     # https://163cn.tv/HYHqZ6R
     # https://163cn.link/HYHqZ6R
src/permission.py
@@ -142,6 +142,8 @@ def check_service(cid: int | str, ctype: str) -> dict:
         permission["github"] = False
     if not ENABLE.MUSIC163:
         permission["music163"] = False
+    if not ENABLE.SPOTIFY:
+        permission["spotify"] = False
     if not ENABLE.DOUYIN:
         permission["douyin"] = False
     if not ENABLE.TIKTOK: