main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import contextlib
  4import json
  5import os
  6import random
  7import re
  8import string
  9import tempfile
 10from datetime import UTC, datetime
 11from decimal import Decimal
 12from pathlib import Path
 13from typing import Any
 14from zoneinfo import ZoneInfo
 15
 16import chardet
 17import markdown
 18import puremagic
 19import zhconv
 20from bilibili_api.utils.aid_bvid_transformer import aid2bvid, bvid2aid
 21from bs4 import BeautifulSoup
 22from bs4.element import PageElement
 23from glom import PathAccessError, glom
 24from loguru import logger
 25from markitdown import MarkItDown
 26from pyrogram.client import Client
 27from pyrogram.types import User
 28from yt_dlp.extractor import gen_extractors
 29
 30from config import CLEAN_OLD_FILES_OLDER_THAN_SECONDS, DOWNLOAD_DIR, TZ, cache
 31
 32# ruff: noqa: RUF001
 33
 34
 35def nowdt(tz: str = "UTC") -> datetime:
 36    return datetime.now(ZoneInfo(tz))
 37
 38
 39def nowstr(tz: str = TZ) -> str:
 40    now = nowdt(tz)
 41    return f"{now:%Y-%m-%d %H:%M:%S}"
 42
 43
 44def number(n: float | str | Decimal, precision: int = -1, *, sign: bool = False) -> str:
 45    """Normalize a number to its simplest decimal.
 46
 47    Example:
 48    "1.2340000" -> "1.234"
 49    1.000000 -> "1"
 50    """
 51    n = Decimal(n)
 52    if precision == -1:  # auto precision (up to 8 decimal places)
 53        return f"{n:.8f}".rstrip("0").rstrip(".")
 54    if precision == 0:
 55        return f"{n:.0f}"
 56    return f"{n:+.{precision}f}" if sign else f"{n:.{precision}f}"
 57
 58
 59def split_parts(first: int = 0, middle: int = 0, last: int = 0) -> dict:
 60    """Split a list of items into three parts: first, middle, and last.
 61
 62    Useful for determine the number of media files in master / reply / quote posts.
 63    """
 64    data = {
 65        "first": f"🏞P1-P{first}",
 66        "middle": f"🏞P{first + 1}-P{first + middle}",
 67        "last": f"🏞P{first + middle + 1}-P{first + middle + last}",
 68    }
 69    for k, v in data.items():
 70        idx1, idx2 = (s.strip("🏞P") for s in v.split("-"))
 71        if int(idx1) > int(idx2):
 72            data[k] = ""
 73        elif int(idx1) == int(idx2):
 74            data[k] = f"🏞P{idx1}"
 75    return data
 76
 77
 78def to_int(var: str | float) -> str | int:
 79    """Convert a string or float to an integer."""
 80    try:
 81        return int(float(var))
 82    except (ValueError, TypeError):
 83        return str(var)
 84
 85
 86def read_text(path: str | Path) -> str:
 87    """Read text file with any encoding."""
 88    path = Path(path).expanduser().resolve()
 89    if not path.is_file():
 90        logger.warning(f"File not found: {path.name}")
 91        return ""
 92    try:
 93        return path.read_text(encoding="utf-8")
 94    except UnicodeDecodeError:
 95        with path.open("rb") as f:  # Open in binary mode for detection
 96            raw_data = f.read()
 97            result = chardet.detect(raw_data)
 98            detected_encoding = result["encoding"]
 99            if detected_encoding:
100                logger.success(f"File: `{path.name}` Encoding: {detected_encoding}")
101                return path.read_text(encoding=detected_encoding)
102    logger.warning(f"Could not detect encoding: {path.name}")
103    return ""
104
105
def rand_string(length: int = 48) -> str:
    """Return ``length`` random ASCII letters and digits (drawn with ``random.choices``)."""
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
108
109
def rand_number(length: int = 8) -> int:
    """Return an integer built from ``length`` random decimal digits.

    Leading zeros are dropped by the ``int`` conversion, so the result may
    have fewer than ``length`` digits.
    """
    digits = random.choices(string.digits, k=length)
    return int("".join(digits))
