main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4import json
5import os
6import random
7import re
8import string
9import tempfile
10from datetime import UTC, datetime
11from decimal import Decimal
12from pathlib import Path
13from typing import Any
14from zoneinfo import ZoneInfo
15
16import chardet
17import markdown
18import puremagic
19import zhconv
20from bilibili_api.utils.aid_bvid_transformer import aid2bvid, bvid2aid
21from bs4 import BeautifulSoup
22from bs4.element import PageElement
23from glom import PathAccessError, glom
24from loguru import logger
25from markitdown import MarkItDown
26from pyrogram.client import Client
27from pyrogram.types import User
28from yt_dlp.extractor import gen_extractors
29
30from config import CLEAN_OLD_FILES_OLDER_THAN_SECONDS, DOWNLOAD_DIR, TZ, cache
31
32# ruff: noqa: RUF001
33
34
def nowdt(tz: str = "UTC") -> datetime:
    """Return the current time as a timezone-aware datetime.

    Args:
        tz: Time zone key passed to ``ZoneInfo`` (defaults to ``"UTC"``).

    Returns:
        ``datetime.now`` evaluated in the requested zone.
    """
    zone = ZoneInfo(tz)
    return datetime.now(tz=zone)
37
38
def nowstr(tz: str = TZ) -> str:
    """Return the current time in zone ``tz`` formatted as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        tz: Time zone key forwarded to ``nowdt``; defaults to the configured ``TZ``.
    """
    return nowdt(tz).strftime("%Y-%m-%d %H:%M:%S")
42
43
44def number(n: float | str | Decimal, precision: int = -1, *, sign: bool = False) -> str:
45 """Normalize a number to its simplest decimal.
46
47 Example:
48 "1.2340000" -> "1.234"
49 1.000000 -> "1"
50 """
51 n = Decimal(n)
52 if precision == -1: # auto precision (up to 8 decimal places)
53 return f"{n:.8f}".rstrip("0").rstrip(".")
54 if precision == 0:
55 return f"{n:.0f}"
56 return f"{n:+.{precision}f}" if sign else f"{n:.{precision}f}"
57
58
def split_parts(first: int = 0, middle: int = 0, last: int = 0) -> dict:
    """Label three consecutive runs of items: first, middle, and last.

    Useful for determining the number of media files in master / reply / quote posts.

    Items are numbered from 1 across the three runs. A run with no items gets an
    empty label, a run with one item gets ``🏞P<n>``, and a longer run gets
    ``🏞P<start>-P<end>``.

    Args:
        first: Number of items in the first run.
        middle: Number of items in the middle run.
        last: Number of items in the last run.

    Returns:
        A dict with the keys ``"first"``, ``"middle"`` and ``"last"``.
    """
    labels = {}
    start = 1
    for key, count in (("first", first), ("middle", middle), ("last", last)):
        size = max(count, 0)  # a negative count is treated as an empty run
        end = start + size - 1
        if size == 0:
            labels[key] = ""
        elif size == 1:
            labels[key] = f"🏞P{start}"
        else:
            labels[key] = f"🏞P{start}-P{end}"
        start = end + 1
    return labels
