main
  1#!/venv/bin/python
  2# -*- coding: utf-8 -*-
  3from datetime import datetime
  4from pathlib import Path
  5
  6from loguru import logger
  7from pyrogram.types import Chat, Message
  8from pyrogram.types.messages_and_media.message import Str
  9
 10from ai.main import ai_text_generation
 11from asr.utils import audio_duration as get_media_duration
 12from config import PREFIX
 13from publish import telegraph_aipage
 14from schema import AIPage, ContentExtraction
 15from summarize.utils import generate_mermaid, parse_summary_sources, publish_mermaid
 16from utils import count_subtitles, digest, read_text, to_dt
 17
 18
 19async def summarize(
 20    sources: list[dict] | None = None,
 21    model: str = "gemini",
 22    title: str | None = None,
 23    author: str | None = None,
 24    url: str | None = None,
 25    date: str | datetime | None = None,
 26    description: str | dict | None = None,
 27    ttl: str | None = None,
 28    *,
 29    force_r2_page: bool = False,
 30    mermaid: bool = False,
 31    min_text_length: int | None = None,  # minimum text length to summarize
 32    min_audio_duration: float | None = None,  # minimum audio duration to summarize
 33    min_video_duration: float | None = None,  # minimum video duration to summarize
 34    max_audio_duration: float | None = None,  # maximum audio duration to summarize
 35    max_video_duration: float | None = None,  # maximum video duration to summarize
 36    max_audio_bytes: int | None = None,  # maximum audio bytes to summarize
 37    max_video_bytes: int | None = None,  # maximum video bytes to summarize
 38    min_num_image: int | None = None,  # minimum number of images to summarize
 39    max_num_image: int | None = None,  # maximum number of images to summarize
 40    min_num_video: int | None = None,  # minimum number of videos to summarize
 41    max_num_video: int | None = None,  # maximum number of videos to summarize
 42    min_num_audio: int | None = None,  # minimum number of audios to summarize
 43    max_num_audio: int | None = None,  # maximum number of audios to summarize
 44    skip_max_video_duration: float | None = None,  # skip max video duration if it is greater than this value
 45    skip_max_audio_duration: float | None = None,  # skip max audio duration if it is greater than this value
 46    skip_max_video_bytes: int | None = None,  # skip max video bytes if it is greater than this value
 47    skip_max_audio_bytes: int | None = None,  # skip max audio bytes if it is greater than this value
 48) -> dict:
 49    r"""Summarize the article or transcripts.
 50
 51    Args:
 52        sources (list[dict] | None): The sources to summary.
 53        # text
 54        {"type": "system_prompt", "text": "This is system prompt."}
 55        {"type": "text", "text": "Hello."}
 56        {"type": "text", "path": "/path/to/file.txt"}
 57        {"type": "transcripts", "text": "[00:00] Hello\n[00:01] a sentence"}
 58
 59        # media
 60        {"type": "image", "path": "/path/to/image.jpg", "mime_type": "image/jpeg"}
 61        {"type": "video", "path": "/path/to/video.mp4", "mime_type": "video/mp4"}
 62        {"type": "audio", "path": "/path/to/audio.mp3", "mime_type": "audio/mpeg"}
 63
 64        # file
 65        {"type": "file", "path": "/path/to/file.pdf", "mime_type": "application/pdf"}
 66
 67        # special
 68        {"type": "youtube", "url": "https://www.youtube.com/watch?v=videoid"}
 69
 70    Returns:
 71        The summary.
 72    """
 73    sources = filter_sources(
 74        sources,
 75        skip_max_video_duration=skip_max_video_duration,
 76        skip_max_audio_duration=skip_max_audio_duration,
 77        skip_max_video_bytes=skip_max_video_bytes,
 78        skip_max_audio_bytes=skip_max_audio_bytes,
 79    )
 80    if not sources:
 81        return {}
 82    title = title or "AI导读"
 83    if not is_eligible(
 84        sources,
 85        min_text_length=min_text_length,
 86        min_audio_duration=min_audio_duration,
 87        min_video_duration=min_video_duration,
 88        max_audio_duration=max_audio_duration,
 89        max_video_duration=max_video_duration,
 90        max_audio_bytes=max_audio_bytes,
 91        max_video_bytes=max_video_bytes,
 92        min_num_image=min_num_image,
 93        max_num_image=max_num_image,
 94        min_num_video=min_num_video,
 95        max_num_video=max_num_video,
 96        min_num_audio=min_num_audio,
 97        max_num_audio=max_num_audio,
 98    ):
 99        return {}