112
113
def true(value: Any) -> bool:
    """Interpret ``value`` as a boolean flag.

    Falsy values are ``False``. A non-empty string is ``False`` only when its
    lower-cased form is one of the known "off" words (compared without
    stripping whitespace). Every other truthy value is ``True``.
    """
    if isinstance(value, str):
        negatives = {"0", "n", "na", "n/a", "no", "not", "f", "false", "off", "none", "null", "disable", "disabled"}
        return bool(value) and str(value).lower() not in negatives
    return bool(value)
120
121
122def remove_none_values(d: dict | list) -> dict:
123    """Recursively removes keys with None values from a nested dictionary.
124
125    Cleans None values from lists and processes nested structures.
126
127    Args:
128        d (dict | list): The input dict or list
129
130    Returns:
131        dict: A cleaned dictionary or list with None values removed.
132    """
133    if isinstance(d, dict):  # If the input is a dictionary
134        cleaned_dict = {}
135        for key, value in d.items():
136            if isinstance(value, dict):
137                # Recursively clean nested dictionaries
138                nested_cleaned = remove_none_values(value)
139                if nested_cleaned:  # Only add non-empty cleaned dict
140                    cleaned_dict[key] = nested_cleaned
141            elif isinstance(value, list):
142                # Clean lists recursively
143                cleaned_list = [remove_none_values(item) if isinstance(item, dict | list) else item for item in value if item is not None]
144                if cleaned_list:  # Only add non-empty cleaned lists
145                    cleaned_dict[key] = cleaned_list
146            elif value is not None:
147                cleaned_dict[key] = value
148        return cleaned_dict
149    if isinstance(d, list):  # If the input is a list
150        return [remove_none_values(item) if isinstance(item, dict | list) else item for item in d if item is not None]  # type: ignore
151
152    return d  # Return non-dict, non-list values as is
153
154
def soup_to_text(soup: PageElement) -> str:
    """Flatten a parsed HTML node into text.

    ``<img>`` tags that carry an ``alt`` attribute contribute their alt text,
    ``<br>`` tags contribute a newline, container nodes are walked
    recursively, and everything else contributes its ``.text``.
    """
    if not hasattr(soup, "children"):
        return soup.text

    pieces = []
    for node in soup.children:  # type: ignore
        if node.name == "img" and "alt" in node.attrs:
            pieces.append(node["alt"])
        elif node.name == "br":
            pieces.append("\n")
        elif hasattr(node, "children"):
            pieces.append(soup_to_text(node))
        else:
            pieces.append(node.text)
    return "".join(pieces)
169
170
def markdown_to_text(mkdown: str) -> str:
    """Render Markdown to HTML and return the text extracted from it.

    Any exception raised while rendering or parsing is swallowed and the
    input is returned unchanged.
    """
    try:
        rendered = markdown.markdown(mkdown)
        # NOTE(review): newlines become <br> tags here, and get_text() presumably
        # emits nothing for them, so line breaks look lost — confirm this is intended.
        document = BeautifulSoup(rendered.replace("\n", "<br>"), "html.parser")
        return document.get_text()
    except Exception:
        return mkdown
177
178
179def number_to_emoji(num: int | str, default: str | None = None) -> str:
180    """Convert a number to an emoji."""
181    num = str(num)
182    if default is None:
183        default = num
184    return {"0": "0️⃣", "1": "1️⃣", "2": "2️⃣", "3": "3️⃣", "4": "4️⃣", "5": "5️⃣", "6": "6️⃣", "7": "7️⃣", "8": "8️⃣", "9": "9️⃣", "10": "🔟"}.get(num, default)
185
186
187def seconds_to_hms(seconds: float | str) -> str:
188    """Convert seconds to hms format."""
189    seconds = int(float(seconds))
190    m, s = divmod(seconds, 60)
191    h, m = divmod(m, 60)
192    if h == 0:
193        return f"{m:02d}:{s:02d}"
194    return f"{h:02d}:{m:02d}:{s:02d}"
195
196
def count_subtitles(texts: str) -> int:
    """Count the characters left after stripping a leading ``[...]`` tag from each line.

    Args:
        texts: Subtitle text whose lines may start with a bracketed timestamp.

    Returns:
        int: Length of the text once each line's leading bracketed tag (and
        one following whitespace character, if any) has been removed.
    """
    leading_tag = re.compile(r"^\[.*?\]\s?", flags=re.MULTILINE)
    return len(leading_tag.sub("", texts))
