Commit e6ef94f

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-01-23 13:08:53
feat: use provided thumbnail for video
1 parent 739bcc2
Changed files (2)
src/preview/ytdlp.py
@@ -191,18 +191,22 @@ async def preview_ytdlp(
     texts = texts.strip()
     sent_messages: list[Message | None] = []  # 把发送的消息都记录下来
     target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
+    thumb = generate_cover(final_path)  # generate cover based on final_path
+    if not Path(thumb).is_file():
+        thumb = None
     # split large videos into multiple parts (less than 2GB)
     if video_path.is_file():
         if video_path.stat().st_size < MAX_FILE_BYTES:
             await modify_progress(text=f"🎬视频大小: {readable_size(path=video_path)}", **kwargs)
         else:
             await modify_progress(text="🎬视频大小超过Telegram限制(2000MB), 正在切分...", **kwargs)
-        videos = preprocess_media([{"video": video_path}])
+        videos = preprocess_media([{"video": video_path, "thumb": thumb}])
         if len(videos) > 1:
             await modify_progress(text=f"🎬视频已切分为{len(videos)}份, 开始上传...", **kwargs)
             await asyncio.sleep(1)
 
         for idx, video in enumerate(videos):
+            video["thumb"] = thumb
             caption = texts.replace("📝[", f"📝[P{idx + 1}-") if len(videos) > 1 else texts
             await modify_progress(text=f"⏫视频上传中-P{idx + 1}: {readable_size(path=video['video'])}\n🎬{Path(video['video']).name}", force_update=True, **kwargs)
             sent_messages.append(
@@ -218,7 +222,6 @@ async def preview_ytdlp(
     if audio_path.is_file():
         target_chat = target_chat if ytdlp_send_audio else TID.CHANNEL_YTDLP_BACKUP  # backup to channel if not send audio, so we can save it to db
         await modify_progress(text=f"⏫音频上传中: {readable_size(path=audio_path)}\n🎧{audio_path.name}", force_update=True, **kwargs)
-        thumb = generate_cover(final_path)  # generate cover based on final_path
         target_chat = to_int(target_chat)
         sent_messages.append(
             await client.send_audio(
src/message_utils.py
@@ -189,18 +189,24 @@ def preprocess_media(media: list[dict]) -> list[dict]:
     - photo's width and height ratio must be at most 20.
     - filesize < 2GB for video
 
-    Return a list of valid media info. The format must be:
-    {
-        "photo": "path/to/photo.jpg",
-    }
-    or
-    {
-        "video": "path/to/video.mp4",
-        "width": int,
-        "height": int,
-        "duration": int,
-        "thumb": "path/to/thumbnail.jpg" | None,
-    }
+    Args:
+        media (list[dict]): The list of media info.
+            format: {"photo": "path/to/photo.jpg"}
+                OR  { "video": "path/to/video.mp4",
+                      "thumb"(optional): "path/to/thumbnail.jpg" (if thumb is passed, use it. Otherwise, generate a new one)
+                    }
+
+    Returns:
+        list[dict]: The filtered media info.
+            {"photo": "path/to/photo.jpg"}
+            or
+            {
+                "video": "path/to/video.mp4",
+                "width": int,
+                "height": int,
+                "duration": int,
+                "thumb": "path/to/thumbnail.jpg" | None,
+            }
     """
     num_before = len(media)
     logger.trace(f"{num_before} media info before preprocess: {media}")
@@ -217,6 +223,7 @@ def preprocess_media(media: list[dict]) -> list[dict]:
 
     # Step-2: Videos
     for data in step1_res:
+        thumb = data.get("thumb")  # thumb is provided
         if video_path := data.get("video"):
             video_path = fix_video_rotation(video_path)
             if not is_valid_video(video_path):
@@ -225,7 +232,9 @@ def preprocess_media(media: list[dict]) -> list[dict]:
 
             # split large video files ( < 2GB)
             valid_videos = [x for x in split_large_video(video_path) if is_valid_video(x)]
-            thumbs = [generate_cover(x) for x in valid_videos]
+
+            # generate thumbnails for each video if thumb is not provided
+            thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := validate_img(thumb)) else [generate_cover(x) for x in valid_videos]
             for vpath, tpath in zip(valid_videos, thumbs, strict=True):
                 video_info = parse_media_info(vpath)
                 thumb = valid_thumb if (valid_thumb := validate_img(tpath)) else None
@@ -700,3 +709,4 @@ async def telegram_uploading(current: int, total: int, *args):
 
 if __name__ == "__main__":
     print(sender_markdown_to_html("👤[@username](tg://user?id=123456789)//"))
+    print(preprocess_media([{"video": "~/tests/test.mp4", "thumb": "~/tests/test.jpg"}]))