main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import contextlib
  4import io
  5import tempfile
  6from pathlib import Path
  7from urllib.parse import quote_plus
  8
  9import anyio
 10import markdown
 11from glom import Coalesce, glom
 12from httpx import AsyncClient
 13from loguru import logger
 14from telegraph.aio import Telegraph
 15
 16from config import DB, TOKEN, TZ
 17from database.r2 import set_cf_r2
 18from utils import nowdt, rand_string
 19
 20
 21async def publish_telegraph(
 22    title: str,
 23    texts: str | None = None,
 24    html: str = "",
 25    author: str | None = None,
 26    url: str | None = None,
 27    ttl: str = "forever",  # 12h, 7d, 1M, ...
 28) -> str:
 29    """Publish to Telegraph."""
 30
 31    def clean_html(s: str | None) -> str:
 32        # Revise Telegraph Tags
 33        s = str(s).replace("<h1>", "<h3>").replace("</h1>", "</h3>")
 34        return s.replace("<h2>", "<h3>").replace("</h2>", "</h3>")
 35
 36    if not (texts or html):
 37        return ""
 38    if not TOKEN.TELEGRAPH:
 39        return await publish_cf_r2(title, texts=texts, html=html, author=author, url=url)
 40    if texts and not html:
 41        html = markdown.markdown(texts)
 42    telegraph = Telegraph(access_token=TOKEN.TELEGRAPH)
 43    account_info = {}
 44    if not (author and url):
 45        with contextlib.suppress(Exception):
 46            account_info = await telegraph.get_account_info()
 47        if not author:
 48            author = glom(account_info, Coalesce("result.short_name", "result.author_name"), default=None)
 49        if not url:
 50            url = glom(account_info, "result.author_url", default=None)
 51    # sanitize
 52    title = title[:256]
 53    if isinstance(author, str):
 54        author = author[:128]
 55    if isinstance(url, str):
 56        url = url[:512]
 57    try:
 58        page = await telegraph.create_page(title=title[:256], author_name=author, author_url=url, html_content=clean_html(html))
 59        logger.info(f"⚡️Telegraph: {page['url']}")
 60        return page["url"]
 61    except Exception as e:
 62        logger.error(f"Telegraph publish error: {e}")
 63        return await publish_cf_r2(title, texts=texts, html=html, author=author, url=url, ttl=ttl)
 64
 65
 66async def publish_cf_r2(
 67    title: str,
 68    texts: str | None = None,
 69    html: str = "",
 70    author: str | None = None,
 71    url: str | None = None,
 72    ttl: str = "forever",
 73) -> str:
 74    """Publish to CF R2."""
 75    if not (texts or html):
 76        return ""
 77    if texts and not html:
 78        html = markdown.markdown(texts)
 79    now = nowdt(TZ)
 80    today = f"{now:%Y-%m-%d}"
 81    key = f"InstantView/{today}-{rand_string(8)}.html" if ttl == "forever" else f"TTL/{ttl}/{today}-{rand_string(8)}.html"
 82    if not url:
 83        url = "https://instantview.telegram.org"
 84    if not author:
 85        author = "BennyBot"
 86
 87    html = f'<h1 id="iv-title">{title}</h1><a href="{url}" id="iv-author">{author}</a>{html}'
 88    html = f'<!DOCTYPE html><html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>{title}</title><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css"></head><body><article>{html}</article></body></html>'
 89    if await set_cf_r2(key=key, data=html, metadata={"title": title, "author": author, "url": url}, mime_type="text/html") and DB.CF_R2_PUBLIC_URL and TOKEN.R2_IV_HASH:
 90        pub_url = f"{DB.CF_R2_PUBLIC_URL.rstrip('/')}/{key}"
 91        logger.info(f"⚡️CF R2: {pub_url}")
 92        return f"https://t.me/iv?url={quote_plus(pub_url)}&rhash={TOKEN.R2_IV_HASH}"
 93    return await publish_neocities(title, texts=texts, html=html, author=author, url=url)
 94
 95
 96async def publish_neocities(title: str, texts: str | None = None, html: str = "", author: str | None = None, url: str | None = None) -> str:
 97    """Publish to neocities.org ."""
 98    if not TOKEN.NEOCITIES:
 99        return ""
100    if not (texts or html):
101        return ""
102    if texts and not html:
103        html = markdown.markdown(texts)
104    base_url = "https://neocities.org/api/upload"
105    username, password = TOKEN.NEOCITIES.split(",")
106    now = nowdt(TZ)
107    today = f"{now:%Y-%m-%d}"
108    server_file = f"{today}/{rand_string(12)}.html"
109    pub_url = f"https://{username}.neocities.org/{server_file.removesuffix('.html')}"
110    if not url:
111        url = pub_url
112    if not author:
113        author = "BennyBot"
114
115    html = f'<h1 id="iv-title">{title}</h1><a href="{url}" id="iv-author">{author}</a>{html}'
116    html = f'<!DOCTYPE html><html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>{title}</title><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/water.css@2/out/water.css"></head><body><article>{html}</article></body></html>'
117
118    try:
119        with tempfile.NamedTemporaryFile("w", suffix=".html", delete=False) as tempf:
120            tempf.write(html)
121        async with await anyio.open_file(tempf.name, "rb") as f:
122            content = await f.read()
123            client = AsyncClient(http2=True, timeout=20)
124            await client.post(
125                base_url,
126                auth=(username, password),
127                files={server_file: (server_file, io.BytesIO(content), "text/html")},
128            )
129        Path(tempf.name).unlink(missing_ok=True)
130        logger.info(f"⚡️Neocities: {pub_url}")
131    except Exception as e:
132        logger.error(f"Neocities publish error: {e}")
133        return ""
134
135    return f"https://t.me/iv?url={quote_plus(pub_url)}&rhash={TOKEN.NEOCITIES_IV_HASH}" if TOKEN.NEOCITIES_IV_HASH else pub_url