208
209
210def stringfy(d: dict) -> dict:
211    """Convert dict values to string.
212
213    Args:
214        d (dict | list): The input dict or list
215
216    Returns:
217        dict: A stringfy dictionary or list.
218    """
219    if isinstance(d, dict):  # If the input is a dictionary
220        stringfy_dict = {}
221        for key, value in d.items():
222            if isinstance(value, dict | list | set):
223                stringfy_dict[key] = json.dumps(value)
224            else:
225                stringfy_dict[key] = unicode_to_ascii(value)
226        return stringfy_dict
227    return d  # Return non-dict, non-list values as is
228
229
def seconds_to_time(seconds: float) -> str:
    """Format seconds as a clock string, rounding to the nearest second.

    100 -> "01:40"
    1000 -> "16:40"
    10000 -> "02:46:40"
    100000 -> "27:46:40"
    """
    total = round(float(seconds))
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if not hours:
        return f"{minutes:02d}:{secs:02d}"
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"
244
245
246def readable_time(seconds: str | float) -> str:
247    """Human readable time duration.
248
249    100 -> "1m40s"
250    1000 -> "16m40s"
251    10000 -> "2h46m40s"
252    100000 -> "1d3h46m40s"
253    """
254    try:
255        seconds = float(seconds)
256    except ValueError:
257        # already in reachable time
258        return str(seconds)
259    if seconds < 60:
260        return f"{seconds:.0f}s"
261    if seconds < 3600:
262        minutes, seconds = divmod(seconds, 60)
263        return f"{minutes:.0f}m{seconds:.0f}s"
264    if seconds < 86400:
265        hours, seconds = divmod(seconds, 3600)
266        minutes, seconds = divmod(seconds, 60)
267        return f"{hours:.0f}h{minutes:.0f}m{seconds:.0f}s"
268    days, seconds = divmod(seconds, 86400)
269    hours, seconds = divmod(seconds, 3600)
270    minutes, seconds = divmod(seconds, 60)
271    return f"{days:.0f}d{hours:.0f}h{minutes:.0f}m{seconds:.0f}s"
272
273
274def readable_size(num_bytes: str | float = 0, path: str | Path | None = None) -> str:
275    """Human readable file size."""
276    num_bytes = Path(path).stat().st_size if path is not None else float(num_bytes)
277    # for unit in ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"]:
278    for unit in ["B", "KB"]:
279        if abs(num_bytes) < 1024:
280            return f"{num_bytes:.1f} {unit}"
281        num_bytes /= 1024
282    return f"{num_bytes:.1f} MB"
283
284
def readable_count(num: int | str) -> str:
    """Shorten a large count by expressing it in units of 10,000.

    Values below 10,000 are returned as-is. From 10,000 up to 99,999 the count
    is divided by 10,000 and shown with one decimal digit when that digit is
    non-zero. From 100,000 upwards only the whole number of ten-thousands is
    shown. No unit suffix is appended. Input that ``to_int`` cannot convert is
    returned as ``str(num)``.
    """
    count = to_int(num)
    if not isinstance(count, int):
        return str(num)
    if count < 10000:
        return str(count)
    whole, rest = divmod(count, 10000)
    if count >= 100000 or rest < 1000:
        return f"{whole}"
    return f"{whole}.{rest // 1000}"
