Commit 2fcbf68

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-10-20 11:52:45
feat(watermark): add watermarking feature for images and videos
1 parent ab8c457
src/messages/utils.py
@@ -11,7 +11,7 @@ from pyrogram.parser.parser import Parser
 from pyrogram.types import Message, ReactionTypeEmoji, ReplyParameters
 
 from config import TEXT_LENGTH
-from utils import myself, readable_size, to_int
+from utils import myself, readable_size, strings_list, to_int
 
 
 def startswith_prefix(text: str, prefix: str | list[str], ignore_prefix: str | list[str] | None = None) -> bool:
@@ -90,13 +90,15 @@ def equal_prefix(text: str, prefix: str | list[str], ignore_prefix: str | list[s
     return False
 
 
-def remove_prefix(s: str, prefix: str) -> str:
+def remove_prefix(s: str, prefix: str, separator: str = ",") -> str:
     """Remove the prefix from the string."""
     if not prefix:
         return s
-    if s.lower().startswith(prefix.lower()):
-        return s[len(prefix) :].lstrip()
-    return s
+    final = ""
+    for pfx in strings_list(prefix, separator=separator):
+        if s.lower().startswith(pfx.lower()):
+            final = s[len(pfx) :].lstrip()
+    return final
 
 
 def get_reply_to(msg_id: int, reply_msg_id: int | str) -> ReplyParameters:
src/others/watermark.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import asyncio
+import contextlib
+import math
+from pathlib import Path
+
+from loguru import logger
+from PIL import Image
+from pyrogram.client import Client
+from pyrogram.types import Message
+
+from config import DB, DOWNLOAD_DIR, PREFIX, PROXY
+from messages.parser import parse_msg
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import equal_prefix, remove_prefix, set_reaction, startswith_prefix
+from multimedia import is_valid_video_or_audio, parse_media_info
+from networking import download_file
+from utils import rand_string
+
+HELP = f"""💧**添加水印**
+`{PREFIX.WATERMARK}` 回复一条媒体消息
+对于图片会添加Gemini水印
+对于视频会添加Sora水印
+"""
+
+
+async def add_watermark(client: Client, message: Message, **kwargs):
+    """Add watermark to media files."""
+    if not startswith_prefix(message.content, prefix=PREFIX.WATERMARK):
+        return
+    # send docs if message == "/watermark"
+    if equal_prefix(message.content, prefix=PREFIX.WATERMARK) and not message.reply_to_message:
+        await send2tg(client, message, texts=HELP, **kwargs)
+        return
+    media_message = message.reply_to_message if message.reply_to_message else message
+    info = parse_msg(media_message)
+
+    if info["mtype"] not in ["video", "photo"]:
+        await send2tg(client, message, texts="❌请回复一条媒体消息\n\n" + HELP, **kwargs)
+        return
+    wm_name = remove_prefix(message.content, prefix=PREFIX.WATERMARK).strip()
+    await set_reaction(client, message, reaction="👌")
+    if info["mtype"] == "photo":
+        await add_image_watermark(client, media_message, wm_name)
+        await set_reaction(client, message, reaction="")
+        return
+    if info["mtype"] == "video":
+        await add_video_watermark(client, media_message, wm_name, **kwargs)
+        await set_reaction(client, message, reaction="")
+        return
+
+
+async def add_image_watermark(client: Client, message: Message, watermark_name: str = ""):
+    fpath: str = await client.download_media(message)  # type: ignore
+    if not Path(fpath).is_file():
+        return
+
+    if not watermark_name:
+        watermark_name = "gemini"
+    processed_path = "/non-exist"
+    caption = ""
+    if watermark_name == "gemini":
+        wm_path = await download_file(
+            DB.CF_R2_PUBLIC_URL + "/Watermark/gemini.png",
+            Path(DOWNLOAD_DIR).joinpath("watermark/gemini.png"),
+            skip_exist=True,
+            proxy=PROXY.GOOGLE_SEARCH,
+        )
+        processed_path = gemini_watermark(fpath, wm_path)
+        caption = "已添加AI水印: Gemini"
+    if Path(processed_path).is_file():
+        await send2tg(client, message, texts=caption, media=[{"photo": processed_path}])
+    Path(fpath).unlink(missing_ok=True)
+    Path(processed_path).unlink(missing_ok=True)
+
+
+def gemini_watermark(img_path: str | Path, watermark_path: str | Path = "", margin=0) -> str:
+    """Add gemini wathermark to given image."""
+    try:
+        base_image = Image.open(img_path).convert("RGBA")
+        base_width, base_height = base_image.size
+
+        watermark = Image.open(watermark_path).convert("RGBA")
+        wm_width, wm_height = watermark.size
+
+        # resize wathermark
+        need_wm_width = int(base_width / 9)
+        need_wm_height = int(wm_height * (need_wm_width / wm_width))
+        watermark = watermark.resize((need_wm_width, need_wm_height), Image.Resampling.LANCZOS)
+        wm_width, wm_height = watermark.size
+
+        # calculate position (right bottom)
+        position_x = base_width - wm_width - margin
+        position_y = base_height - wm_height - margin
+        position = (position_x, position_y)
+
+        # paste watermark to transparent layer
+        transparent_layer = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
+        transparent_layer.paste(watermark, position)
+
+        # merge transparent layer
+        final_image = Image.alpha_composite(base_image, transparent_layer)
+        final_image = final_image.convert("RGB")
+
+        # save
+        out_path = Path(DOWNLOAD_DIR) / f"{rand_string()}.jpg"
+        final_image.save(out_path)
+        return out_path.as_posix()
+    except Exception as e:
+        logger.error(e)
+        return ""
+
+
+async def add_video_watermark(client: Client, message: Message, watermark_name: str = "", **kwargs):
+    kwargs["progress"] = kwargs.get("progress", await message.reply_text("⏬正在下载视频...", quote=True))
+    fpath: str = await client.download_media(message)  # type: ignore
+    if not Path(fpath).is_file():
+        await modify_progress(text="❌视频下载失败", force_update=True, **kwargs)
+        return
+    processed_path = "/non-exist"
+    caption = ""
+    if not watermark_name:
+        watermark_name = "sora"
+    await modify_progress(text="⏳视频下载成功, 正在添加水印...\n此过程需要对视频重新编码, 请耐心等待", force_update=True, **kwargs)
+    if watermark_name == "sora":
+        processed_path = await sora_watermark(fpath)
+        caption = "已添加AI水印: Sora"
+
+    if Path(processed_path).is_file():
+        await modify_progress(text="✅水印添加成功, 正在发送视频...", force_update=True, **kwargs)
+        await send2tg(client, message, texts=caption, media=[{"video": processed_path}])
+        await modify_progress(del_status=True, **kwargs)
+    Path(fpath).unlink(missing_ok=True)
+    Path(processed_path).unlink(missing_ok=True)
+
+
+async def sora_watermark(video_path: str | Path) -> str:
+    """Add sora watermark to video."""
+    vinfo = await parse_media_info(video_path)
+    target_w, target_h = vinfo["width"], vinfo["height"]
+    orientation = "landscape" if vinfo["width"] > vinfo["height"] else "portrait"
+    # If the resolution of the video is too large, we resize it to 720P.
+    need_resize = False
+    if orientation == "landscape" and (vinfo["width"] > 1280 or vinfo["height"] > 720):  # noqa: SIM114
+        need_resize = True
+    elif orientation == "portrait" and (vinfo["width"] > 720 or vinfo["height"] > 1280):
+        need_resize = True
+
+    main_video_stream = "[0:v]"
+    filter_prefix = ""
+    if need_resize:
+        if orientation == "landscape":
+            target_w, target_h = 1280, 720
+            # Scale the main video down to 720p, maintaining aspect ratio.
+            # Using -2 for height ensures it's an even number, required by many codecs.
+            scale_filter = f"scale={target_w}:-2"
+        else:  # portrait
+            target_w, target_h = 720, 1280
+            scale_filter = f"scale=-2:{target_h}"
+        filter_prefix = f"[0:v]{scale_filter}[main_v];"
+        main_video_stream = "[main_v]"  # The overlay will now use the scaled stream.
+
+    watermark_path = await create_sora_animation(target_w, target_h)
+    winfo = await parse_media_info(watermark_path)
+    num_loops = math.ceil(vinfo["raw_duration"] / winfo["raw_duration"])
+    filter_complex = (
+        (
+            # The prefix will be empty if no resize is needed.
+            f"{filter_prefix}"
+            # Scale the watermark to the TARGET resolution (either original or 720p).
+            f"[1:v]scale={target_w}:{target_h}[scaled];"
+            f"[scaled]colorkey=0x000000:0.3:0.2[keyed];"
+            # If only one loop, trim the watermark to the main video's duration.
+            f"[keyed]trim=end={vinfo['raw_duration']},setpts=PTS-STARTPTS[wm];"
+            # Overlay the watermark onto the (potentially scaled) main video stream.
+            f"{main_video_stream}[wm]overlay=0:0[v]"
+        )
+        if num_loops == 1
+        else (f"{filter_prefix}[1:v]scale={target_w}:{target_h}[scaled];[scaled]colorkey=0x000000:0.3:0.2[keyed];{main_video_stream}[keyed]overlay=0:0[v]")
+    )
+    out_path = Path(DOWNLOAD_DIR) / f"{rand_string()}.mp4"
+    command = ["ffmpeg", "-i", str(video_path)]
+    if num_loops > 1:
+        command.extend(["-stream_loop", f"{num_loops - 1}"])
+    command.extend(
+        [
+            "-i",
+            str(watermark_path),
+            "-filter_complex",
+            filter_complex,
+            "-map",
+            "[v]",
+            "-map",
+            "0:a?",
+            "-c:a",
+            "copy",
+            "-c:v",
+            "libx264",
+            "-t",
+            str(vinfo["raw_duration"]),
+            "-y",
+            out_path.as_posix(),
+        ]
+    )
+    logger.debug(" ".join(command))
+    with contextlib.suppress(Exception):
+        process = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
+        _, stderr = await process.communicate()
+        logger.trace(stderr.decode())
+    Path(watermark_path).unlink(missing_ok=True)
+    return out_path.as_posix() if await is_valid_video_or_audio(out_path) else "/non-exist"
+
+
+async def create_sora_animation(width: int, height: int) -> str:
+    """Create a Sora-style animated watermark video on a black background.
+
+    This function generates a video where a small sora watermark appears sequentially
+    at three different locations. It starts with a source watermark video, scales it down,
+    and then places it on a newly created black canvas with the specified dimensions.
+
+    The process involves:
+    1. Downloading a base watermark video ('sora.mp4').
+    2. Scaling the watermark to be approximately 1% of the total area of the
+       target resolution (width * height).
+    3. Creating a black background video with a duration of three times the
+       original watermark's length.
+    4. Placing the scaled watermark at three predefined positions over three
+       consecutive time intervals using a complex FFmpeg filter:
+       - 0 to T: Top-left area.
+       - T to 2T: Right-middle area.
+       - 2T to 3T: Left-bottom area.
+       (where T is the duration of the source watermark video).
+    5. The FFmpeg command internally splits the scaled watermark into three streams
+       and uses the 'setpts' filter to shift the timestamps of the second and
+       third streams, ensuring they play sequentially.
+
+    Args:
+        width (int): The width of the output video canvas in pixels.
+        height (int): The height of the output video canvas in pixels.
+
+    Returns:
+        str: The file path to the generated watermark video. If the creation
+             fails or the output file is invalid, it returns the string "/non-exist".
+    """
+    # The watermark is cropped from a 704x1280 video, and the size is 154x58 (~1% of the original size).
+    watermark_path = await download_file(DB.CF_R2_PUBLIC_URL + "/Watermark/sora.mp4", Path(DOWNLOAD_DIR).joinpath("watermark/sora.mp4"), skip_exist=True)
+    winfo = await parse_media_info(watermark_path)
+    scale_ratio = math.sqrt(width * height / (winfo["width"] * winfo["height"]) / 100)
+    wm_width = round(winfo["width"] * scale_ratio)
+    wm_height = round(winfo["height"] * scale_ratio)
+
+    x_left = 0.2271 * wm_width
+    y_top = 1.125 * wm_height if width > height else 1.986 * wm_height
+    y_middle = 0.5 * (height - wm_height)
+    x_right = width - wm_width - x_left
+    y_bottom = height - wm_height - y_top
+
+    filter_complex = f"[0:v]scale={wm_width}:{wm_height},split=3[in1][in2][in3]; [in2]setpts=PTS+{winfo['raw_duration']}/TB[in2_shifted]; [in3]setpts=PTS+{2 * winfo['raw_duration']}/TB[in3_shifted]; color=s={width}x{height}:c=black:d={3 * winfo['raw_duration']}[bg]; [bg][in1]overlay={x_left}:{y_top}:enable='between(t,0,{winfo['raw_duration']})'[bg1]; [bg1][in2_shifted]overlay={x_right}:{y_middle}:enable='between(t,{winfo['raw_duration']},{2 * winfo['raw_duration']})'[bg2]; [bg2][in3_shifted]overlay={x_left}:{y_bottom}:enable='between(t,{2 * winfo['raw_duration']},{3 * winfo['raw_duration']})'"
+
+    out_path = Path(DOWNLOAD_DIR) / f"{rand_string()}.mp4"
+    command = [
+        "ffmpeg",
+        "-i",
+        str(watermark_path),
+        "-filter_complex",
+        filter_complex,
+        "-c:v",
+        "libx264",
+        "-y",
+        out_path.as_posix(),
+    ]
+    logger.debug("Create mini sora watermark: " + " ".join(command))
+    with contextlib.suppress(Exception):
+        process = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
+        _, stderr = await process.communicate()
+        logger.trace(stderr.decode())
+    return out_path.as_posix() if await is_valid_video_or_audio(out_path) else "/non-exist"
src/config.py
@@ -74,6 +74,7 @@ class ENABLE:  # see fine-grained permission in `src/permission.py`
     QUOTLY = os.getenv("ENABLE_QUOTLY", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     TMDB = os.getenv("ENABLE_TMDB", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     FFMPEG = os.getenv("ENABLE_FFMPEG", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+    WATERMARK = os.getenv("ENABLE_WATERMARK", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
 
 
 class PREFIX:
@@ -104,6 +105,7 @@ class PREFIX:
     TMDB = os.getenv("PREFIX_TMDB", "/tmdb").lower()
     FFMPEG_CUT = os.getenv("PREFIX_FFMPEG_CUT", "/cut").lower()
     FFMPEG_H264 = os.getenv("PREFIX_FFMPEG_H264", "/h264").lower()
+    WATERMARK = os.getenv("PREFIX_WATERMARK", "/wm, /watermark").lower()
 
 
 class API:
src/handler.py
@@ -28,6 +28,7 @@ from others.raw_img_file import convert_raw_img_file
 from others.search_google import search_google
 from others.search_ytb import search_youtube
 from others.tmdb import search_tmdb
+from others.watermark import add_watermark
 from permission import check_service
 from preview.bilibili import preview_bilibili
 from preview.douyin import preview_douyin
@@ -74,6 +75,7 @@ async def handle_utilities(
     ytb: bool = True,
     tmdb: bool = True,
     ffmpeg: bool = True,
+    watermark: bool = True,
     raw_img: bool = True,
     show_progress: bool = True,
     detail_progress: bool = False,
@@ -106,6 +108,7 @@ async def handle_utilities(
         summary (bool, optional): Enable AI summary. Defaults to True.
         tmdb (bool, optional): Enable TMDB query. Defaults to True.
         ffmpeg (bool, optional): Enable ffmpeg commands. Defaults to True.
+        watermark (bool, optional): Enable watermark commands. Defaults to True.
         raw_img (bool, optional): Enable convert raw image. Defaults to False.
         show_progress (bool, optional): Show a progress message on Telegram. Defaults to True.
         detail_progress (bool, optional): Show detailed progress (Only if show_proress is set to True). Defaults to False.
@@ -153,6 +156,8 @@ async def handle_utilities(
     if ffmpeg:
         await ffmpeg_cut(client, message, **kwargs)
         await ffmpeg_h264(client, message, **kwargs)
+    if watermark:
+        await add_watermark(client, message, **kwargs)
 
 
 async def handle_social_media(
@@ -439,6 +444,8 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefix: str):
     if permission["ffmpeg"]:
         msg += f"\n✂️**视频切片**: `{PREFIX.FFMPEG_CUT}` 回复视频消息"
         msg += f"\n🎬**视频转码**: `{PREFIX.FFMPEG_H264}` 回复视频消息"
+    if permission["watermark"]:
+        msg += f"\n💧**添加水印**: `{PREFIX.WATERMARK}` 回复媒体消息"
 
     msg += "\n\n单独发送每个命令前缀本身可查看该命令详细使用说明"
     return msg
src/multimedia.py
@@ -42,6 +42,7 @@ async def parse_media_info(path: str | Path | None) -> dict:
             "name": path.stem,
             "path": path.resolve().as_posix(),
             "duration": math.floor(float(duration)),
+            "raw_duration": float(duration),
             "width": round(float(width)),
             "height": round(float(height)),
             "audio_codec": audio_stream.get("codec_name", ""),
src/permission.py
@@ -147,6 +147,7 @@ def check_service(cid: int | str, ctype: str) -> dict:
         "quotly": True,
         "tmdb": True,
         "ffmpeg": True,
+        "watermark": True,
     } | global_permissions()
 
     if ctype == "PRIVATE":
@@ -216,6 +217,8 @@ def check_service(cid: int | str, ctype: str) -> dict:
         permission["tmdb"] = False
     if not ENABLE.FFMPEG:
         permission["ffmpeg"] = False
+    if not ENABLE.WATERMARK:
+        permission["watermark"] = False
 
     """
     Set for specific chat