main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3from collections import defaultdict
  4from typing import Literal
  5
  6import zhconv
  7from glom import Coalesce, glom
  8from pyrogram.client import Client
  9from pyrogram.types import Message
 10
 11from config import CAPTION_LENGTH, PREFIX, PROXY, TEXT_LENGTH, TOKEN
 12from messages.parser import parse_msg
 13from messages.sender import send2tg
 14from messages.utils import blockquote, count_without_entities, equal_prefix, remove_prefix, set_reaction, smart_split, startswith_prefix
 15from networking import download_file, download_first_success_urls, download_media, hx_req
 16from publish import publish_telegraph
 17from utils import seconds_to_hms, zhcn
 18
 19HELP = f"""
 20🎬**查询影视信息**
 21使用说明:
 221. `{PREFIX.TMDB}` + 关键词: 查询影视作品简介及ID
 232. `{PREFIX.TMDB}` + @演员名: 查询演员简介及ID
 243. `/{{ID}}` + : 根据ID查询详细信息
 25ID有三种前缀:
 26 - M: 电影
 27 - T: 电视剧
 28 - P: 演员
 29
 30⚠️注意: `@演员名` 和 `关键词` 不可组合使用
 31
 32示例:
 33 - `{PREFIX.TMDB} 泰坦尼克`: 查找“泰坦尼克”相关作品
 34 - `{PREFIX.TMDB} @莱昂纳多`: 查找演员“莱昂纳多”
 35 - `/M597`: 查询ID为597的电影详情
 36 - `/P6193`: 查询ID为6193的演员详情
 37"""
 38HEADERS = {"accept": "application/json", "Authorization": f"Bearer {TOKEN.TMDB}"}
 39
 40
 41async def search_tmdb(client: Client, message: Message, *, include_adult: bool = True, **kwargs) -> None:
 42    """TMDB command handler."""
 43    if not (startswith_prefix(message.content, prefix=PREFIX.TMDB) or message.content.startswith(("/M", "/T", "/P"))):
 44        return
 45    # send docs if without reply
 46    if equal_prefix(message.text, prefix=PREFIX.TMDB) and not message.reply_to_message:
 47        await send2tg(client, message, texts=HELP, **kwargs)
 48        return
 49    # reply a message with /tmdb
 50    if equal_prefix(message.text, prefix=PREFIX.TMDB) and message.reply_to_message:
 51        message = message.reply_to_message
 52    info = parse_msg(message, silent=True)
 53    await set_reaction(client, message, reaction="👌")
 54    query = remove_prefix(info["text"], prefix=PREFIX.TMDB)
 55    if query.startswith(("/M", "/T", "/P")):
 56        if not query[2:].isdigit():
 57            return
 58        resp = await get_details(query[1:])
 59    elif query.startswith("@"):
 60        resp = await search_people(query[1:], include_adult=include_adult)
 61    else:
 62        resp = await search_keyword(query, include_adult=include_adult)
 63    await send2tg(client, message, **resp, **kwargs)
 64    await set_reaction(client, message, reaction="")
 65
 66
 67async def search_keyword(query: str, tmdb_lang: Literal["en-US", "zh-CN"] = "zh-CN", *, include_adult: bool = True) -> dict:
 68    """Search movie & TV by keyword.
 69
 70    query: 泰坦
 71
 72    Returns: {"texts": str}
 73    """
 74    retrieved_ids = set()
 75
 76    async def search(q: str, lang: Literal["zh-cn", "zh-tw"]) -> list:
 77        params = {"query": zhconv.convert(q, lang), "include_adult": str(include_adult).lower(), "language": tmdb_lang, "page": 1}
 78        url = "https://api.themoviedb.org/3/search/multi"
 79        resp = await hx_req(url, headers=HEADERS, params=params, proxy=PROXY.TMDB, check_kv={"page": 1}, check_keys=["results"])
 80        if resp.get("hx_error"):
 81            return []
 82        retrieved = [x for x in resp["results"] if x.get("id") not in retrieved_ids and x.get("media_type") in ["movie", "tv"]]  # only movie & TV
 83        retrieved_ids.update(x.get("id", 0) for x in retrieved)
 84        return retrieved
 85
 86    results = await search(query, "zh-cn")
 87    results.extend(await search(query, "zh-tw"))
 88    final_msg = ""
 89    for item in results:
 90        this_msg = ""
 91        type_initial = item["media_type"][0].upper()  # M: movie, T: TV
 92        original_title = glom(item, Coalesce("original_title", "original_name"), default="")
 93        if title := glom(item, Coalesce("title", "original_title", "name", "original_name"), default=""):
 94            this_msg += f"/{type_initial}{item['id']}:[《{title}》](https://www.themoviedb.org/{item['media_type']}/{item['id']})"
 95
 96        if date := glom(item, Coalesce("release_date", "first_air_date"), default=""):
 97            this_msg += f"({date[:4]})"
 98
 99        if overview := item.get("overview"):
