main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from collections import defaultdict
4from typing import Literal
5
6import zhconv
7from glom import Coalesce, glom
8from pyrogram.client import Client
9from pyrogram.types import Message
10
11from config import CAPTION_LENGTH, PREFIX, PROXY, TEXT_LENGTH, TOKEN
12from messages.parser import parse_msg
13from messages.sender import send2tg
14from messages.utils import blockquote, count_without_entities, equal_prefix, remove_prefix, set_reaction, smart_split, startswith_prefix
15from networking import download_file, download_first_success_urls, download_media, hx_req
16from publish import publish_telegraph
17from utils import seconds_to_hms, zhcn
18
19HELP = f"""
20🎬**查询影视信息**
21使用说明:
221. `{PREFIX.TMDB}` + 关键词: 查询影视作品简介及ID
232. `{PREFIX.TMDB}` + @演员名: 查询演员简介及ID
243. `/{{ID}}` + : 根据ID查询详细信息
25ID有三种前缀:
26 - M: 电影
27 - T: 电视剧
28 - P: 演员
29
30⚠️注意: `@演员名` 和 `关键词` 不可组合使用
31
32示例:
33 - `{PREFIX.TMDB} 泰坦尼克`: 查找“泰坦尼克”相关作品
34 - `{PREFIX.TMDB} @莱昂纳多`: 查找演员“莱昂纳多”
35 - `/M597`: 查询ID为597的电影详情
36 - `/P6193`: 查询ID为6193的演员详情
37"""
38HEADERS = {"accept": "application/json", "Authorization": f"Bearer {TOKEN.TMDB}"}
39
40
41async def search_tmdb(client: Client, message: Message, *, include_adult: bool = True, **kwargs) -> None:
42 """TMDB command handler."""
43 if not (startswith_prefix(message.content, prefix=PREFIX.TMDB) or message.content.startswith(("/M", "/T", "/P"))):
44 return
45 # send docs if without reply
46 if equal_prefix(message.text, prefix=PREFIX.TMDB) and not message.reply_to_message:
47 await send2tg(client, message, texts=HELP, **kwargs)
48 return
49 # reply a message with /tmdb
50 if equal_prefix(message.text, prefix=PREFIX.TMDB) and message.reply_to_message:
51 message = message.reply_to_message
52 info = parse_msg(message, silent=True)
53 await set_reaction(client, message, reaction="👌")
54 query = remove_prefix(info["text"], prefix=PREFIX.TMDB)
55 if query.startswith(("/M", "/T", "/P")):
56 if not query[2:].isdigit():
57 return
58 resp = await get_details(query[1:])
59 elif query.startswith("@"):
60 resp = await search_people(query[1:], include_adult=include_adult)
61 else:
62 resp = await search_keyword(query, include_adult=include_adult)
63 await send2tg(client, message, **resp, **kwargs)
64 await set_reaction(client, message, reaction="")
65
66
67async def search_keyword(query: str, tmdb_lang: Literal["en-US", "zh-CN"] = "zh-CN", *, include_adult: bool = True) -> dict:
68 """Search movie & TV by keyword.
69
70 query: 泰坦
71
72 Returns: {"texts": str}
73 """
74 retrieved_ids = set()
75
76 async def search(q: str, lang: Literal["zh-cn", "zh-tw"]) -> list:
77 params = {"query": zhconv.convert(q, lang), "include_adult": str(include_adult).lower(), "language": tmdb_lang, "page": 1}
78 url = "https://api.themoviedb.org/3/search/multi"
79 resp = await hx_req(url, headers=HEADERS, params=params, proxy=PROXY.TMDB, check_kv={"page": 1}, check_keys=["results"])
80 if resp.get("hx_error"):
81 return []
82 retrieved = [x for x in resp["results"] if x.get("id") not in retrieved_ids and x.get("media_type") in ["movie", "tv"]] # only movie & TV
83 retrieved_ids.update(x.get("id", 0) for x in retrieved)
84 return retrieved
85
86 results = await search(query, "zh-cn")
87 results.extend(await search(query, "zh-tw"))
88 final_msg = ""
89 for item in results:
90 this_msg = ""
91 type_initial = item["media_type"][0].upper() # M: movie, T: TV
92 original_title = glom(item, Coalesce("original_title", "original_name"), default="")
93 if title := glom(item, Coalesce("title", "original_title", "name", "original_name"), default=""):
94 this_msg += f"/{type_initial}{item['id']}:[《{title}》](https://www.themoviedb.org/{item['media_type']}/{item['id']})"
95
96 if date := glom(item, Coalesce("release_date", "first_air_date"), default=""):
97 this_msg += f"({date[:4]})"
98
99 if overview := item.get("overview"):
100 if original_title and original_title != title: # title: 中文名, original_title: 英文名
101 overview = f"《{original_title}》: {zhcn(overview)}"
102 this_msg += f"\n{blockquote(overview)}\n"
103 if await count_without_entities(final_msg + this_msg) > TEXT_LENGTH:
104 break
105 final_msg += f"\n{this_msg.strip()}"
106 if not final_msg:
107 return {"texts": "❌未找到相关作品"}
108
109 return {"texts": final_msg}
110
111
112async def search_people(query: str, tmdb_lang: Literal["en-US", "zh-CN"] = "zh-CN", *, include_adult: bool = True) -> dict:
113 """Search people by keyword.
114
115 query: 莱昂纳多
116
117 Returns: {"texts": str}
118 """
119 retrieved_ids = set()
120
121 async def search(q: str, lang: Literal["zh-cn", "zh-tw"]) -> list:
122 params = {"query": zhconv.convert(q, lang), "include_adult": str(include_adult).lower(), "language": tmdb_lang, "page": 1}
123 url = "https://api.themoviedb.org/3/search/person"
124 resp = await hx_req(url, headers=HEADERS, params=params, proxy=PROXY.TMDB, check_kv={"page": 1}, check_keys=["results"])
125 if resp.get("hx_error"):
126 return []
127 retrieved = [x for x in resp["results"] if x.get("id") not in retrieved_ids] # only movie & TV
128 retrieved_ids.update(x.get("id", 0) for x in retrieved)
129 return retrieved
130
131 results = await search(query, "zh-cn")
132 results.extend(await search(query, "zh-tw"))
133 final_msg = ""
134 for item in results:
135 this_msg = ""
136 name = glom(item, Coalesce("name", "original_name"), default="")
137 url = f"https://www.themoviedb.org/person/{item['id']}"
138 this_msg += f"\n/P{item['id']}: {gender_emoji(item.get('gender'))}{'🔞' if item.get('adult') else ''}[{name}]({url})"
139 if item.get("original_name") and item["original_name"] != name:
140 this_msg += f"({item['original_name']})"
141 if item.get("known_for"):
142 known_for = f"代表作: {', '.join([glom(x, Coalesce('title', 'name'), default='') for x in item['known_for']])}"
143 this_msg += f"\n||{known_for}||\n" if item.get("adult") else f"\n{known_for}\n"
144 if await count_without_entities(final_msg + this_msg) > TEXT_LENGTH:
145 break
146 final_msg += f"\n\n{this_msg.strip()}"
147 if not final_msg:
148 return {"texts": "❌未找到相关演员"}
149
150 return {"texts": final_msg}
151
152
153async def get_details(query: str, tmdb_lang: Literal["en-US", "zh-CN"] = "zh-CN") -> dict:
154 """Get Movie & TV details by tmdb id.
155
156 query: M597, T34691, P6193
157 """
158 if query.startswith("P"):
159 return await get_people_details(int(query[1:]), tmdb_lang)
160 tmdb_id = query[1:]
161 texts = ""
162 if not query.startswith(("M", "T")):
163 return {"texts": "❌未找到作品详情"}
164
165 media_type = "movie" if query[0].upper() == "M" else "tv" # M: movie, T: tv
166 url = f"https://api.themoviedb.org/3/{media_type}/{tmdb_id}"
167 params = {"append_to_response": "credits,images", "language": tmdb_lang, "include_image_language": "en,cn,zh"}
168 resp = await hx_req(url, headers=HEADERS, params=params, proxy=PROXY.TMDB, check_kv={"id": tmdb_id}, silent=True)
169 if resp.get("hx_error"):
170 return {"texts": resp["hx_error"]}
171 if title := glom(resp, Coalesce("title", "original_title", "name", "original_name"), default=""):
172 texts += f"标题:《{title}》\n"
173
174 original_title = glom(resp, Coalesce("original_title", "original_name"), default="")
175 if original_title and (original_title != title):
176 texts += f"原名:《{original_title}》\n"
177
178 if subtitle := resp.get("tagline"):
179 texts += f"副标题: {subtitle}\n"
180
181 if genres := glom(resp, "genres.*.name", default=[]):
182 genres_with_tag = [f"#{genre}" for genre in genres]
183 texts += f"类型: {', '.join(genres_with_tag)}\n"
184
185 if date := glom(resp, Coalesce("release_date", "first_air_date"), default=""):
186 texts += f"日期: {date}\n"
187
188 if duration := resp.get("runtime", 0):
189 texts += f"时长: {seconds_to_hms(duration * 60)}\n"
190 if country := resp.get("origin_country", []):
191 texts += f"地区: {'、'.join(country)}\n"
192
193 if rate := resp.get("vote_average", 0):
194 texts += f"评分: {rate}\n"
195
196 if company := glom(resp, "production_companies.*.name", default=[]):
197 texts += f"制片: {'、'.join(company)}\n"
198
199 if imdb_id := resp.get("imdb_id"):
200 texts += f"链接: [TMDB](https://www.themoviedb.org/{media_type}/{resp['id']}), [IMDB](https://www.imdb.com/title/{imdb_id})\n"
201 else:
202 texts += f"链接: [TMDB](https://www.themoviedb.org/{media_type}/{resp['id']})\n"
203 if overview := resp.get("overview"):
204 texts += f"简介: {zhcn(overview)}\n"
205
206 # choose poster language
207 media = []
208
209 posters = defaultdict(list)
210 for poster in glom(resp, "images.posters", default=[]):
211 posters[poster.get("iso_639_1", "unknown")].append(poster)
212 movie_lang = glom(
213 resp,
214 Coalesce(
215 "spoken_languages.0.iso_639_1",
216 "original_language",
217 "origin_country.0",
218 "production_countries.0.iso_3166_1",
219 "production_companies.0.origin_country",
220 ),
221 default="en",
222 ).lower()
223 lang_order = ["cn", "zh", "en"] if movie_lang in ["cn", "zh"] else ["en", "cn", "zh"]
224 url_paths = []
225 for lang in lang_order:
226 urls = [x.get("file_path", "") for x in sorted(posters.get(lang, []), key=lambda x: x.get("height", 0), reverse=True)]
227 url_paths.extend(urls)
228 img_urls = [f"https://image.tmdb.org/t/p/original{url}" for url in url_paths]
229 # add fallback posters
230 if resp.get("poster_path"):
231 img_urls.append(f"https://image.tmdb.org/t/p/original{resp['poster_path']}")
232 if resp.get("backdrop_path"):
233 img_urls.append(f"https://image.tmdb.org/t/p/original{resp['backdrop_path']}")
234 if img_path := await download_first_success_urls(img_urls, proxy=PROXY.TMDB):
235 media.append({"photo": img_path, "has_spoiler": resp.get("adult", False)})
236
237 # process casts
238 for idx, cast in enumerate(glom(resp, "credits.cast", default=[])):
239 if idx == 0:
240 texts += "\n演员表:\n"
241 emoji = gender_emoji(cast.get("gender"))
242 name = glom(cast, Coalesce("name", "original_name"), default="")
243 if character := cast.get("character"):
244 texts += f"{emoji}[{name}](https://www.themoviedb.org/person/{cast['id']}): {character}\n"
245 else:
246 texts += f"{emoji}[{name}](https://www.themoviedb.org/person/{cast['id']})\n"
247
248 # limit to single message
249 texts = (await smart_split(texts, CAPTION_LENGTH))[0] if media else (await smart_split(texts, TEXT_LENGTH))[0]
250 return {"texts": texts, "media": media}
251
252
253async def get_people_details(people_id: int, tmdb_lang: Literal["en-US", "zh-CN"] = "zh-CN") -> dict:
254 url = f"https://api.themoviedb.org/3/person/{people_id}"
255 params = {"append_to_response": "external_ids,combined_credits,images", "language": tmdb_lang}
256 resp = await hx_req(url, headers=HEADERS, params=params, proxy=PROXY.TMDB, check_kv={"id": people_id}, silent=True)
257 if resp.get("hx_error"):
258 return {"texts": resp["hx_error"]}
259 texts = gender_emoji(resp.get("gender", ""))
260 if name := glom(resp, Coalesce("name", "original_name"), default=""):
261 texts += f"{name}"
262 if birth := resp.get("birthday"):
263 texts += f"\n生日: {birth}"
264 if death := resp.get("deathday"):
265 texts += f"\n逝世: {death}"
266 if location := resp.get("place_of_birth"):
267 texts += f"\n出生地: {location}"
268
269 # external links
270 texts += f"\n链接: [TMDB](https://www.themoviedb.org/person/{people_id})"
271 if external_ids := resp.get("external_ids", {}):
272 if imdb_id := external_ids.get("imdb_id"):
273 texts += f", [IMDB](https://www.imdb.com/name/{imdb_id})"
274 if facebook_id := external_ids.get("facebook_id"):
275 texts += f", [FB](https://www.facebook.com/{facebook_id})"
276 if instagram_id := external_ids.get("instagram_id"):
277 texts += f", [Ins](https://www.instagram.com/{instagram_id})"
278 if twitter_id := external_ids.get("twitter_id"):
279 texts += f", [X](https://www.twitter.com/{twitter_id})"
280 if youtube_id := external_ids.get("youtube_id"):
281 texts += f", [油管](https://www.youtube.com/{youtube_id})"
282
283 productions_for_caption = []
284 productions_for_html = ""
285 # process casts
286 casts = [x for x in glom(resp, "combined_credits.cast", default=[]) if x.get("media_type") in ["movie", "tv"]]
287 casts = sorted(casts, key=lambda x: glom(x, Coalesce("release_date", "first_air_date"), default=""), reverse=True)
288 for item in casts:
289 type_initial = item["media_type"][0].upper() # M: movie, T: tv
290 title = glom(item, Coalesce("title", "original_title", "name", "original_name"), default="")
291 full_date = glom(item, Coalesce("release_date", "first_air_date"), default="")
292 date = full_date[:4] if full_date else ""
293 if date:
294 productions_for_caption.append(f"/{type_initial}{item['id']}: {title} ({date})\n")
295 productions_for_html += f'<br>{type_initial}{item["id"]}: <a href="https://www.themoviedb.org/{item["media_type"]}/{item["id"]}">{title}</a> ({full_date})'
296 else:
297 productions_for_caption.append(f"/{type_initial}{item['id']}: {title}\n")
298 productions_for_html += f'<br>{type_initial}{item["id"]}: <a href="https://www.themoviedb.org/{item["media_type"]}/{item["id"]}">{title}</a>'
299 if overview := item.get("overview"):
300 productions_for_html += f"<br>简介: {overview}"
301
302 # process images
303 media = []
304 if images := glom(resp, "images.profiles", default=[]):
305 images = sorted(images, key=lambda x: x.get("width", 0), reverse=True)[:10] # kepp 10 images
306 media = [{"photo": download_file(f"https://image.tmdb.org/t/p/original{img.get('file_path')}", proxy=PROXY.TMDB)} for img in images]
307 media = await download_media(media)
308 telegraph_url = await publish_telegraph(title=name, html=productions_for_html.strip("<br>"), author=name, url=f"https://www.themoviedb.org/person/{people_id}", ttl="1d")
309
310 description = f"简介: {zhcn(resp['biography'])}" if resp.get("biography") else ""
311 description = description.replace("\\n", "\n")
312 max_length = CAPTION_LENGTH if media else TEXT_LENGTH
313 if await count_without_entities(f"{texts}\n{description}") > max_length - 10: # long desc
314 if telegraph_url:
315 texts += f"\n[查看作品列表]({telegraph_url})"
316 texts = (await smart_split(f"{texts}\n{description}", max_length))[0]
317 else: # short desc
318 texts += f"\n{description}"
319 texts += f"\n[作品列表]({telegraph_url}):"
320 productions = "".join(productions_for_caption)
321 texts = (await smart_split(f"{texts}\n{productions}", max_length))[0]
322
323 return {"texts": texts, "media": media}
324
325
326def gender_emoji(gender: str | int) -> str:
327 """Gender emoji.
328
329 0: Not set / not specified
330 1: Female
331 2: Male
332 3: Non-binary
333 """
334 if str(gender) == "1":
335 return "🚺"
336 if str(gender) == "2":
337 return "🚹"
338 return ""