main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import random
5import string
6import time
7from contextlib import suppress
8
9from glom import glom
10from httpx import AsyncClient, AsyncHTTPTransport
11from loguru import logger
12
13from emby.api import build_params, default_headers, get_item, get_playback_info, get_similar
14from emby.constant import EMBY_PROXY, UA
15from networking import hx_req
16from utils import nowdt
17
18
19async def play_item(credentials: dict, server_name: str, item: dict, play_seconds: float = 10) -> bool:
20 item_id = item["Id"]
21 await get_item(credentials, item_id)
22 await get_similar(credentials, item_id)
23
24 playback_info = await get_playback_info(credentials, item_id)
25 if playback_info.get("hx_error"):
26 return False
27
28 play_session_id = playback_info["PlaySessionId"]
29 random_id = "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(32))
30 media_source_id = glom(playback_info, "MediaSources.0.Id", default=random_id)
31 direct_stream_url = glom(playback_info, "MediaSources.0.DirectStreamUrl", default=None)
32 headers = default_headers(credentials) | {"accept": "application/json, text/plain, */*", "content-type": "application/json", "accept-encoding": "gzip, deflate", "accept-language": "en-US,*"}
33
34 def get_playing_data(tick: int, *, init: bool = False, start: bool = False, update: bool = False, stop: bool = False) -> dict:
35 data = {
36 "AudioStreamIndex": -1,
37 "CanSeek": "true",
38 }
39
40 if start:
41 data["EventName"] = "unpause"
42 elif update:
43 data["EventName"] = "timeupdate"
44 elif stop:
45 data["EventName"] = "pause"
46 return data | {
47 "IsMuted": "false",
48 "IsPaused": str(init or start or stop).lower(),
49 "ItemId": str(item_id),
50 "MaxStreamingBitrate": 140000000,
51 "MediaSourceId": str(media_source_id),
52 "PlayMethod": "DirectStream",
53 "PlaySessionId": str(play_session_id),
54 "PlaybackRate": 1,
55 "PlaybackStartTimeTicks": 0,
56 "PlaylistIndex": 0,
57 "PlaylistLength": 1,
58 "PositionTicks": tick,
59 "RepeatMode": "RepeatNone",
60 "Shuffle": "false",
61 "SubtitleOffset": 0,
62 "SubtitleStreamIndex": -1,
63 "VolumeLevel": 100,
64 }
65
66 async def stream():
67 stream_url = direct_stream_url or f"/Videos/{item_id}/stream"
68 length = 0
69 last_err_time = nowdt()
70 speed_limit = 2 * 1024 * 1024 # 2 MB
71 while True:
72 async with (
73 AsyncClient(headers=headers, proxy=EMBY_PROXY, verify=False, transport=AsyncHTTPTransport(), timeout=5) as client, # noqa: S501
74 client.stream(
75 "GET",
76 f"{credentials['Server']}/emby/{stream_url}",
77 follow_redirects=True,
78 headers={"User-Agent": UA, "Accept": "*/*", "Range": f"bytes={length}-", "Icy-MetaData": "1"},
79 ) as resp,
80 ):
81 total_downloaded = 0
82 start_time = time.monotonic()
83 try:
84 async for chunk in resp.aiter_bytes(chunk_size=1024):
85 length += len(chunk)
86 total_downloaded += len(chunk)
87 del chunk
88 expected_time = total_downloaded / speed_limit
89 elapsed_time = time.monotonic() - start_time
90 sleep_time = expected_time - elapsed_time
91 if sleep_time > 0:
92 await asyncio.sleep(sleep_time)
93 await asyncio.sleep(random.random())
94 if random.random() < 0.01:
95 continue
96 except Exception as e:
97 if (nowdt() - last_err_time).total_seconds() > 5:
98 logger.debug(f"【{server_name}】《{item['Name']}》流媒体文件访问错误, 正在重试. {e}")
99 last_err_time = nowdt()
100 continue
101 finally:
102 await resp.aclose()
103
104 stream_task = asyncio.create_task(stream())
105 rt = random.uniform(5, 10)
106 logger.trace(f"【{server_name}】《{item['Name']}》播放视频前等待 {rt:.0f} 秒")
107 await asyncio.sleep(rt)
108 succ = False
109 try:
110 resp = await hx_req(
111 f"{credentials['Server']}/emby/Sessions/Playing",
112 "POST",
113 headers=headers,
114 params=build_params(credentials, params={"UserId": credentials["User"]["Id"]}, reqformat="json"),
115 json_data=get_playing_data(0),
116 transport=AsyncHTTPTransport(),
117 proxy=EMBY_PROXY,
118 verify=False,
119 rformat="content",
120 timeout=5,
121 silent=True,
122 )
123 if resp.get("hx_error"):
124 logger.error(f"【{server_name}】《{item['Name']}》无法开始播放: {resp['hx_error']}")
125 return False
126 t = play_seconds
127
128 last_report_t = t
129 progress_errors = 0
130 report_interval = 10 # seconds
131 while t > 0:
132 if progress_errors > 20:
133 logger.error(f"【{server_name}】《{item['Name']}》播放状态设定错误次数过多")
134 return False
135 if last_report_t and last_report_t - t > report_interval:
136 logger.trace(f"【{server_name}】《{item['Name']}》正在播放 (还剩 {t:.0f} 秒).")
137 last_report_t = t
138 st = min(10, t)
139 await asyncio.sleep(st)
140 t -= st
141 tick = int((play_seconds - t) * 10000000)
142 try:
143 resp = await asyncio.wait_for(
144 hx_req(
145 f"{credentials['Server']}/emby/Sessions/Playing/Progress",
146 "POST",
147 headers=headers,
148 json_data=get_playing_data(tick, update=True),
149 transport=AsyncHTTPTransport(),
150 proxy=EMBY_PROXY,
151 verify=False,
152 rformat="content",
153 timeout=5,
154 silent=True,
155 ),
156 10,
157 )
158 logger.info(f"【{server_name}】《{item['Name']}》回传成功")
159 if 200 <= int(resp.get("status_code", 0)) < 300:
160 succ = True
161 except Exception:
162 logger.debug(f"【{server_name}】《{item['Name']}》播放状态设定错误")
163 progress_errors += 1
164 await asyncio.sleep(random.uniform(1, 3))
165 finally:
166 stream_task.cancel()
167 with suppress(asyncio.CancelledError):
168 await stream_task
169
170 try:
171 final_percentage = random.uniform(0.95, 1.0)
172 final_tick = int((play_seconds * final_percentage) // 10 * 10 * 10000000)
173 await hx_req(
174 f"{credentials['Server']}/emby/Sessions/Playing/Progress",
175 "POST",
176 headers=headers,
177 json_data=get_playing_data(final_tick, stop=True),
178 transport=AsyncHTTPTransport(),
179 proxy=EMBY_PROXY,
180 verify=False,
181 timeout=5,
182 rformat="content",
183 silent=True,
184 )
185 logger.success(f"【{server_name}】《{item['Name']}》播放完成, 共 {play_seconds:.0f} 秒.")
186 except Exception as e:
187 logger.error(f"【{server_name}】《{item['Name']}》由于连接错误或服务器错误无法停止播放: {e}")
188 return succ