100            if original_title and original_title != title:  # title: 中文名, original_title: 英文名
101                overview = f"{original_title}》: {zhcn(overview)}"
102            this_msg += f"\n{blockquote(overview)}\n"
103        if await count_without_entities(final_msg + this_msg) > TEXT_LENGTH:
104            break
105        final_msg += f"\n{this_msg.strip()}"
106    if not final_msg:
107        return {"texts": "❌未找到相关作品"}
108
109    return {"texts": final_msg}
110
111
async def search_people(query: str, tmdb_lang: Literal["en-US", "zh-CN"] = "zh-CN", *, include_adult: bool = True) -> dict:
    """Search people by name.

    The name is converted to Simplified and Traditional Chinese; each distinct variant is sent
    to the TMDB person-search endpoint (page 1 only) and hits are merged by person id.
    Entries are appended until the text would exceed TEXT_LENGTH.

    Example query: 莱昂纳多

    Args:
        query: Name to search for.
        tmdb_lang: Language requested from TMDB.
        include_adult: Whether adult performers are requested.

    Returns: {"texts": str}
    """
    seen_ids = set()

    async def search(keyword: str) -> list:
        params = {"query": keyword, "include_adult": str(include_adult).lower(), "language": tmdb_lang, "page": 1}
        url = "https://api.themoviedb.org/3/search/person"
        resp = await hx_req(url, headers=HEADERS, params=params, proxy=PROXY.TMDB, check_kv={"page": 1}, check_keys=["results"])
        if resp.get("hx_error"):
            return []
        fresh = []
        for person in resp["results"]:
            person_id = person.get("id")
            if person_id in seen_ids:
                continue
            seen_ids.add(person_id)
            fresh.append(person)
        return fresh

    # dict.fromkeys keeps order and drops the second variant when both conversions are
    # identical (e.g. a Latin-script name), which avoids sending the same request twice.
    keywords = dict.fromkeys(zhconv.convert(query, variant) for variant in ("zh-cn", "zh-tw"))
    results = []
    for keyword in keywords:
        results.extend(await search(keyword))

    final_msg = ""
    for item in results:
        name = glom(item, Coalesce("name", "original_name"), default="")
        url = f"https://www.themoviedb.org/person/{item['id']}"
        this_msg = f"\n/P{item['id']}: {gender_emoji(item.get('gender'))}{'🔞' if item.get('adult') else ''}[{name}]({url})"
        if item.get("original_name") and item["original_name"] != name:
            this_msg += f"({item['original_name']})"
        # skip untitled entries so the list never contains empty slots
        works = [glom(x, Coalesce("title", "name"), default="") for x in item.get("known_for") or []]
        works = [work for work in works if work]
        if works:
            known_for = f"代表作: {', '.join(works)}"
            # adult performers: wrap the works in spoiler markup
            this_msg += f"\n||{known_for}||\n" if item.get("adult") else f"\n{known_for}\n"
        # stop before the message grows past a single Telegram text
        if await count_without_entities(final_msg + this_msg) > TEXT_LENGTH:
            break
        final_msg += f"\n\n{this_msg.strip()}"
    if not final_msg:
        return {"texts": "❌未找到相关演员"}

    return {"texts": final_msg}