76
77
78def to_int(var: str | float) -> str | int:
79 """Convert a string or float to an integer."""
80 try:
81 return int(float(var))
82 except (ValueError, TypeError):
83 return str(var)
84
85
86def read_text(path: str | Path) -> str:
87 """Read text file with any encoding."""
88 path = Path(path).expanduser().resolve()
89 if not path.is_file():
90 logger.warning(f"File not found: {path.name}")
91 return ""
92 try:
93 return path.read_text(encoding="utf-8")
94 except UnicodeDecodeError:
95 with path.open("rb") as f: # Open in binary mode for detection
96 raw_data = f.read()
97 result = chardet.detect(raw_data)
98 detected_encoding = result["encoding"]
99 if detected_encoding:
100 logger.success(f"File: `{path.name}` Encoding: {detected_encoding}")
101 return path.read_text(encoding=detected_encoding)
102 logger.warning(f"Could not detect encoding: {path.name}")
103 return ""
104
105
def rand_string(length: int = 48) -> str:
    """Return ``length`` random characters drawn from ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
108
109
def rand_number(length: int = 8) -> int:
    """Return an integer built from ``length`` random decimal digits.

    Leading zeros are dropped by the integer conversion, so the result may have
    fewer than ``length`` digits.
    """
    digits = random.choices(string.digits, k=length)
    return int("".join(digits))
112
113
def true(value: Any) -> bool:
    """Interpret a loosely-typed flag as a boolean.

    Falsy values are ``False``. A non-empty string is ``False`` only when its
    lower-cased form is one of the known "off" words; every other truthy value
    is ``True``.
    """
    off_words = {"0", "n", "na", "n/a", "no", "not", "f", "false", "off", "none", "null", "disable", "disabled"}
    if isinstance(value, str):
        return bool(value) and str(value).lower() not in off_words
    return bool(value)
120
121
122def remove_none_values(d: dict | list) -> dict:
123 """Recursively removes keys with None values from a nested dictionary.
124
125 Cleans None values from lists and processes nested structures.
126
127 Args:
128 d (dict | list): The input dict or list
129
130 Returns:
131 dict: A cleaned dictionary or list with None values removed.
132 """
133 if isinstance(d, dict): # If the input is a dictionary
134 cleaned_dict = {}
135 for key, value in d.items():
136 if isinstance(value, dict):
137 # Recursively clean nested dictionaries
138 nested_cleaned = remove_none_values(value)
139 if nested_cleaned: # Only add non-empty cleaned dict
140 cleaned_dict[key] = nested_cleaned
141 elif isinstance(value, list):
142 # Clean lists recursively
143 cleaned_list = [remove_none_values(item) if isinstance(item, dict | list) else item for item in value if item is not None]
144 if cleaned_list: # Only add non-empty cleaned lists
145 cleaned_dict[key] = cleaned_list
146 elif value is not None:
147 cleaned_dict[key] = value
148 return cleaned_dict
149 if isinstance(d, list): # If the input is a list
150 return [remove_none_values(item) if isinstance(item, dict | list) else item for item in d if item is not None] # type: ignore
151
152 return d # Return non-dict, non-list values as is
153
154
def soup_to_text(soup: PageElement) -> str:
    """Flatten a parsed HTML node into plain text.

    ``<img>`` tags with an ``alt`` attribute contribute their alt text,
    ``<br>`` tags contribute a newline, and nodes with children are
    flattened recursively. Nodes without children contribute their ``.text``.
    """
    if not hasattr(soup, "children"):
        return soup.text
    pieces = []
    for node in soup.children:  # type: ignore
        if node.name == "img" and "alt" in node.attrs:
            pieces.append(node["alt"])
        elif node.name == "br":
            pieces.append("\n")
        elif hasattr(node, "children"):
            pieces.append(soup_to_text(node))
        else:
            pieces.append(node.text)
    return "".join(pieces)
169
170
def markdown_to_text(mkdown: str) -> str:
    """Render markdown to HTML and return its text content.

    Newlines in the rendered HTML are turned into ``<br>`` before the text is
    extracted. This is best-effort: if anything fails, the input is returned unchanged.
    """
    try:
        rendered = markdown.markdown(mkdown)
        markup = rendered.replace("\n", "<br>")
        return BeautifulSoup(markup, "html.parser").get_text()
    except Exception:
        return mkdown
177
178
179def number_to_emoji(num: int | str, default: str | None = None) -> str:
180 """Convert a number to an emoji."""
181 num = str(num)
182 if default is None:
183 default = num
184 return {"0": "0️⃣", "1": "1️⃣", "2": "2️⃣", "3": "3️⃣", "4": "4️⃣", "5": "5️⃣", "6": "6️⃣", "7": "7️⃣", "8": "8️⃣", "9": "9️⃣", "10": "🔟"}.get(num, default)
185
186
187def seconds_to_hms(seconds: float | str) -> str:
188 """Convert seconds to hms format."""
189 seconds = int(float(seconds))
190 m, s = divmod(seconds, 60)
191 h, m = divmod(m, 60)
192 if h == 0:
193 return f"{m:02d}:{s:02d}"
194 return f"{h:02d}:{m:02d}:{s:02d}"
195
196
def count_subtitles(texts: str) -> int:
    """Count characters after stripping the leading ``[...]`` prefix of each line.

    Args:
        texts: Input string whose lines may start with a bracketed timestamp.

    Returns:
        int: Number of characters left once each line's leading bracketed
        prefix (and one following whitespace character) is removed.
    """
    leading_stamp = re.compile(r"^\[.*?\]\s?", flags=re.MULTILINE)
    return len(leading_stamp.sub("", texts))
208
209
210def stringfy(d: dict) -> dict:
211 """Convert dict values to string.
212
213 Args:
214 d (dict | list): The input dict or list
215
216 Returns:
217 dict: A stringfy dictionary or list.
218 """
219 if isinstance(d, dict): # If the input is a dictionary
220 stringfy_dict = {}
221 for key, value in d.items():
222 if isinstance(value, dict | list | set):
223 stringfy_dict[key] = json.dumps(value)
224 else:
225 stringfy_dict[key] = unicode_to_ascii(value)
226 return stringfy_dict
227 return d # Return non-dict, non-list values as is
228
229
def seconds_to_time(seconds: float) -> str:
    """Format seconds as a clock string, rounding to the nearest second.

    100 -> "01:40"
    1000 -> "16:40"
    10000 -> "02:46:40"
    100000 -> "27:46:40"
    """
    total = round(float(seconds))
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    clock = f"{minutes:02d}:{secs:02d}"
    return f"{hours:02d}:{clock}" if hours else clock
244
245
246def readable_time(seconds: str | float) -> str:
247 """Human readable time duration.
248
249 100 -> "1m40s"
250 1000 -> "16m40s"
251 10000 -> "2h46m40s"
252 100000 -> "1d3h46m40s"
253 """
254 try:
255 seconds = float(seconds)
256 except ValueError:
257 # already in reachable time
258 return str(seconds)
259 if seconds < 60:
260 return f"{seconds:.0f}s"
261 if seconds < 3600:
262 minutes, seconds = divmod(seconds, 60)
263 return f"{minutes:.0f}m{seconds:.0f}s"
264 if seconds < 86400:
265 hours, seconds = divmod(seconds, 3600)
266 minutes, seconds = divmod(seconds, 60)
267 return f"{hours:.0f}h{minutes:.0f}m{seconds:.0f}s"
268 days, seconds = divmod(seconds, 86400)
269 hours, seconds = divmod(seconds, 3600)
270 minutes, seconds = divmod(seconds, 60)
271 return f"{days:.0f}d{hours:.0f}h{minutes:.0f}m{seconds:.0f}s"
272
273
274def readable_size(num_bytes: str | float = 0, path: str | Path | None = None) -> str:
275 """Human readable file size."""
276 num_bytes = Path(path).stat().st_size if path is not None else float(num_bytes)
277 # for unit in ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"]:
278 for unit in ["B", "KB"]:
279 if abs(num_bytes) < 1024:
280 return f"{num_bytes:.1f} {unit}"
281 num_bytes /= 1024
282 return f"{num_bytes:.1f} MB"
283
284
def readable_count(num: int | str) -> str:
    """Abbreviate a count using the Chinese unit 万 (ten thousand).

    Values below 10000 are returned as plain digits. From 10000 up to 99999
    one decimal digit is kept when it is non-zero (e.g. ``"1.2万"``); from
    100000 on only the whole number of 万 is shown. Input that ``to_int``
    cannot convert is returned as ``str(num)``.
    """
    count = to_int(num)
    if not isinstance(count, int):
        return str(num)
    if count < 10000:
        return str(count)
    wan, rest = divmod(count, 10000)
    if count >= 100000 or rest < 1000:
        return f"{wan}万"
    return f"{wan}.{rest // 1000}万"
295
296
def find_url(text: str) -> str:
    """Return the first URL found in ``text``, or an empty string.

    Non-string input yields an empty string.
    """
    if not isinstance(text, str):
        return ""
    regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
    first_hit = re.search(regex, text)
    if first_hit is None:
        return ""
    url = first_hit.group(1)  # the outermost group holds the whole URL
    logger.debug(f"URL found from message text: {url}")
    return url
306
307
def https_url(url: str) -> str:
    """Return ``url`` with an ``https://`` scheme and no leading or trailing slashes.

    Any existing ``https://`` or ``http://`` prefix is removed first.
    """
    without_scheme = str(url).removeprefix("https://").removeprefix("http://")
    return "https://" + without_scheme.strip("/")
310
311
def bare_url(url: str) -> str:
    """Return ``url`` without its ``http(s)://`` scheme or surrounding slashes."""
    without_scheme = str(url).removeprefix("https://").removeprefix("http://")
    return without_scheme.strip("/")