295
296
def find_url(text: str) -> str:
    """Return the first URL found in ``text``, or ``""`` when there is none.

    Non-string input also yields ``""``.
    """
    if not isinstance(text, str):
        return ""
    pattern = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
    first = re.search(pattern, text)
    if first is None:
        return ""
    url = first.group(1)  # outermost capture group = the whole URL
    logger.debug(f"URL found from message text: {url}")
    return url
306
307
def https_url(url: str) -> str:
    """Return ``url`` with an ``https://`` scheme and no leading/trailing slashes."""
    bare = str(url).removeprefix("https://").removeprefix("http://")
    return "https://" + bare.strip("/")
310
311
def bare_url(url: str) -> str:
    """Return ``url`` without its ``http(s)://`` scheme and without leading/trailing slashes."""
    without_scheme = str(url).removeprefix("https://").removeprefix("http://")
    return without_scheme.strip("/")
314
315
316def ts_to_dt(ts: str | float | None) -> datetime | None:
317    if not ts:
318        return None
319
320    try:  # not number
321        ts = float(ts)
322    except ValueError:
323        return None
324
325    if 0 < float(ts) < 1:
326        return None
327    try:
328        return datetime.fromtimestamp(ts, tz=UTC).astimezone(ZoneInfo(TZ))
329    except Exception as e:
330        if "out of range" in str(e):
331            return ts_to_dt(ts / 1000)
332        logger.error(e)
333        return None
334
335
336def slim_cid(cid: int | str) -> str:
337    return str(cid).strip().removeprefix("-100")
338
339
340def strings_list(value: str | None = None, *, env_key: str = "", separator: str = ",", shuffle: bool = False) -> list[str]:
341    """Get list from environment variable."""
342    if value is None:
343        value = os.getenv(env_key, "")
344    results = [s.strip() for s in value.split(separator) if s.strip()]
345    if shuffle:
346        random.shuffle(results)
347    return results
348
349
def parse_time(timestr: str) -> dict[str, int]:
    """Parse a fixed-layout time string; the layout is chosen by its length.

    Support formats:
        length= 4: yyyy
        length= 6: yyyymm
        length= 7: yyyy-mm
        length= 8: yyyymmdd
        length=10: yyyy-mm-dd
        length=14: yyyymmddHHMMSS
        length=15: yyyymmdd-HHMMSS
        length=17: yyyymmdd HH:MM:SS
        length=19: yyyy-mm-dd HH:MM:SS

    Returns:
    {"year": int, "month": int, "day": int, "hour": int, "minute": int, "second": int}
    Fields absent from the layout stay 0. An empty input returns ``{}``; an
    unsupported length logs a warning and returns the all-zero dict.

    Raises:
        ValueError: If a field of a supported layout is not an integer.
    """
    if not timestr:
        return {}

    compact_date = {"month": slice(4, 6), "day": slice(6, 8)}  # yyyymmdd
    dashed_date = {"month": slice(5, 7), "day": slice(8, 10)}  # yyyy-mm-dd
    # Where each field sits, keyed by total string length (year is always [:4]).
    layouts: dict[int, dict[str, slice]] = {
        4: {},  # yyyy
        6: {"month": slice(4, 6)},  # yyyymm
        7: {"month": slice(5, 7)},  # yyyy-mm
        8: compact_date,  # yyyymmdd
        10: dashed_date,  # yyyy-mm-dd
        14: {**compact_date, "hour": slice(8, 10), "minute": slice(10, 12), "second": slice(12, 14)},  # yyyymmddHHMMSS
        15: {**compact_date, "hour": slice(9, 11), "minute": slice(11, 13), "second": slice(13, 15)},  # yyyymmdd-HHMMSS
        17: {**compact_date, "hour": slice(9, 11), "minute": slice(12, 14), "second": slice(15, 17)},  # yyyymmdd HH:MM:SS
        19: {**dashed_date, "hour": slice(11, 13), "minute": slice(14, 16), "second": slice(17, 19)},  # yyyy-mm-dd HH:MM:SS
    }

    res = {"year": 0, "month": 0, "day": 0, "hour": 0, "minute": 0, "second": 0}
    layout = layouts.get(len(timestr))
    if layout is None:
        logger.warning(f"Invalid time format: {timestr}")
        return res

    res["year"] = int(timestr[:4])
    for field, span in layout.items():
        res[field] = int(timestr[span])
    return res
