main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import contextlib
5import json
6import random
7from pathlib import Path
8
9from glom import glom
10from loguru import logger
11
12from config import DOWNLOAD_DIR, cache
13from database.r2 import list_cf_r2, set_cf_r2
14from emby.account import all_accounts, emby_login
15from emby.api import get_items, get_user, remove_from_resume
16from emby.constant import EMBY_KEEPALIVE_SECONDS
17from emby.play import play_item
18from utils import strings_list, to_int
19
20
21async def keepalive_emby():
22 if cache.get("keepalive_emby"):
23 return
24 cache.set("keepalive_emby", "1", 4 * 3600) # every 4 hours
25 accounts = await all_accounts()
26 r2 = await list_cf_r2("TTL/")
27 keys = glom(r2, "Contents.*.Key", default=[]) or []
28 for acc_name, config in accounts.items():
29 keepalive_day = to_int(config.get("keepalive", 0))
30 if not isinstance(keepalive_day, int) or keepalive_day <= 0:
31 continue
32 if next((x for x in keys if x.endswith(f"/emby/{acc_name}")), None):
33 continue
34 with contextlib.suppress(Exception):
35 await keepalive_single(acc_name, config)
36
37
38async def keepalive_single(acc_name: str, config: dict):
39 credentials = await emby_login(acc_name)
40 if not credentials:
41 return
42 refresh_credentials(acc_name, config, credentials)
43 user = await get_user(credentials)
44 if user.get("hx_error"):
45 logger.warning(f"【{acc_name}】服务器错误: {user.get('hx_error')}")
46 return
47 items = await get_items(credentials)
48 if not items:
49 logger.warning(f"【{acc_name}】保活失败: 无法获取首页中的视频项目")
50 return
51 duration = [int(x) for x in strings_list(EMBY_KEEPALIVE_SECONDS)]
52 need_seconds = random.uniform(*duration)
53 logger.debug(f"【{acc_name}】成功获取{len(items)}个首页视频项目, 共需播放 {need_seconds:.0f} 秒")
54
55 played_seconds = 0
56 num_played = 0
57 failed_items = []
58
59 while True:
60 if played_seconds >= need_seconds - 1:
61 break
62 shuffled_items = list(items.items())
63 random.shuffle(shuffled_items)
64
65 for item_id, item in shuffled_items:
66 if item_id in failed_items:
67 continue
68 total_seconds = item["RunTimeTicks"] / 10000000
69 play_seconds = total_seconds if need_seconds - played_seconds > total_seconds else max(need_seconds - played_seconds, 10)
70 logger.trace(f"【{acc_name}】开始播放《{item.get('Name', '(未命名视频)')}》({play_seconds:.0f} 秒).")
71 succ = await play_item(credentials, acc_name, item, play_seconds)
72 await asyncio.sleep(random.uniform(5, 10))
73 await remove_from_resume(credentials, item_id)
74 if not succ:
75 failed_items.append(item_id)
76 continue
77 num_played += 1
78 played_seconds += play_seconds
79 if played_seconds >= need_seconds - 1:
80 logger.success(f"【{acc_name}】保活成功, 共播放 {num_played} 个视频.")
81 keepalive_hour = int(config.get("keepalive", 0)) * 24 + random.randint(-24, 24)
82 await set_cf_r2(f"TTL/{max(1, keepalive_hour)}h/emby/{acc_name}", "1")
83 break
84 logger.debug(f"【{acc_name}】还需播放 {need_seconds - played_seconds:.0f} 秒.")
85 rt = random.uniform(5, 15)
86 logger.debug(f"【{acc_name}】等待 {rt:.0f} 秒后播放下一个.")
87 await asyncio.sleep(rt)
88 if len(failed_items) == len(items):
89 logger.warning(f"【{acc_name}】所有视频均播放失败, 保活失败. ")
90 return
91
92
93def refresh_credentials(acc_name: str, acc_config: dict, credentials: dict):
94 if acc_config.get("server") != credentials.get("Server"):
95 logger.warning(f"【{acc_name}】服务器地址已更改, 更新本地凭据.")
96 credentials["Server"] = acc_config["server"]
97 save_path = Path(DOWNLOAD_DIR) / "emby" / f"{acc_name}.json"
98 save_path.parent.mkdir(parents=True, exist_ok=True)
99 save_path.write_text(json.dumps(credentials, ensure_ascii=False, indent=2, sort_keys=True))