314
315
316def ts_to_dt(ts: str | float | None) -> datetime | None:
317 if not ts:
318 return None
319
320 try: # not number
321 ts = float(ts)
322 except ValueError:
323 return None
324
325 if 0 < float(ts) < 1:
326 return None
327 try:
328 return datetime.fromtimestamp(ts, tz=UTC).astimezone(ZoneInfo(TZ))
329 except Exception as e:
330 if "out of range" in str(e):
331 return ts_to_dt(ts / 1000)
332 logger.error(e)
333 return None
334
335
336def slim_cid(cid: int | str) -> str:
337 return str(cid).strip().removeprefix("-100")
338
339
340def strings_list(value: str | None = None, *, env_key: str = "", separator: str = ",", shuffle: bool = False) -> list[str]:
341 """Get list from environment variable."""
342 if value is None:
343 value = os.getenv(env_key, "")
344 results = [s.strip() for s in value.split(separator) if s.strip()]
345 if shuffle:
346 random.shuffle(results)
347 return results
348
349
def parse_time(timestr: str) -> dict[str, int]:
    """Parse time string.

    The layout is chosen purely by the length of ``timestr``.

    Support formats:
        length= 4: yyyy
        length= 6: yyyymm
        length= 7: yyyy-mm
        length= 8: yyyymmdd
        length=10: yyyy-mm-dd
        length=14: yyyymmddHHMMSS
        length=15: yyyymmdd-HHMMSS
        length=17: yyyymmdd HH:MM:SS
        length=19: yyyy-mm-dd HH:MM:SS

    Returns:
        {"year": int, "month": int, "day": int, "hour": int, "minute": int, "second": int}
        Fields absent from the layout are 0. An empty input returns ``{}``;
        an unsupported length logs a warning and returns all zeros.

    Raises:
        ValueError: If a field's slice is not a valid integer.
    """
    # For each supported length: start index of every two-character field.
    layouts = {
        4: {},
        6: {"month": 4},
        7: {"month": 5},
        8: {"month": 4, "day": 6},
        10: {"month": 5, "day": 8},
        14: {"month": 4, "day": 6, "hour": 8, "minute": 10, "second": 12},
        15: {"month": 4, "day": 6, "hour": 9, "minute": 11, "second": 13},
        17: {"month": 4, "day": 6, "hour": 9, "minute": 12, "second": 15},
        19: {"month": 5, "day": 8, "hour": 11, "minute": 14, "second": 17},
    }
    res = {"year": 0, "month": 0, "day": 0, "hour": 0, "minute": 0, "second": 0}
    if not timestr:
        return {}
    offsets = layouts.get(len(timestr))
    if offsets is None:
        logger.warning(f"Invalid time format: {timestr}")
        return res

    # first 4 digits are year
    res["year"] = int(timestr[:4])
    for field, start in offsets.items():
        res[field] = int(timestr[start : start + 2])
    return res