411
412
async def myself(client: Client) -> User:
    """Return the account behind ``client``, using the shared cache.

    A truthy cached ``"me"`` entry is returned directly. Otherwise
    ``client.get_me()`` is awaited and its result stored under ``"me"``.

    Returns:
        The user reported by ``get_me()``. If that call raises, the error is
        logged and a placeholder ``User(id=1, is_bot=False)`` is returned; the
        placeholder is not cached, so the next call tries again.
    """
    # Single lookup: the entry cannot disappear between the check and the read.
    if cached := cache.get("me"):
        return cached
    try:
        me = await client.get_me()
    except Exception as e:
        logger.error(e)
        return User(id=1, is_bot=False)
    cache.set("me", me, ttl=0)  # NOTE(review): ttl=0 presumably means "never expire" — confirm
    return me
424
425
async def i_am_bot(client: Client) -> bool:
    """Check whether this client is a bot or not.

    A truthy cached ``"i_am_bot"`` entry is returned directly. Otherwise the
    answer is taken from ``myself(client).is_bot`` and stored in the cache.

    Returns:
        ``True`` when the account is a bot; ``False`` otherwise, including
        when looking up the account raises.
    """
    # Single lookup: the entry cannot disappear between the check and the read.
    # Only a truthy entry is served from the cache on purpose: a cached False
    # is re-evaluated on every call, so a False stored after a failed lookup
    # (myself() falls back to is_bot=False) does not stick.
    if cached := cache.get("i_am_bot"):
        return cached
    try:
        me = await myself(client)
    except Exception as e:
        logger.error(e)
        return False
    cache.set("i_am_bot", me.is_bot, ttl=0)
    return me.is_bot
437
438
def match_urls(text: str) -> list[str]:
    """Find every URL in ``text`` and return each one normalized by ``https_url``."""
    pattern = r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))'
    # Group 1 is the outermost capture, i.e. the whole URL.
    return [https_url(found.group(1)) for found in re.finditer(pattern, str(text))]
446
447
def remove_dash(text: str) -> str:
    """Delete runs of dashes: first every ``---``, then every remaining ``--``.

    Falsy input yields ``""``. Single dashes are kept.
    """
    if not text:
        return ""
    result = text
    for marker in ("---", "--"):
        while marker in result:
            result = result.replace(marker, "")
    return result
456
457
def remove_pound(text: str) -> str:
    """Replace every ``"# "`` with a single space until none is left.

    Falsy input yields ``""``. A ``#`` that is not followed by a space is kept.
    """
    if not text:
        return ""
    marker = "# "
    result = text
    while marker in result:
        result = result.replace(marker, " ")
    return result
464
465
def remove_consecutive_newlines(text: str, newline_level: int = 3) -> str:
    """Collapse runs of newlines.

    Runs of three or more newlines always shrink to two. When
    ``newline_level`` is 2, runs of two or more then shrink to one. Falsy
    input yields ``""``.
    """
    if not text:
        return ""
    collapsed = re.sub(r"\n{3,}", "\n\n", text)
    if newline_level == 2:
        collapsed = re.sub(r"\n{2,}", "\n", collapsed)
    return collapsed
475
476
def is_supported_by_ytdlp(url: str) -> bool:
    """Tell whether a dedicated (non-generic) yt-dlp extractor accepts ``url``.

    Any URL containing ``t.me`` is rejected up front.
    """
    if "t.me" in url:  # tg link
        return False
    # Skip the catch-all "generic" extractor; only dedicated extractors count.
    return any(ie.IE_NAME != "generic" and ie.suitable(url) for ie in gen_extractors())
