main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import asyncio
4import contextlib
5import math
6from pathlib import Path
7
8from loguru import logger
9from PIL import Image
10from pyrogram.client import Client
11from pyrogram.types import Message
12
13from config import DB, DOWNLOAD_DIR, PREFIX, PROXY
14from messages.parser import parse_msg
15from messages.progress import modify_progress
16from messages.sender import send2tg
17from messages.utils import equal_prefix, remove_prefix, set_reaction, startswith_prefix
18from multimedia import is_valid_video_or_audio, parse_media_info
19from networking import download_file
20from utils import rand_string
21
22HELP = f"""💧**添加水印**
23`{PREFIX.WATERMARK}` 回复一条媒体消息
24对于图片会添加Gemini水印
25对于视频会添加Sora水印
26"""
27
28
29async def add_watermark(client: Client, message: Message, **kwargs):
30 """Add watermark to media files."""
31 if not startswith_prefix(message.content, prefix=PREFIX.WATERMARK):
32 return
33 # send docs if message == "/watermark"
34 if equal_prefix(message.content, prefix=PREFIX.WATERMARK) and not message.reply_to_message:
35 await send2tg(client, message, texts=HELP, **kwargs)
36 return
37 media_message = message.reply_to_message if message.reply_to_message else message
38 info = parse_msg(media_message)
39
40 if info["mtype"] not in ["video", "photo"]:
41 await send2tg(client, message, texts="❌请回复一条媒体消息\n\n" + HELP, **kwargs)
42 return
43 wm_name = remove_prefix(message.content, prefix=PREFIX.WATERMARK).strip()
44 await set_reaction(client, message, reaction="👌")
45 if info["mtype"] == "photo":
46 await add_image_watermark(client, media_message, wm_name)
47 await set_reaction(client, message, reaction="")
48 return
49 if info["mtype"] == "video":
50 await add_video_watermark(client, media_message, wm_name, **kwargs)
51 await set_reaction(client, message, reaction="")
52 return
53
54
55async def add_image_watermark(client: Client, message: Message, watermark_name: str = ""):
56 fpath: str = await client.download_media(message) # type: ignore
57 if not Path(fpath).is_file():
58 return
59
60 if not watermark_name:
61 watermark_name = "gemini"
62 processed_path = "/non-exist"
63 caption = ""
64 if watermark_name == "gemini":
65 wm_path = await download_file(
66 DB.CF_R2_PUBLIC_URL + "/Watermark/gemini.png",
67 Path(DOWNLOAD_DIR).joinpath("watermark/gemini.png"),
68 skip_exist=True,
69 proxy=PROXY.DOWNLOAD,
70 )
71 processed_path = gemini_watermark(fpath, wm_path)
72 caption = "已添加AI水印: Gemini"
73 if Path(processed_path).is_file():
74 await send2tg(client, message, texts=caption, media=[{"photo": processed_path}])
75 Path(fpath).unlink(missing_ok=True)
76 Path(processed_path).unlink(missing_ok=True)
77
78
79def gemini_watermark(img_path: str | Path, watermark_path: str | Path = "", margin=0) -> str:
80 """Add gemini wathermark to given image."""
81 try:
82 base_image = Image.open(img_path).convert("RGBA")
83 base_width, base_height = base_image.size
84
85 watermark = Image.open(watermark_path).convert("RGBA")
86 wm_width, wm_height = watermark.size
87
88 # resize wathermark
89 need_wm_width = int(base_width / 9)
90 need_wm_height = int(wm_height * (need_wm_width / wm_width))
91 watermark = watermark.resize((need_wm_width, need_wm_height), Image.Resampling.LANCZOS)
92 wm_width, wm_height = watermark.size
93
94 # calculate position (right bottom)
95 position_x = base_width - wm_width - margin
96 position_y = base_height - wm_height - margin
97 position = (position_x, position_y)
98
99 # paste watermark to transparent layer
100 transparent_layer = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
101 transparent_layer.paste(watermark, position)
102
103 # merge transparent layer
104 final_image = Image.alpha_composite(base_image, transparent_layer)
105 final_image = final_image.convert("RGB")
106
107 # save
108 out_path = Path(DOWNLOAD_DIR) / f"{rand_string()}.jpg"
109 final_image.save(out_path)
110 return out_path.as_posix()
111 except Exception as e:
112 logger.error(e)
113 return ""
114
115
116async def add_video_watermark(client: Client, message: Message, watermark_name: str = "", **kwargs):
117 kwargs["progress"] = kwargs.get("progress", await message.reply_text("⏬正在下载视频...", quote=True))
118 fpath: str = await client.download_media(message) # type: ignore
119 if not Path(fpath).is_file():
120 await modify_progress(text="❌视频下载失败", force_update=True, **kwargs)
121 return
122 processed_path = "/non-exist"
123 caption = ""
124 if not watermark_name:
125 watermark_name = "sora"
126 await modify_progress(text="⏳视频下载成功, 正在添加水印...\n此过程需要对视频重新编码, 请耐心等待", force_update=True, **kwargs)
127 if watermark_name == "sora":
128 processed_path = await sora_watermark(fpath)
129 caption = "已添加AI水印: Sora"
130
131 if Path(processed_path).is_file():
132 await modify_progress(text="✅水印添加成功, 正在发送视频...", force_update=True, **kwargs)
133 await send2tg(client, message, texts=caption, media=[{"video": processed_path}])
134 await modify_progress(del_status=True, **kwargs)
135 Path(fpath).unlink(missing_ok=True)
136 Path(processed_path).unlink(missing_ok=True)
137
138
139async def sora_watermark(video_path: str | Path) -> str:
140 """Add sora watermark to video."""
141 vinfo = await parse_media_info(video_path)
142 target_w, target_h = vinfo["width"], vinfo["height"]
143 orientation = "landscape" if vinfo["width"] > vinfo["height"] else "portrait"
144 # If the resolution of the video is too large, we resize it to 720P.
145 need_resize = False
146 if orientation == "landscape" and (vinfo["width"] > 1280 or vinfo["height"] > 720): # noqa: SIM114
147 need_resize = True
148 elif orientation == "portrait" and (vinfo["width"] > 720 or vinfo["height"] > 1280):
149 need_resize = True
150
151 main_video_stream = "[0:v]"
152 filter_prefix = ""
153 if need_resize:
154 if orientation == "landscape":
155 target_w, target_h = 1280, 720
156 # Scale the main video down to 720p, maintaining aspect ratio.
157 # Using -2 for height ensures it's an even number, required by many codecs.
158 scale_filter = f"scale={target_w}:-2"
159 else: # portrait
160 target_w, target_h = 720, 1280
161 scale_filter = f"scale=-2:{target_h}"
162 filter_prefix = f"[0:v]{scale_filter}[main_v];"
163 main_video_stream = "[main_v]" # The overlay will now use the scaled stream.
164
165 watermark_path = await create_sora_animation(target_w, target_h)
166 winfo = await parse_media_info(watermark_path)
167 num_loops = math.ceil(vinfo["raw_duration"] / winfo["raw_duration"])
168 filter_complex = (
169 (
170 # The prefix will be empty if no resize is needed.
171 f"{filter_prefix}"
172 # Scale the watermark to the TARGET resolution (either original or 720p).
173 f"[1:v]scale={target_w}:{target_h}[scaled];"
174 f"[scaled]colorkey=0x000000:0.3:0.2[keyed];"
175 # If only one loop, trim the watermark to the main video's duration.
176 f"[keyed]trim=end={vinfo['raw_duration']},setpts=PTS-STARTPTS[wm];"
177 # Overlay the watermark onto the (potentially scaled) main video stream.
178 f"{main_video_stream}[wm]overlay=0:0[v]"
179 )
180 if num_loops == 1
181 else (f"{filter_prefix}[1:v]scale={target_w}:{target_h}[scaled];[scaled]colorkey=0x000000:0.3:0.2[keyed];{main_video_stream}[keyed]overlay=0:0[v]")
182 )
183 out_path = Path(DOWNLOAD_DIR) / f"{rand_string()}.mp4"
184 command = ["ffmpeg", "-i", str(video_path)]
185 if num_loops > 1:
186 command.extend(["-stream_loop", f"{num_loops - 1}"])
187 command.extend(
188 [
189 "-i",
190 str(watermark_path),
191 "-filter_complex",
192 filter_complex,
193 "-map",
194 "[v]",
195 "-map",
196 "0:a?",
197 "-c:a",
198 "copy",
199 "-c:v",
200 "libx264",
201 "-t",
202 str(vinfo["raw_duration"]),
203 "-y",
204 out_path.as_posix(),
205 ]
206 )
207 logger.debug(" ".join(command))
208 with contextlib.suppress(Exception):
209 process = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
210 _, stderr = await process.communicate()
211 logger.trace(stderr.decode())
212 Path(watermark_path).unlink(missing_ok=True)
213 return out_path.as_posix() if await is_valid_video_or_audio(out_path) else "/non-exist"
214
215
216async def create_sora_animation(width: int, height: int) -> str:
217 """Create a Sora-style animated watermark video on a black background.
218
219 This function generates a video where a small sora watermark appears sequentially
220 at three different locations. It starts with a source watermark video, scales it down,
221 and then places it on a newly created black canvas with the specified dimensions.
222
223 The process involves:
224 1. Downloading a base watermark video ('sora.mp4').
225 2. Scaling the watermark to be approximately 1% of the total area of the
226 target resolution (width * height).
227 3. Creating a black background video with a duration of three times the
228 original watermark's length.
229 4. Placing the scaled watermark at three predefined positions over three
230 consecutive time intervals using a complex FFmpeg filter:
231 - 0 to T: Top-left area.
232 - T to 2T: Right-middle area.
233 - 2T to 3T: Left-bottom area.
234 (where T is the duration of the source watermark video).
235 5. The FFmpeg command internally splits the scaled watermark into three streams
236 and uses the 'setpts' filter to shift the timestamps of the second and
237 third streams, ensuring they play sequentially.
238
239 Args:
240 width (int): The width of the output video canvas in pixels.
241 height (int): The height of the output video canvas in pixels.
242
243 Returns:
244 str: The file path to the generated watermark video. If the creation
245 fails or the output file is invalid, it returns the string "/non-exist".
246 """
247 # The watermark is cropped from a 704x1280 video, and the size is 154x58 (~1% of the original size).
248 watermark_path = await download_file(DB.CF_R2_PUBLIC_URL + "/Watermark/sora.mp4", Path(DOWNLOAD_DIR).joinpath("watermark/sora.mp4"), skip_exist=True)
249 winfo = await parse_media_info(watermark_path)
250 scale_ratio = math.sqrt(width * height / (winfo["width"] * winfo["height"]) / 100)
251 wm_width = round(winfo["width"] * scale_ratio)
252 wm_height = round(winfo["height"] * scale_ratio)
253
254 x_left = 0.2271 * wm_width
255 y_top = 1.125 * wm_height if width > height else 1.986 * wm_height
256 y_middle = 0.5 * (height - wm_height)
257 x_right = width - wm_width - x_left
258 y_bottom = height - wm_height - y_top
259
260 filter_complex = f"[0:v]scale={wm_width}:{wm_height},split=3[in1][in2][in3]; [in2]setpts=PTS+{winfo['raw_duration']}/TB[in2_shifted]; [in3]setpts=PTS+{2 * winfo['raw_duration']}/TB[in3_shifted]; color=s={width}x{height}:c=black:d={3 * winfo['raw_duration']}[bg]; [bg][in1]overlay={x_left}:{y_top}:enable='between(t,0,{winfo['raw_duration']})'[bg1]; [bg1][in2_shifted]overlay={x_right}:{y_middle}:enable='between(t,{winfo['raw_duration']},{2 * winfo['raw_duration']})'[bg2]; [bg2][in3_shifted]overlay={x_left}:{y_bottom}:enable='between(t,{2 * winfo['raw_duration']},{3 * winfo['raw_duration']})'"
261
262 out_path = Path(DOWNLOAD_DIR) / f"{rand_string()}.mp4"
263 command = [
264 "ffmpeg",
265 "-i",
266 str(watermark_path),
267 "-filter_complex",
268 filter_complex,
269 "-c:v",
270 "libx264",
271 "-y",
272 out_path.as_posix(),
273 ]
274 logger.debug("Create mini sora watermark: " + " ".join(command))
275 with contextlib.suppress(Exception):
276 process = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
277 _, stderr = await process.communicate()
278 logger.trace(stderr.decode())
279 return out_path.as_posix() if await is_valid_video_or_audio(out_path) else "/non-exist"