151
152
async def get_details(query: str, tmdb_lang: Literal["en-US", "zh-CN"] = "zh-CN") -> dict:
    """Get movie, TV or person details by prefixed TMDB id.

    query: M597, T34691, P6193

    Args:
        query: Type initial (M: movie, T: TV, P: person) followed by the numeric TMDB id.
        tmdb_lang: Language requested from TMDB.

    Returns:
        {"texts": str} when the id is malformed or the request fails,
        {"texts": str, "media": list} for a movie/TV show,
        or the result of `get_people_details` for a person.
    """
    kind, raw_id = query[:1], query[1:]
    # isascii() rules out characters such as "²" that pass isdigit() but break int()
    if kind not in ("M", "T", "P") or not (raw_id.isascii() and raw_id.isdigit()):
        return {"texts": "❌未找到作品详情"}
    tmdb_id = int(raw_id)  # int, so the id check below uses the same type as get_people_details
    if kind == "P":
        return await get_people_details(tmdb_id, tmdb_lang)

    media_type = "movie" if kind == "M" else "tv"  # M: movie, T: tv
    url = f"https://api.themoviedb.org/3/{media_type}/{tmdb_id}"
    params = {"append_to_response": "credits,images", "language": tmdb_lang, "include_image_language": "en,cn,zh"}
    resp = await hx_req(url, headers=HEADERS, params=params, proxy=PROXY.TMDB, check_kv={"id": tmdb_id}, silent=True)
    if resp.get("hx_error"):
        return {"texts": resp["hx_error"]}

    parts = []  # text fragments, each already terminated by "\n"
    title = glom(resp, Coalesce("title", "original_title", "name", "original_name"), default="")
    if title:
        parts.append(f"标题:《{title}》\n")

    original_title = glom(resp, Coalesce("original_title", "original_name"), default="")
    if original_title and original_title != title:
        parts.append(f"原名:《{original_title}》\n")

    if subtitle := resp.get("tagline"):
        parts.append(f"副标题: {subtitle}\n")

    if genres := glom(resp, "genres.*.name", default=[]):
        parts.append(f"类型: {', '.join(f'#{genre}' for genre in genres)}\n")

    if date := glom(resp, Coalesce("release_date", "first_air_date"), default=""):
        parts.append(f"日期: {date}\n")

    if duration := resp.get("runtime", 0):
        parts.append(f"时长: {seconds_to_hms(duration * 60)}\n")  # runtime is multiplied by 60 before formatting

    if country := resp.get("origin_country", []):
        parts.append(f"地区: {', '.join(country)}\n")

    if rate := resp.get("vote_average", 0):
        parts.append(f"评分: {rate}\n")

    if company := glom(resp, "production_companies.*.name", default=[]):
        parts.append(f"制片: {', '.join(company)}\n")

    tmdb_link = f"[TMDB](https://www.themoviedb.org/{media_type}/{resp['id']})"
    if imdb_id := resp.get("imdb_id"):
        parts.append(f"链接: {tmdb_link}, [IMDB](https://www.imdb.com/title/{imdb_id})\n")
    else:
        parts.append(f"链接: {tmdb_link}\n")

    if overview := resp.get("overview"):
        parts.append(f"简介: {zhcn(overview)}\n")

    # choose poster language: group posters by language code, then prefer Chinese posters
    # for Chinese productions and English posters otherwise
    posters = defaultdict(list)
    for poster in glom(resp, "images.posters", default=[]):
        posters[poster.get("iso_639_1", "unknown")].append(poster)
    # first available hint; language codes and country codes are both lower-cased and
    # compared against the same ["cn", "zh"] list
    movie_lang = glom(
        resp,
        Coalesce(
            "spoken_languages.0.iso_639_1",
            "original_language",
            "origin_country.0",
            "production_countries.0.iso_3166_1",
            "production_companies.0.origin_country",
        ),
        default="en",
    ).lower()
    lang_order = ["cn", "zh", "en"] if movie_lang in ["cn", "zh"] else ["en", "cn", "zh"]
    img_urls = []
    for lang in lang_order:
        # tallest poster first within each language
        for poster in sorted(posters.get(lang, []), key=lambda x: x.get("height", 0), reverse=True):
            if file_path := poster.get("file_path"):
                img_urls.append(f"https://image.tmdb.org/t/p/original{file_path}")
    # add fallback posters
    if resp.get("poster_path"):
        img_urls.append(f"https://image.tmdb.org/t/p/original{resp['poster_path']}")
    if resp.get("backdrop_path"):
        img_urls.append(f"https://image.tmdb.org/t/p/original{resp['backdrop_path']}")
    media = []
    if img_path := await download_first_success_urls(img_urls, proxy=PROXY.TMDB):
        media.append({"photo": img_path, "has_spoiler": resp.get("adult", False)})

    # process casts
    cast_list = glom(resp, "credits.cast", default=[])
    if cast_list:
        parts.append("\n演员表:\n")
    for cast in cast_list:
        emoji = gender_emoji(cast.get("gender"))
        name = glom(cast, Coalesce("name", "original_name"), default="")
        line = f"{emoji}[{name}](https://www.themoviedb.org/person/{cast['id']})"
        if character := cast.get("character"):
            line += f": {character}"
        parts.append(f"{line}\n")

    # limit to single message (captions are shorter than plain texts)
    texts = "".join(parts)
    texts = (await smart_split(texts, CAPTION_LENGTH if media else TEXT_LENGTH))[0]
    return {"texts": texts, "media": media}
