main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import random
  5import string
  6import time
  7from contextlib import suppress
  8
  9from glom import glom
 10from httpx import AsyncClient, AsyncHTTPTransport
 11from loguru import logger
 12
 13from emby.api import build_params, default_headers, get_item, get_playback_info, get_similar
 14from emby.constant import EMBY_PROXY, UA
 15from networking import hx_req
 16from utils import nowdt
 17
 18
 19async def play_item(credentials: dict, server_name: str, item: dict, play_seconds: float = 10) -> bool:
 20    item_id = item["Id"]
 21    await get_item(credentials, item_id)
 22    await get_similar(credentials, item_id)
 23
 24    playback_info = await get_playback_info(credentials, item_id)
 25    if playback_info.get("hx_error"):
 26        return False
 27
 28    play_session_id = playback_info["PlaySessionId"]
 29    random_id = "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(32))
 30    media_source_id = glom(playback_info, "MediaSources.0.Id", default=random_id)
 31    direct_stream_url = glom(playback_info, "MediaSources.0.DirectStreamUrl", default=None)
 32    headers = default_headers(credentials) | {"accept": "application/json, text/plain, */*", "content-type": "application/json", "accept-encoding": "gzip, deflate", "accept-language": "en-US,*"}
 33
 34    def get_playing_data(tick: int, *, init: bool = False, start: bool = False, update: bool = False, stop: bool = False) -> dict:
 35        data = {
 36            "AudioStreamIndex": -1,
 37            "CanSeek": "true",
 38        }
 39
 40        if start:
 41            data["EventName"] = "unpause"
 42        elif update:
 43            data["EventName"] = "timeupdate"
 44        elif stop:
 45            data["EventName"] = "pause"
 46        return data | {
 47            "IsMuted": "false",
 48            "IsPaused": str(init or start or stop).lower(),
 49            "ItemId": str(item_id),
 50            "MaxStreamingBitrate": 140000000,
 51            "MediaSourceId": str(media_source_id),
 52            "PlayMethod": "DirectStream",
 53            "PlaySessionId": str(play_session_id),
 54            "PlaybackRate": 1,
 55            "PlaybackStartTimeTicks": 0,
 56            "PlaylistIndex": 0,
 57            "PlaylistLength": 1,
 58            "PositionTicks": tick,
 59            "RepeatMode": "RepeatNone",
 60            "Shuffle": "false",
 61            "SubtitleOffset": 0,
 62            "SubtitleStreamIndex": -1,
 63            "VolumeLevel": 100,
 64        }
 65
 66    async def stream():
 67        stream_url = direct_stream_url or f"/Videos/{item_id}/stream"
 68        length = 0
 69        last_err_time = nowdt()
 70        speed_limit = 2 * 1024 * 1024  # 2 MB
 71        while True:
 72            async with (
 73                AsyncClient(headers=headers, proxy=EMBY_PROXY, verify=False, transport=AsyncHTTPTransport(), timeout=5) as client,  # noqa: S501
 74                client.stream(
 75                    "GET",
 76                    f"{credentials['Server']}/emby/{stream_url}",
 77                    follow_redirects=True,
 78                    headers={"User-Agent": UA, "Accept": "*/*", "Range": f"bytes={length}-", "Icy-MetaData": "1"},
 79                ) as resp,
 80            ):
 81                total_downloaded = 0
 82                start_time = time.monotonic()
 83                try:
 84                    async for chunk in resp.aiter_bytes(chunk_size=1024):
 85                        length += len(chunk)
 86                        total_downloaded += len(chunk)
 87                        del chunk
 88                        expected_time = total_downloaded / speed_limit
 89                        elapsed_time = time.monotonic() - start_time
 90                        sleep_time = expected_time - elapsed_time
 91                        if sleep_time > 0:
 92                            await asyncio.sleep(sleep_time)
 93                        await asyncio.sleep(random.random())
 94                        if random.random() < 0.01:
 95                            continue
 96                except Exception as e:
 97                    if (nowdt() - last_err_time).total_seconds() > 5:
 98                        logger.debug(f"{server_name}】《{item['Name']}》流媒体文件访问错误, 正在重试. {e}")
 99                        last_err_time = nowdt()
100                        continue
101                finally:
102                    await resp.aclose()
103
104    stream_task = asyncio.create_task(stream())
105    rt = random.uniform(5, 10)
106    logger.trace(f"{server_name}】《{item['Name']}》播放视频前等待 {rt:.0f}")
107    await asyncio.sleep(rt)
108    succ = False
109    try:
110        resp = await hx_req(
111            f"{credentials['Server']}/emby/Sessions/Playing",
112            "POST",
113            headers=headers,
114            params=build_params(credentials, params={"UserId": credentials["User"]["Id"]}, reqformat="json"),
115            json_data=get_playing_data(0),
116            transport=AsyncHTTPTransport(),
117            proxy=EMBY_PROXY,
118            verify=False,
119            rformat="content",
120            timeout=5,
121            silent=True,
122        )
123        if resp.get("hx_error"):
124            logger.error(f"{server_name}】《{item['Name']}》无法开始播放: {resp['hx_error']}")
125            return False
126        t = play_seconds
127
128        last_report_t = t
129        progress_errors = 0
130        report_interval = 10  # seconds
131        while t > 0:
132            if progress_errors > 20:
133                logger.error(f"{server_name}】《{item['Name']}》播放状态设定错误次数过多")
134                return False
135            if last_report_t and last_report_t - t > report_interval:
136                logger.trace(f"{server_name}】《{item['Name']}》正在播放 (还剩 {t:.0f} 秒).")
137                last_report_t = t
138            st = min(10, t)
139            await asyncio.sleep(st)
140            t -= st
141            tick = int((play_seconds - t) * 10000000)
142            try:
143                resp = await asyncio.wait_for(
144                    hx_req(
145                        f"{credentials['Server']}/emby/Sessions/Playing/Progress",
146                        "POST",
147                        headers=headers,
148                        json_data=get_playing_data(tick, update=True),
149                        transport=AsyncHTTPTransport(),
150                        proxy=EMBY_PROXY,
151                        verify=False,
152                        rformat="content",
153                        timeout=5,
154                        silent=True,
155                    ),
156                    10,
157                )
158                logger.info(f"{server_name}】《{item['Name']}》回传成功")
159                if 200 <= int(resp.get("status_code", 0)) < 300:
160                    succ = True
161            except Exception:
162                logger.debug(f"{server_name}】《{item['Name']}》播放状态设定错误")
163                progress_errors += 1
164        await asyncio.sleep(random.uniform(1, 3))
165    finally:
166        stream_task.cancel()
167        with suppress(asyncio.CancelledError):
168            await stream_task
169
170    try:
171        final_percentage = random.uniform(0.95, 1.0)
172        final_tick = int((play_seconds * final_percentage) // 10 * 10 * 10000000)
173        await hx_req(
174            f"{credentials['Server']}/emby/Sessions/Playing/Progress",
175            "POST",
176            headers=headers,
177            json_data=get_playing_data(final_tick, stop=True),
178            transport=AsyncHTTPTransport(),
179            proxy=EMBY_PROXY,
180            verify=False,
181            timeout=5,
182            rformat="content",
183            silent=True,
184        )
185        logger.success(f"{server_name}】《{item['Name']}》播放完成, 共 {play_seconds:.0f} 秒.")
186    except Exception as e:
187        logger.error(f"{server_name}】《{item['Name']}》由于连接错误或服务器错误无法停止播放: {e}")
188    return succ