484
485
486def guess_mime(path: str | Path) -> str:
487    path = Path(path).expanduser().resolve()
488    if not path.is_file():
489        return ""
490    with contextlib.suppress(Exception):
491        import magic  # magic needs `libmagic` to be installed.
492
493        # `sudo apt-get install libmagic1` or `brew install libmagic`
494        return magic.from_file(path, mime=True)
495
496    # infer from `magic` failed
497    with contextlib.suppress(Exception):
498        return puremagic.from_file(path, mime=True)
499    return ""
500
501
502def unicode_to_ascii(text: str | float) -> str:
503    if not text:
504        return ""
505    return str(text).encode("unicode_escape").decode("ascii")
506
507
def ascii_to_unicode(text: str) -> str:
    r"""Decode Python ``\uXXXX``-style escapes in ``text`` back to characters.

    Args:
        text: Escaped text. Characters that are already non-ASCII are
            accepted and come back unchanged.

    Returns:
        The decoded string; falsy input yields ``""``.
    """
    if not text:
        return ""
    # "backslashreplace" turns any non-ASCII character into its escape form, so
    # mixed input no longer raises UnicodeEncodeError and still round-trips.
    return str(text).encode("ascii", "backslashreplace").decode("unicode_escape")
512
513
514def save_txt(text: str, path: Path | str | None = None) -> str:
515    if path is None:
516        path = Path(DOWNLOAD_DIR) / f"{rand_string()}.txt"
517    Path(path).write_text(text)
518    return Path(path).as_posix()
519
520
521def check_data(text: str, check_keys: list[str] | None = None, check_kv: dict | None = None):
522    """Check if data contains required keys and key-value pairs.
523
524    Example data:
525    {
526        "foo": "bar",
527        "baz": {
528            "qux": "quux"
529        },
530        "lst": ["1", "2", "3"]
531    }
532
533    check_keys: ["foo", "baz.qux", "lst"]
534    check_kv: {"foo": "bar", "baz.qux": "quux", "lst": ["1", "2", "3"]}
535    """
536    if not check_keys and not check_kv:  # no need to check
537        return
538    try:
539        data = json.loads(text)
540    except json.JSONDecodeError:
541        logger.error(f"Failed to parse data as json: {text}")
542        raise
543
544    # ["foo", "baz.qux", "lst"]
545    if check_keys:
546        for key in check_keys:
547            try:
548                glom(data, key)
549            except PathAccessError as e:
550                logger.error(e)
551                raise
552
553    # {"foo": "bar", "baz.qux": "quux", "lst": ["1", "2", "3"]}
554    if check_kv:
555        for key, required_value in check_kv.items():
556            try:
557                value = glom(data, key)
558            except PathAccessError as e:
559                logger.error(e)
560                raise
561
562            if str(value) != str(required_value):  # convert to str to compare
563                msg = f"{data=}, {key=}, {value=}, but required: {required_value}"
564                logger.error(msg)
565                raise ValueError
566
567
def cleanup_old_files(root: Path | str | None = None, duration: int = CLEAN_OLD_FILES_OLDER_THAN_SECONDS) -> None:
    """Delete the files directly inside ``root`` that are older than ``duration`` seconds.

    A file counts as old only when its access, change and modification times
    are all more than ``duration`` seconds in the past. Sub-directories are
    not visited.

    Args:
        root: Directory to clean; defaults to ``DOWNLOAD_DIR``. Nothing
            happens if it is not a directory.
        duration: Minimum age, in seconds, for a file to be deleted.
    """
    if root is None:
        root = DOWNLOAD_DIR
    root = Path(root).expanduser().resolve()
    if not root.is_dir():
        return
    now = datetime.now(UTC).timestamp()
    for path in root.glob("*"):
        try:
            if not path.is_file():
                continue
            info = path.stat()  # one stat call serves all three timestamps
        except OSError:
            continue  # the file vanished (or became unreadable) during the scan
        # Every timestamp is older than `duration` iff the most recent one is.
        latest = max(info.st_atime, info.st_ctime, info.st_mtime)
        if now - latest <= duration:
            continue
        logger.warning(f"Deleting old file: {path}")
        try:
            path.unlink(missing_ok=True)
        except OSError as e:
            # Best-effort cleanup: one undeletable file must not stop the rest.
            logger.error(f"Failed to delete {path}: {e}")