251
252
async def get_people_details(people_id: int, tmdb_lang: Literal["en-US", "zh-CN"] = "zh-CN") -> dict:
    """Get a person's profile, external links, photos and credits from TMDB.

    The full credit list is published as a Telegraph page. The message itself carries the
    biography and, when the biography is short enough, as many credits as fit.

    Args:
        people_id: Numeric TMDB person id.
        tmdb_lang: Language requested from TMDB.

    Returns:
        {"texts": str} when the request fails, otherwise {"texts": str, "media": list}.
    """
    from html import escape  # function-scope import keeps this block self-contained

    url = f"https://api.themoviedb.org/3/person/{people_id}"
    params = {"append_to_response": "external_ids,combined_credits,images", "language": tmdb_lang}
    resp = await hx_req(url, headers=HEADERS, params=params, proxy=PROXY.TMDB, check_kv={"id": people_id}, silent=True)
    if resp.get("hx_error"):
        return {"texts": resp["hx_error"]}

    texts = gender_emoji(resp.get("gender", ""))
    name = glom(resp, Coalesce("name", "original_name"), default="")
    if name:
        texts += f"{name}"
    if birth := resp.get("birthday"):
        texts += f"\n生日: {birth}"
    if death := resp.get("deathday"):
        texts += f"\n逝世: {death}"
    if location := resp.get("place_of_birth"):
        texts += f"\n出生地: {location}"

    # external links
    links = [f"[TMDB](https://www.themoviedb.org/person/{people_id})"]
    external_ids = resp.get("external_ids") or {}
    link_table = (
        ("imdb_id", "IMDB", "https://www.imdb.com/name/"),
        ("facebook_id", "FB", "https://www.facebook.com/"),
        ("instagram_id", "Ins", "https://www.instagram.com/"),
        ("twitter_id", "X", "https://www.twitter.com/"),
        ("youtube_id", "油管", "https://www.youtube.com/"),
    )
    for key, label, base in link_table:
        if account := external_ids.get(key):
            links.append(f"[{label}]({base}{account})")
    texts += f"\n链接: {', '.join(links)}"

    # process casts: movie/TV credits only, newest first
    def credit_date(credit: dict) -> str:
        # "or ''" guards against null dates, which cannot be compared with strings when sorting
        return glom(credit, Coalesce("release_date", "first_air_date"), default="") or ""

    casts = [x for x in glom(resp, "combined_credits.cast", default=[]) if x.get("media_type") in ["movie", "tv"]]
    casts = sorted(casts, key=credit_date, reverse=True)
    caption_rows = []  # short rows for the Telegram message
    html_rows = []  # full rows for the Telegraph page
    for item in casts:
        type_initial = item["media_type"][0].upper()  # M: movie, T: tv
        title = glom(item, Coalesce("title", "original_title", "name", "original_name"), default="")
        full_date = credit_date(item)
        page = f"https://www.themoviedb.org/{item['media_type']}/{item['id']}"
        caption_row = f"/{type_initial}{item['id']}: {title}"
        # TMDB text is external data: escape it so "<" or "&" cannot break the generated HTML
        html_row = f'{type_initial}{item["id"]}: <a href="{page}">{escape(str(title), quote=False)}</a>'
        if full_date:
            caption_row += f" ({full_date[:4]})"
            html_row += f" ({full_date})"
        if overview := item.get("overview"):
            html_row += f"<br>简介: {escape(str(overview), quote=False)}"
        caption_rows.append(f"{caption_row}\n")
        html_rows.append(html_row)
    # Join with "<br>" instead of prefixing every row and calling .strip("<br>"):
    # str.strip treats its argument as a character set and would also eat a trailing
    # ">" of "</a>" or trailing "b"/"r" letters of an overview.
    productions_for_html = "<br>".join(html_rows)

    # process images
    media = []
    if images := glom(resp, "images.profiles", default=[]):
        images = sorted(images, key=lambda x: x.get("width", 0), reverse=True)[:10]  # keep the 10 widest images
        # NOTE(review): download_file is not awaited here; presumably download_media
        # resolves the pending downloads - confirm against the networking module.
        media = [{"photo": download_file(f"https://image.tmdb.org/t/p/original{img.get('file_path')}", proxy=PROXY.TMDB)} for img in images]
        media = await download_media(media)

    # publish the credit list only when there is something to publish
    telegraph_url = None
    if productions_for_html:
        telegraph_url = await publish_telegraph(title=name, html=productions_for_html, author=name, url=f"https://www.themoviedb.org/person/{people_id}", ttl="1d")

    description = f"简介: {zhcn(resp['biography'])}" if resp.get("biography") else ""
    description = description.replace("\\n", "\n")  # turn literal backslash-n sequences into real newlines
    max_length = CAPTION_LENGTH if media else TEXT_LENGTH
    if await count_without_entities(f"{texts}\n{description}") > max_length - 10:  # long desc
        if telegraph_url:
            texts += f"\n[查看作品列表]({telegraph_url})"
        texts = (await smart_split(f"{texts}\n{description}", max_length))[0]
    else:  # short desc: use the remaining room for the credit list
        texts += f"\n{description}"
        if caption_rows:
            # link the heading only when the Telegraph page was actually created
            heading = f"[作品列表]({telegraph_url})" if telegraph_url else "作品列表"
            texts += f"\n{heading}:"
        productions = "".join(caption_rows)
        texts = (await smart_split(f"{texts}\n{productions}", max_length))[0]

    return {"texts": texts, "media": media}
324
325
326def gender_emoji(gender: str | int) -> str:
327    """Gender emoji.
328
329    0: Not set / not specified
330    1: Female
331    2: Male
332    3: Non-binary
333    """
334    if str(gender) == "1":
335        return "🚺"
336    if str(gender) == "2":
337        return "🚹"
338    return ""