main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import io
4from pathlib import Path
5
6import anyio
7from httpx import AsyncClient, AsyncHTTPTransport
8from loguru import logger
9
10from config import DB
11from networking import hx_req
12
13
14async def upload_pastbin(path: str | Path, ttl: int | str | None = None) -> tuple[str, str]:
15 """Upload to Pastbin Workders.
16
17 Returns:
18 download url, manage url
19
20 https://github.com/SharzyL/pastebin-worker
21 """
22 path = Path(path).expanduser().resolve()
23 if not path.is_file():
24 return "", ""
25 if path.stat().st_size > DB.PASTBIN_MAX_BYTES:
26 logger.warning(f"File size exceeds {DB.PASTBIN_MAX_BYTES} bytes, skipping: {path.name}")
27 return "", ""
28 logger.debug(f"Uploading {path.name} to: {DB.PASTBIN_SERVER}")
29
30 async with await anyio.open_file(path, "rb") as f:
31 content = await f.read()
32 payload = {"c": (path.name, io.BytesIO(content)), "p": "1"}
33 if ttl is not None:
34 payload |= {"e": str(ttl)}
35 res = await hx_req(DB.PASTBIN_SERVER, method="POST", files=payload, check_keys=["url", "manageUrl"])
36 url = res["url"]
37 logger.success(f"Uploaded {path.name} to {url}")
38 return url, res["manageUrl"]
39
40
41async def delete_pastbin(url: str):
42 """Delete file in Pastbin Workders.
43
44 https://github.com/SharzyL/pastebin-worker
45 """
46 if not url:
47 return
48 async with AsyncClient(http2=True, follow_redirects=True, transport=AsyncHTTPTransport(retries=3, http2=True)) as hx:
49 try:
50 resp = await hx.delete(url, timeout=30)
51 resp.raise_for_status()
52 except Exception as e:
53 logger.warning(f"DEL Pastbin failed for url={url}: {e}")
54 return
55 else:
56 logger.success(f"DEL Pastbin for url={url}")