main
 1#!/usr/bin/env python
 2# -*- coding: utf-8 -*-
 3import io
 4from pathlib import Path
 5
 6import anyio
 7from httpx import AsyncClient, AsyncHTTPTransport
 8from loguru import logger
 9
10from config import DB
11from networking import hx_req
12
13
14async def upload_pastbin(path: str | Path, ttl: int | str | None = None) -> tuple[str, str]:
15    """Upload to Pastbin Workders.
16
17    Returns:
18        download url, manage url
19
20    https://github.com/SharzyL/pastebin-worker
21    """
22    path = Path(path).expanduser().resolve()
23    if not path.is_file():
24        return "", ""
25    if path.stat().st_size > DB.PASTBIN_MAX_BYTES:
26        logger.warning(f"File size exceeds {DB.PASTBIN_MAX_BYTES} bytes, skipping: {path.name}")
27        return "", ""
28    logger.debug(f"Uploading {path.name} to: {DB.PASTBIN_SERVER}")
29
30    async with await anyio.open_file(path, "rb") as f:
31        content = await f.read()
32        payload = {"c": (path.name, io.BytesIO(content)), "p": "1"}
33        if ttl is not None:
34            payload |= {"e": str(ttl)}
35        res = await hx_req(DB.PASTBIN_SERVER, method="POST", files=payload, check_keys=["url", "manageUrl"])
36    url = res["url"]
37    logger.success(f"Uploaded {path.name} to {url}")
38    return url, res["manageUrl"]
39
40
41async def delete_pastbin(url: str):
42    """Delete file in Pastbin Workders.
43
44    https://github.com/SharzyL/pastebin-worker
45    """
46    if not url:
47        return
48    async with AsyncClient(http2=True, follow_redirects=True, transport=AsyncHTTPTransport(retries=3, http2=True)) as hx:
49        try:
50            resp = await hx.delete(url, timeout=30)
51            resp.raise_for_status()
52        except Exception as e:
53            logger.warning(f"DEL Pastbin failed for url={url}: {e}")
54            return
55        else:
56            logger.success(f"DEL Pastbin for url={url}")