100    texts, transcripts, schema = parse_summary_sources(sources, mermaid=mermaid)
101    checksum = int(digest(sources, to_int=True))
102    ai_msg = Message(id=checksum, chat=Chat(id=checksum), text=Str(f"{PREFIX.AI_TEXT_GENERATION} @{model} {texts or 'Summarize'}"))
103    summary = await ai_text_generation("fake", message=ai_msg, **schema)  # type: ignore
104    if not summary.get("texts", ""):
105        return {}
106    texts, mermaid_img_url, mermaid_pako_url = await parse_summary(summary["texts"])
107    page = AIPage(
108        title=title,
109        author=author,
110        url=url,
111        date=to_dt(date),
112        description=description,
113        summary=ContentExtraction.model_validate_json(summary["texts"]),
114        transcripts=transcripts,
115        mermaid_img=mermaid_img_url,
116        mermaid_url=mermaid_pako_url,
117    )
118    if telegraph_url := await telegraph_aipage(page, ttl=ttl, force_r2=force_r2_page):
119        summary["telegraph_url"] = telegraph_url
120        summary["texts"] = f"**🤖[AI导读]({telegraph_url})**\n" + texts
121    else:
122        summary["telegraph_url"] = None
123        summary["texts"] = "**🤖AI导读**\n" + texts
124    return summary
125
126
127async def parse_summary(texts: str) -> tuple[str, str, str]:
128    """Parse the summary JSON string.
129
130    Returns:
131        (summary_texts, mermaid_img_url, mermaid_pako_url)
132    """
133    img_url = ""
134    pako_url = ""
135    try:
136        summary = ContentExtraction.model_validate_json(texts)
137        if summary.mindmap:
138            mermaid = generate_mermaid(summary.mindmap)
139            img_url, pako_url = await publish_mermaid(mermaid)
140        parsed = f"{summary.overview}\n⚡️**章节速览**"
141        for section in summary.sections:
142            parsed += f"\n{section.emoji}**{section.title}**"
143            if section.start:
144                start = section.start.removeprefix("00:") if len(section.start) > 5 else section.start
145                parsed += f" [{start}]"
146            parsed += f"\n{section.content}"
147        logger.success(parsed)
148    except Exception as e:
149        logger.error(f"Error parsing summary: {e}")
150        return texts, "", ""
151    return parsed, img_url, pako_url
152
153
154def filter_sources(
155    sources: list[dict] | None,
156    skip_max_video_duration: float | None = None,  # skip max video duration if it is greater than this value
157    skip_max_audio_duration: float | None = None,  # skip max audio duration if it is greater than this value
158    skip_max_video_bytes: int | None = None,  # skip max video bytes if it is greater than this value
159    skip_max_audio_bytes: int | None = None,  # skip max audio bytes if it is greater than this value
160) -> list[dict]:
161    """Filter the sources by the given conditions.
162
163    Returns:
164        The filtered sources.
165    """
166    if not sources:
167        return []
168    filtered = []
169    for source in sources:
170        if skip_max_video_duration is not None and source["type"] == "video" and Path(source["path"]).is_file():
171            duration = get_media_duration(source["path"])
172            size = Path(source["path"]).stat().st_size
173            if duration > skip_max_video_duration:
174                logger.warning(f"Skip video {source['path']} due to duration {duration} > {skip_max_video_duration}")
175                continue
176            if isinstance(skip_max_video_bytes, int) and size > skip_max_video_bytes:
177                logger.warning(f"Skip video {source['path']} due to size {size} > {skip_max_video_bytes}")
178                continue
179        elif skip_max_audio_duration is not None and source["type"] == "audio" and Path(source["path"]).is_file():
180            duration = get_media_duration(source["path"])
181            size = Path(source["path"]).stat().st_size
182            if duration > skip_max_audio_duration:
183                logger.warning(f"Skip audio {source['path']} due to duration {duration} > {skip_max_audio_duration}")
184                continue
185            if isinstance(skip_max_audio_bytes, int) and size > skip_max_audio_bytes:
186                logger.warning(f"Skip audio {source['path']} due to size {size} > {skip_max_audio_bytes}")
187                continue
188        filtered.append(source)
189    return filtered
190
191
192def is_eligible(
193    sources: list[dict],
194    *,
195    min_text_length: int | None = None,  # minimum text length to summarize
196    min_audio_duration: float | None = None,  # minimum audio duration to summarize
197    min_video_duration: float | None = None,  # minimum video duration to summarize
198    max_audio_duration: float | None = None,  # maximum audio duration to summarize
199    max_video_duration: float | None = None,  # maximum video duration to summarize
200    max_audio_bytes: int | None = None,  # maximum audio bytes to summarize
201    max_video_bytes: int | None = None,  # maximum video bytes to summarize
202    min_num_image: int | None = None,  # minimum number of images to summarize
203    max_num_image: int | None = None,  # maximum number of images to summarize
204    min_num_video: int | None = None,  # minimum number of videos to summarize
205    max_num_video: int | None = None,  # maximum number of videos to summarize
206    min_num_audio: int | None = None,  # minimum number of audios to summarize
207    max_num_audio: int | None = None,  # maximum number of audios to summarize
208) -> bool:
209    r"""Check if the source is eligible for summarization.
210
211    Args:
212        sources (list[dict] | None): The sources to summary.
213        # text
214        {"type": "system_prompt", "text": "This is system prompt."}
215        {"type": "text", "text": "Hello."}
216        {"type": "text", "path": "/path/to/file.txt"}
217        {"type": "transcripts", "text": "[00:00] Hello\n[00:01] a sentence"}
218
219        # media
220        {"type": "image", "path": "/path/to/image.jpg", "mime_type (optional)": "image/jpeg"}
221        {"type": "video", "path": "/path/to/video.mp4", "mime_type (optional)": "video/mp4", "duration (optional)": 10.0, "size (optional)": 9999}
222        {"type": "audio", "path": "/path/to/audio.mp3", "mime_type (optional)": "audio/mpeg", "duration (optional)": 10.0, "size (optional)": 9999}
223
224        # file
225        {"type": "file", "path": "/path/to/file.pdf", "mime_type (optional)": "application/pdf"}
226
227    """
228    text_length = 0
229    audio_duration = 0
230    video_duration = 0
231    audio_bytes = 0
232    video_bytes = 0
233    num_image = 0
234    num_video = 0
235    num_audio = 0
236    # check text length
237    if isinstance(min_text_length, int):
238        for source in sources:
239            if source["type"] in ["text", "transcripts"] and source.get("text"):
240                text_length += count_subtitles(source["text"])
241            elif source["type"] == "text" and source.get("path"):
242                text_length += len(read_text(source["path"]))
243        if text_length < int(min_text_length):
244            logger.warning(f"Text length is too short: {text_length} < {min_text_length}")
245            return False
246
247    # check duration
248    if any(x is not None for x in [min_audio_duration, max_audio_duration, min_video_duration, max_video_duration]):
249        for source in sources:
250            if source["type"] == "audio" and Path(source["path"]).is_file():
251                audio_duration += get_media_duration(source["path"])
252            elif source["type"] == "video" and Path(source["path"]).is_file():
253                video_duration += get_media_duration(source["path"])
254
255        if min_video_duration is not None and video_duration < min_video_duration:
256            logger.warning(f"Video duration is too short: {video_duration} < {min_video_duration}")
257            return False
258        if max_video_duration is not None and video_duration > max_video_duration:
259            logger.warning(f"Video duration is too long: {video_duration} > {max_video_duration}")
260            return False
261        if min_audio_duration is not None and audio_duration < min_audio_duration:
262            logger.warning(f"Audio duration is too short: {audio_duration} < {min_audio_duration}")
263            return False
264        if max_audio_duration is not None and audio_duration > max_audio_duration:
265            logger.warning(f"Audio duration is too long: {audio_duration} > {max_audio_duration}")
266            return False
267
268    # check size
269    if isinstance(max_audio_bytes, int) or isinstance(max_video_bytes, int):
270        for source in sources:
271            if source["type"] == "audio" and Path(source["path"]).is_file():
272                audio_bytes += Path(source["path"]).stat().st_size
273            elif source["type"] == "video" and Path(source["path"]).is_file():
274                video_bytes += Path(source["path"]).stat().st_size
275
276        if isinstance(max_audio_bytes, int) and audio_bytes > max_audio_bytes:
277            logger.warning(f"Audio bytes is too large: {audio_bytes} > {max_audio_bytes}")
278            return False
279        if isinstance(max_video_bytes, int) and video_bytes > max_video_bytes:
280            logger.warning(f"Video bytes is too large: {video_bytes} > {max_video_bytes}")
281            return False
282
283    # check number of images, videos, and audios
284    if any(x is not None for x in [min_num_image, max_num_image, min_num_video, max_num_video, min_num_audio, max_num_audio]):
285        for source in sources:
286            if source["type"] == "image" and Path(source["path"]).is_file():
287                num_image += 1
288            elif source["type"] == "video" and Path(source["path"]).is_file():
289                num_video += 1
290            elif source["type"] == "audio" and Path(source["path"]).is_file():
291                num_audio += 1
292        if isinstance(min_num_image, int) and num_image < min_num_image:
293            logger.warning(f"Too few images to summarize: {num_image} < {min_num_image}")
294            return False
295        if isinstance(max_num_image, int) and num_image > max_num_image:
296            logger.warning(f"Too many images to summarize: {num_image} > {max_num_image}")
297            return False
298        if isinstance(min_num_video, int) and num_video < min_num_video:
299            logger.warning(f"Too few videos to summarize: {num_video} < {min_num_video}")
300            return False
301        if isinstance(max_num_video, int) and num_video > max_num_video:
302            logger.warning(f"Too many videos to summarize: {num_video} > {max_num_video}")
303            return False
304        if isinstance(min_num_audio, int) and num_audio < min_num_audio:
305            logger.warning(f"Too few audios to summarize: {num_audio} < {min_num_audio}")
306            return False
307        if isinstance(max_num_audio, int) and num_audio > max_num_audio:
308            logger.warning(f"Too many audios to summarize: {num_audio} > {max_num_audio}")
309            return False
310    return True