main
 1#!/usr/bin/env python
 2# -*- coding: utf-8 -*-
 3import asyncio
 4import contextlib
 5import json
 6import random
 7from pathlib import Path
 8
 9from glom import glom
10from loguru import logger
11
12from config import DOWNLOAD_DIR, cache
13from database.r2 import list_cf_r2, set_cf_r2
14from emby.account import all_accounts, emby_login
15from emby.api import get_items, get_user, remove_from_resume
16from emby.constant import EMBY_KEEPALIVE_SECONDS
17from emby.play import play_item
18from utils import strings_list, to_int
19
20
21async def keepalive_emby():
22    if cache.get("keepalive_emby"):
23        return
24    cache.set("keepalive_emby", "1", 4 * 3600)  # every 4 hours
25    accounts = await all_accounts()
26    r2 = await list_cf_r2("TTL/")
27    keys = glom(r2, "Contents.*.Key", default=[]) or []
28    for acc_name, config in accounts.items():
29        keepalive_day = to_int(config.get("keepalive", 0))
30        if not isinstance(keepalive_day, int) or keepalive_day <= 0:
31            continue
32        if next((x for x in keys if x.endswith(f"/emby/{acc_name}")), None):
33            continue
34        with contextlib.suppress(Exception):
35            await keepalive_single(acc_name, config)
36
37
38async def keepalive_single(acc_name: str, config: dict):
39    credentials = await emby_login(acc_name)
40    if not credentials:
41        return
42    refresh_credentials(acc_name, config, credentials)
43    user = await get_user(credentials)
44    if user.get("hx_error"):
45        logger.warning(f"{acc_name}】服务器错误: {user.get('hx_error')}")
46        return
47    items = await get_items(credentials)
48    if not items:
49        logger.warning(f"{acc_name}】保活失败: 无法获取首页中的视频项目")
50        return
51    duration = [int(x) for x in strings_list(EMBY_KEEPALIVE_SECONDS)]
52    need_seconds = random.uniform(*duration)
53    logger.debug(f"{acc_name}】成功获取{len(items)}个首页视频项目, 共需播放 {need_seconds:.0f}")
54
55    played_seconds = 0
56    num_played = 0
57    failed_items = []
58
59    while True:
60        if played_seconds >= need_seconds - 1:
61            break
62        shuffled_items = list(items.items())
63        random.shuffle(shuffled_items)
64
65        for item_id, item in shuffled_items:
66            if item_id in failed_items:
67                continue
68            total_seconds = item["RunTimeTicks"] / 10000000
69            play_seconds = total_seconds if need_seconds - played_seconds > total_seconds else max(need_seconds - played_seconds, 10)
70            logger.trace(f"{acc_name}】开始播放《{item.get('Name', '(未命名视频)')}》({play_seconds:.0f} 秒).")
71            succ = await play_item(credentials, acc_name, item, play_seconds)
72            await asyncio.sleep(random.uniform(5, 10))
73            await remove_from_resume(credentials, item_id)
74            if not succ:
75                failed_items.append(item_id)
76                continue
77            num_played += 1
78            played_seconds += play_seconds
79            if played_seconds >= need_seconds - 1:
80                logger.success(f"{acc_name}】保活成功, 共播放 {num_played} 个视频.")
81                keepalive_hour = int(config.get("keepalive", 0)) * 24 + random.randint(-24, 24)
82                await set_cf_r2(f"TTL/{max(1, keepalive_hour)}h/emby/{acc_name}", "1")
83                break
84            logger.debug(f"{acc_name}】还需播放 {need_seconds - played_seconds:.0f} 秒.")
85            rt = random.uniform(5, 15)
86            logger.debug(f"{acc_name}】等待 {rt:.0f} 秒后播放下一个.")
87            await asyncio.sleep(rt)
88        if len(failed_items) == len(items):
89            logger.warning(f"{acc_name}】所有视频均播放失败, 保活失败. ")
90            return
91
92
93def refresh_credentials(acc_name: str, acc_config: dict, credentials: dict):
94    if acc_config.get("server") != credentials.get("Server"):
95        logger.warning(f"{acc_name}】服务器地址已更改, 更新本地凭据.")
96        credentials["Server"] = acc_config["server"]
97        save_path = Path(DOWNLOAD_DIR) / "emby" / f"{acc_name}.json"
98        save_path.parent.mkdir(parents=True, exist_ok=True)
99        save_path.write_text(json.dumps(credentials, ensure_ascii=False, indent=2, sort_keys=True))