582
583
584def convert_md(path: str | Path | None = None, html: str | None = None) -> str:
585    """Convert to markdown format."""
586    md = MarkItDown()
587    if path is not None:
588        path = Path(path).expanduser().resolve()
589        if not path.is_file():
590            return ""
591        result = md.convert(path)
592        return result.text_content
593    if html is not None:
594        with tempfile.NamedTemporaryFile("w", suffix=".html", delete=False) as f:
595            f.write(html)
596        result = md.convert(f.name)
597        Path(f.name).unlink(missing_ok=True)
598        return result.text_content
599    return ""
600
601
602def convert_html(texts: str = "", path: str | Path | None = None) -> str:
603    """Convert to markdown format."""
604    if path is not None:
605        path = Path(path).expanduser().resolve()
606        if not path.is_file():
607            return ""
608        texts = read_text(path)
609    texts = markdown.markdown(texts)
610    return texts.replace("\n", "<br>")
611
612
613def av2bv(aid: int | str) -> str:
614    """Bilibili AV -> BV ID converter."""
615    aid = str(aid)
616    if aid[:3].upper() == "BV1":  # BV1Y4UHYyE2z
617        return aid
618    aid = int(aid[2:]) if aid[:2].lower() == "av" else int(aid)
619    return aid2bvid(aid)
620
621
622def bv2av(bvid: str | int) -> int:
623    """Bilibili BV -> AV ID converter."""
624    bvid = str(bvid)
625    if bvid[:2].lower() == "av":  # av113503016851915
626        return int(bvid[2:])
627    if bvid.isdigit():  # 113503016851915
628        return int(bvid)
629    assert bvid[:3].upper() == "BV1"
630    return bvid2aid(bvid)
631
632
def zhcn(text: str) -> str:
    """Convert Chinese text to the ``zh-cn`` locale with ``zhconv.convert_for_mw``."""
    simplified = zhconv.convert_for_mw(text, locale="zh-cn")
    return simplified
636
637
# Manual smoke checks: running this module directly prints sample output of
# several helpers defined above. Nothing here is asserted.
if __name__ == "__main__":
    # Random helpers: the output differs on every run.
    print(rand_string())
    print(rand_number())
    # print(cleanup_old_files())
    # Size formatting: a zero size and a size above the largest unit (MB).
    print(readable_size(0))
    print(readable_size(2000 * 1024 * 1024))
    # Escaping to ASCII and decoding back.
    print(unicode_to_ascii("你好"))
    print(unicode_to_ascii(1.1))
    print(unicode_to_ascii("test"))
    print(ascii_to_unicode("1.1"))
    print(ascii_to_unicode("test"))
    # URL extraction; the second URL directly follows non-ASCII text.
    print(match_urls("http://a.com/BmT8gZ 匹配不到就删除了https://b.com/MxRdMO"))
    # yt-dlp support checks (a Telegram link is always rejected).
    print(is_supported_by_ytdlp("https://www.bilibili.com/video/BV15n61YtEmk"))
    print(is_supported_by_ytdlp("https://t.me/c/1744444199/2475260"))
    print(is_supported_by_ytdlp("https://test.com/"))
    # First-URL lookup, with and without a scheme.
    print(find_url("https://test.com/"))
    print(find_url("test.com/"))

    # Disabled AV <-> BV round-trip checks.
    # assert av2bv("av113503016851915") == "BV1Y4UHYyE2z"
    # assert av2bv("113503016851915") == "BV1Y4UHYyE2z"
    # assert av2bv(113503016851915) == "BV1Y4UHYyE2z"
    # assert av2bv("BV1Y4UHYyE2z") == "BV1Y4UHYyE2z"
    # assert bv2av("BV1Y4UHYyE2z") == 113503016851915
    # assert bv2av("113503016851915") == 113503016851915
    # assert bv2av("av113503016851915") == 113503016851915
    # assert bv2av(113503016851915) == 113503016851915