411
412
async def myself(client: Client) -> User:
    """Return the account behind ``client``, caching it under the key ``"me"``.

    If ``client.get_me()`` fails, the error is logged and a placeholder
    ``User(id=1, is_bot=False)`` is returned without being cached.
    """
    cached = cache.get("me")
    if cached:
        return cached
    try:
        me = await client.get_me()
    except Exception as e:
        logger.error(e)
        return User(id=1, is_bot=False)
    else:
        cache.set("me", me, ttl=0)
        return me
424
425
async def i_am_bot(client: Client) -> bool:
    """Check whether this client is a bot.

    The result is stored in the cache under ``"i_am_bot"``. Only a truthy
    cached value short-circuits, so a cached ``False`` is looked up again via
    ``myself``. Returns ``False`` when the lookup raises.
    """
    cached = cache.get("i_am_bot")
    if cached:
        return cached
    try:
        me = await myself(client)
    except Exception as e:
        logger.error(e)
        return False
    is_bot = me.is_bot
    cache.set("i_am_bot", is_bot, ttl=0)
    return is_bot
437
438
def match_urls(text: str) -> list[str]:
    """Find every URL in ``text`` and normalize each one with ``https_url``."""
    url_pattern = r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))'
    # Group 1 spans the whole URL; the inner groups only support the matching.
    return [https_url(hit.group(1)) for hit in re.finditer(url_pattern, str(text))]
