Commit 4bd964b

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-05-10 04:18:26
feat(database): add `upload_uguu` function
1 parent 332381e
Changed files (1)
src/database.py
@@ -15,6 +15,7 @@ from pathlib import Path
 from urllib.parse import quote_plus, unquote_plus
 
 import anyio
+import puremagic
 from aioboto3 import Session
 from botocore.exceptions import ClientError
 from glom import glom
@@ -353,13 +354,38 @@ async def delete_alist(fname: str, *, ensure_ascii: bool = True) -> None:
     res = await hx_req(api, method="POST", headers=headers, post_json=payload, check_kv={"code": 200})
 
 
+def guess_mime(path: str | Path) -> str:
+    path = Path(path).expanduser().resolve()
+    if not path.is_file():
+        return ""
+    if mime_types := puremagic.magic_file(path):
+        return mime_types[0].mime_type
+    return ""
+
+
+async def upload_uguu(path: str | Path) -> str:
+    """Upload to https://uguu.se, return the url."""
+    path = Path(path).expanduser().resolve()
+    if not path.is_file():
+        return ""
+    api = "https://uguu.se/upload"
+    logger.debug(f"Uploading {path.name} to: https://Uguu.se")
+    async with await anyio.open_file(path, "rb") as f:
+        content = await f.read()
+        res = await hx_req(api, method="POST", files={"files[]": (path.name, io.BytesIO(content), guess_mime(path))}, check_kv={"success": True}, check_keys=["files.0.url"])
+    url = res["files"][0]["url"]
+    logger.success(f"Uploaded {path.name} to {url}")
+    return url
+
+
 if __name__ == "__main__":
     import asyncio
 
-    asyncio.run(list_alist())
-    asyncio.run(upload_alist("测试.mp3"))
-    asyncio.run(download_alist("测试.mp3"))
-    asyncio.run(delete_alist("测试.mp3"))
+    asyncio.run(upload_uguu("测试.mp3"))
+    # asyncio.run(list_alist())
+    # asyncio.run(upload_alist("测试.mp3"))
+    # asyncio.run(download_alist("测试.mp3"))
+    # asyncio.run(delete_alist("测试.mp3"))
     # asyncio.run(download_alist("test.py"))
     # asyncio.run(set_cf_r2("test2", metadata={"finished": "1"}))
     # asyncio.run(set_cf_r2("test2", data={"finished": "1"}))