446
447
def remove_dash(text: str) -> str:
    """Delete runs of dashes: first every ``---``, then every ``--``.

    Each pattern is removed repeatedly until it no longer occurs. Falsy input
    yields an empty string.
    """
    if not text:
        return ""
    for run in ("---", "--"):
        while run in text:
            text = text.replace(run, "")
    return text
456
457
def remove_pound(text: str) -> str:
    """Replace every ``"# "`` with a single space until none remain.

    Repeating the replacement also collapses runs such as ``"## "``. Falsy
    input yields an empty string.
    """
    if not text:
        return ""
    marker = "# "
    while marker in text:
        text = text.replace(marker, " ")
    return text
464
465
def remove_consecutive_newlines(text: str, newline_level: int = 3) -> str:
    """Collapse runs of newlines.

    Three or more consecutive newlines are always reduced to two. When
    ``newline_level`` is 2, double newlines are further reduced to one.
    Falsy input yields an empty string.
    """
    if not text:
        return ""
    collapses = [("\n\n\n", "\n\n")]
    if newline_level == 2:
        collapses.append(("\n\n", "\n"))
    for run, shorter in collapses:
        while run in text:
            text = text.replace(run, shorter)
    return text
475
476
def is_supported_by_ytdlp(url: str) -> bool:
    """Check whether any non-generic yt-dlp extractor accepts this url.

    Urls containing ``t.me`` are always reported as unsupported.
    """
    if "t.me" in url:  # tg link
        return False
    # Skip the "generic" extractor.
    return any(ie.suitable(url) for ie in gen_extractors() if ie.IE_NAME != "generic")
484
485
486def guess_mime(path: str | Path) -> str:
487 path = Path(path).expanduser().resolve()
488 if not path.is_file():
489 return ""
490 with contextlib.suppress(Exception):
491 import magic # magic needs `libmagic` to be installed.
492
493 # `sudo apt-get install libmagic1` or `brew install libmagic`
494 return magic.from_file(path, mime=True)
495
496 # infer from `magic` failed
497 with contextlib.suppress(Exception):
498 return puremagic.from_file(path, mime=True)
499 return ""
500
501
502def unicode_to_ascii(text: str | float) -> str:
503 if not text:
504 return ""
505 return str(text).encode("unicode_escape").decode("ascii")
506
507
def ascii_to_unicode(text: str) -> str:
    """Decode escape sequences in an ASCII string back to their characters.

    Falsy input yields an empty string.
    """
    if not text:
        return ""
    raw = str(text).encode("ascii")
    return raw.decode("unicode_escape")
512
513
514def save_txt(text: str, path: Path | str | None = None) -> str:
515 if path is None:
516 path = Path(DOWNLOAD_DIR) / f"{rand_string()}.txt"
517 Path(path).write_text(text)
518 return Path(path).as_posix()
519
520
521def check_data(text: str, check_keys: list[str] | None = None, check_kv: dict | None = None):
522 """Check if data contains required keys and key-value pairs.
523
524 Example data:
525 {
526 "foo": "bar",
527 "baz": {
528 "qux": "quux"
529 },
530 "lst": ["1", "2", "3"]
531 }
532
533 check_keys: ["foo", "baz.qux", "lst"]
534 check_kv: {"foo": "bar", "baz.qux": "quux", "lst": ["1", "2", "3"]}
535 """
536 if not check_keys and not check_kv: # no need to check
537 return
538 try:
539 data = json.loads(text)
540 except json.JSONDecodeError:
541 logger.error(f"Failed to parse data as json: {text}")
542 raise
543
544 # ["foo", "baz.qux", "lst"]
545 if check_keys:
546 for key in check_keys:
547 try:
548 glom(data, key)
549 except PathAccessError as e:
550 logger.error(e)
551 raise
552
553 # {"foo": "bar", "baz.qux": "quux", "lst": ["1", "2", "3"]}
554 if check_kv:
555 for key, required_value in check_kv.items():
556 try:
557 value = glom(data, key)
558 except PathAccessError as e:
559 logger.error(e)
560 raise
561
562 if str(value) != str(required_value): # convert to str to compare
563 msg = f"{data=}, {key=}, {value=}, but required: {required_value}"
564 logger.error(msg)
565 raise ValueError
566
567
def cleanup_old_files(root: Path | str | None = None, duration: int = CLEAN_OLD_FILES_OLDER_THAN_SECONDS) -> None:
    """Delete files directly under ``root`` that are older than ``duration`` seconds.

    A file is deleted only when its access, change and modification times are
    all older than ``duration``. Subdirectories are not descended into.

    Args:
        root: Directory to clean; defaults to ``DOWNLOAD_DIR``.
        duration: Minimum age in seconds before a file is removed.
    """
    folder = Path(DOWNLOAD_DIR if root is None else root).expanduser().resolve()
    if not folder.is_dir():
        return
    now = datetime.now(UTC).timestamp()
    for entry in folder.glob("*"):
        if not entry.is_file():
            continue
        info = entry.stat()
        stamps = (info.st_atime, info.st_ctime, info.st_mtime)
        if all(now - stamp > duration for stamp in stamps):
            logger.warning(f"Deleting old file: {entry}")
            entry.unlink(missing_ok=True)
582
583
584def convert_md(path: str | Path | None = None, html: str | None = None) -> str:
585 """Convert to markdown format."""
586 md = MarkItDown()
587 if path is not None:
588 path = Path(path).expanduser().resolve()
589 if not path.is_file():
590 return ""
591 result = md.convert(path)
592 return result.text_content
593 if html is not None:
594 with tempfile.NamedTemporaryFile("w", suffix=".html", delete=False) as f:
595 f.write(html)
596 result = md.convert(f.name)
597 Path(f.name).unlink(missing_ok=True)
598 return result.text_content
599 return ""
600
601
602def convert_html(texts: str = "", path: str | Path | None = None) -> str:
603 """Convert to markdown format."""
604 if path is not None:
605 path = Path(path).expanduser().resolve()
606 if not path.is_file():
607 return ""
608 texts = read_text(path)
609 texts = markdown.markdown(texts)
610 return texts.replace("\n", "<br>")
611
612
613def av2bv(aid: int | str) -> str:
614 """Bilibili AV -> BV ID converter."""
615 aid = str(aid)
616 if aid[:3].upper() == "BV1": # BV1Y4UHYyE2z
617 return aid
618 aid = int(aid[2:]) if aid[:2].lower() == "av" else int(aid)
619 return aid2bvid(aid)
620
621
622def bv2av(bvid: str | int) -> int:
623 """Bilibili BV -> AV ID converter."""
624 bvid = str(bvid)
625 if bvid[:2].lower() == "av": # av113503016851915
626 return int(bvid[2:])
627 if bvid.isdigit(): # 113503016851915
628 return int(bvid)
629 assert bvid[:3].upper() == "BV1"
630 return bvid2aid(bvid)
631
632
def zhcn(text: str) -> str:
    """Convert Chinese text to the ``zh-cn`` locale using ``zhconv.convert_for_mw``."""
    converted = zhconv.convert_for_mw(text, locale="zh-cn")
    return converted
636
637
if __name__ == "__main__":
    # Manual smoke run: print the result of a handful of helpers.
    print(rand_string())
    print(rand_number())
    # print(cleanup_old_files())
    for size in (0, 2000 * 1024 * 1024):
        print(readable_size(size))
    for sample in ("你好", 1.1, "test"):
        print(unicode_to_ascii(sample))
    for sample in ("1.1", "test"):
        print(ascii_to_unicode(sample))
    print(match_urls("http://a.com/BmT8gZ 匹配不到就删除了https://b.com/MxRdMO"))
    for link in (
        "https://www.bilibili.com/video/BV15n61YtEmk",
        "https://t.me/c/1744444199/2475260",
        "https://test.com/",
    ):
        print(is_supported_by_ytdlp(link))
    for candidate in ("https://test.com/", "test.com/"):
        print(find_url(candidate))

    # assert av2bv("av113503016851915") == "BV1Y4UHYyE2z"
    # assert av2bv("113503016851915") == "BV1Y4UHYyE2z"
    # assert av2bv(113503016851915) == "BV1Y4UHYyE2z"
    # assert av2bv("BV1Y4UHYyE2z") == "BV1Y4UHYyE2z"
    # assert bv2av("BV1Y4UHYyE2z") == 113503016851915
    # assert bv2av("113503016851915") == 113503016851915
    # assert bv2av("av113503016851915") == 113503016851915
    # assert bv2av(113503016851915) == 113503016851915