main
1#!/venv/bin/python
2# -*- coding: utf-8 -*-
3from datetime import datetime
4from pathlib import Path
5
6from loguru import logger
7from pyrogram.types import Chat, Message
8from pyrogram.types.messages_and_media.message import Str
9
10from ai.main import ai_text_generation
11from asr.utils import audio_duration as get_media_duration
12from config import PREFIX
13from publish import telegraph_aipage
14from schema import AIPage, ContentExtraction
15from summarize.utils import generate_mermaid, parse_summary_sources, publish_mermaid
16from utils import count_subtitles, digest, read_text, to_dt
17
18
async def summarize(
    sources: list[dict] | None = None,
    model: str = "gemini",
    title: str | None = None,
    author: str | None = None,
    url: str | None = None,
    date: str | datetime | None = None,
    description: str | dict | None = None,
    ttl: str | None = None,
    *,
    force_r2_page: bool = False,
    mermaid: bool = False,
    min_text_length: int | None = None,  # minimum text length to summarize
    min_audio_duration: float | None = None,  # minimum audio duration to summarize
    min_video_duration: float | None = None,  # minimum video duration to summarize
    max_audio_duration: float | None = None,  # maximum audio duration to summarize
    max_video_duration: float | None = None,  # maximum video duration to summarize
    max_audio_bytes: int | None = None,  # maximum audio bytes to summarize
    max_video_bytes: int | None = None,  # maximum video bytes to summarize
    min_num_image: int | None = None,  # minimum number of images to summarize
    max_num_image: int | None = None,  # maximum number of images to summarize
    min_num_video: int | None = None,  # minimum number of videos to summarize
    max_num_video: int | None = None,  # maximum number of videos to summarize
    min_num_audio: int | None = None,  # minimum number of audios to summarize
    max_num_audio: int | None = None,  # maximum number of audios to summarize
    skip_max_video_duration: float | None = None,  # drop a video whose duration exceeds this value
    skip_max_audio_duration: float | None = None,  # drop an audio whose duration exceeds this value
    skip_max_video_bytes: int | None = None,  # drop a video whose size exceeds this value
    skip_max_audio_bytes: int | None = None,  # drop an audio whose size exceeds this value
) -> dict:
    r"""Summarize the article or transcripts.

    The sources are pruned with ``filter_sources``, checked with
    ``is_eligible``, sent to ``ai_text_generation`` and the structured result
    is rendered as text and, when possible, published as an ``AIPage``.

    Args:
        sources (list[dict] | None): The sources to summarize.
            # text
            {"type": "system_prompt", "text": "This is system prompt."}
            {"type": "text", "text": "Hello."}
            {"type": "text", "path": "/path/to/file.txt"}
            {"type": "transcripts", "text": "[00:00] Hello\n[00:01] a sentence"}

            # media
            {"type": "image", "path": "/path/to/image.jpg", "mime_type": "image/jpeg"}
            {"type": "video", "path": "/path/to/video.mp4", "mime_type": "video/mp4"}
            {"type": "audio", "path": "/path/to/audio.mp3", "mime_type": "audio/mpeg"}

            # file
            {"type": "file", "path": "/path/to/file.pdf", "mime_type": "application/pdf"}

            # special
            {"type": "youtube", "url": "https://www.youtube.com/watch?v=videoid"}
        model: Model name embedded in the generation prompt as ``@{model}``.
        title: Page title; a default title is used when empty.
        author: Passed to ``AIPage``.
        url: Passed to ``AIPage``.
        date: Converted with ``to_dt`` and passed to ``AIPage``.
        description: Passed to ``AIPage``.
        ttl: Forwarded to ``telegraph_aipage``.
        force_r2_page: Forwarded to ``telegraph_aipage`` as ``force_r2``.
        mermaid: Forwarded to ``parse_summary_sources``.
        min_*/max_*: Aggregate thresholds forwarded to ``is_eligible``.
        skip_max_*: Per-source thresholds forwarded to ``filter_sources``.

    Returns:
        ``{}`` when no source survives filtering, when the sources are not
        eligible, or when the generation result has no "texts". Otherwise the
        dict returned by ``ai_text_generation`` with "texts" replaced by the
        rendered summary (prefixed with a header line) and "telegraph_url"
        set to the published page URL, or ``None`` if no page was published.
    """
    sources = filter_sources(
        sources,
        skip_max_video_duration=skip_max_video_duration,
        skip_max_audio_duration=skip_max_audio_duration,
        skip_max_video_bytes=skip_max_video_bytes,
        skip_max_audio_bytes=skip_max_audio_bytes,
    )
    if not sources:
        return {}
    title = title or "AI导读"
    if not is_eligible(
        sources,
        min_text_length=min_text_length,
        min_audio_duration=min_audio_duration,
        min_video_duration=min_video_duration,
        max_audio_duration=max_audio_duration,
        max_video_duration=max_video_duration,
        max_audio_bytes=max_audio_bytes,
        max_video_bytes=max_video_bytes,
        min_num_image=min_num_image,
        max_num_image=max_num_image,
        min_num_video=min_num_video,
        max_num_video=max_num_video,
        min_num_audio=min_num_audio,
        max_num_audio=max_num_audio,
    ):
        return {}
    texts, transcripts, schema = parse_summary_sources(sources, mermaid=mermaid)
    # Build a synthetic message whose ids are derived from a digest of the sources.
    checksum = int(digest(sources, to_int=True))
    prompt = f"{PREFIX.AI_TEXT_GENERATION} @{model} {texts or 'Summarize'}"
    ai_msg = Message(id=checksum, chat=Chat(id=checksum), text=Str(prompt))
    summary = await ai_text_generation("fake", message=ai_msg, **schema)  # type: ignore
    if not summary.get("texts", ""):
        return {}
    raw_texts = summary["texts"]
    texts, mermaid_img_url, mermaid_pako_url = await parse_summary(raw_texts)

    telegraph_url = None
    try:
        # parse_summary already falls back to the raw text when the model output
        # is unusable; validating here without a guard would raise and discard
        # that fallback. pydantic's ValidationError is a ValueError subclass.
        extraction = ContentExtraction.model_validate_json(raw_texts)
    except ValueError as e:
        logger.warning(f"Summary is not valid ContentExtraction JSON; skip publishing the page: {e}")
    else:
        page = AIPage(
            title=title,
            author=author,
            url=url,
            date=to_dt(date),
            description=description,
            summary=extraction,
            transcripts=transcripts,
            mermaid_img=mermaid_img_url,
            mermaid_url=mermaid_pako_url,
        )
        telegraph_url = await telegraph_aipage(page, ttl=ttl, force_r2=force_r2_page)

    if telegraph_url:
        summary["telegraph_url"] = telegraph_url
        summary["texts"] = f"**🤖[AI导读]({telegraph_url})**\n" + texts
    else:
        summary["telegraph_url"] = None
        summary["texts"] = "**🤖AI导读**\n" + texts
    return summary
125
126
async def parse_summary(texts: str) -> tuple[str, str, str]:
    """Render the structured summary JSON as display text.

    The JSON is validated as a ``ContentExtraction``. When it carries a
    mindmap, a mermaid diagram is generated and published. The overview and
    every section (emoji, title, optional start timestamp, content) are then
    concatenated into one string, which is also logged.

    Returns:
        (summary_texts, mermaid_img_url, mermaid_pako_url). If any step
        raises, the error is logged and the input string is returned
        unchanged together with two empty URLs.
    """
    mermaid_img, mermaid_pako = "", ""
    try:
        extraction = ContentExtraction.model_validate_json(texts)
        if extraction.mindmap:
            diagram = generate_mermaid(extraction.mindmap)
            mermaid_img, mermaid_pako = await publish_mermaid(diagram)
        pieces = [f"{extraction.overview}\n⚡️**章节速览**"]
        for sec in extraction.sections:
            pieces.append(f"\n{sec.emoji}**{sec.title}**")
            if sec.start:
                stamp = sec.start
                # Timestamps longer than five characters lose a leading "00:" prefix.
                if len(stamp) > 5:
                    stamp = stamp.removeprefix("00:")
                pieces.append(f" [{stamp}]")
            pieces.append(f"\n{sec.content}")
        rendered = "".join(pieces)
        logger.success(rendered)
    except Exception as err:
        logger.error(f"Error parsing summary: {err}")
        return texts, "", ""
    return rendered, mermaid_img, mermaid_pako
152
153
154def filter_sources(
155 sources: list[dict] | None,
156 skip_max_video_duration: float | None = None, # skip max video duration if it is greater than this value
157 skip_max_audio_duration: float | None = None, # skip max audio duration if it is greater than this value
158 skip_max_video_bytes: int | None = None, # skip max video bytes if it is greater than this value
159 skip_max_audio_bytes: int | None = None, # skip max audio bytes if it is greater than this value
160) -> list[dict]:
161 """Filter the sources by the given conditions.
162
163 Returns:
164 The filtered sources.
165 """
166 if not sources:
167 return []
168 filtered = []
169 for source in sources:
170 if skip_max_video_duration is not None and source["type"] == "video" and Path(source["path"]).is_file():
171 duration = get_media_duration(source["path"])
172 size = Path(source["path"]).stat().st_size
173 if duration > skip_max_video_duration:
174 logger.warning(f"Skip video {source['path']} due to duration {duration} > {skip_max_video_duration}")
175 continue
176 if isinstance(skip_max_video_bytes, int) and size > skip_max_video_bytes:
177 logger.warning(f"Skip video {source['path']} due to size {size} > {skip_max_video_bytes}")
178 continue
179 elif skip_max_audio_duration is not None and source["type"] == "audio" and Path(source["path"]).is_file():
180 duration = get_media_duration(source["path"])
181 size = Path(source["path"]).stat().st_size
182 if duration > skip_max_audio_duration:
183 logger.warning(f"Skip audio {source['path']} due to duration {duration} > {skip_max_audio_duration}")
184 continue
185 if isinstance(skip_max_audio_bytes, int) and size > skip_max_audio_bytes:
186 logger.warning(f"Skip audio {source['path']} due to size {size} > {skip_max_audio_bytes}")
187 continue
188 filtered.append(source)
189 return filtered
190
191
192def is_eligible(
193 sources: list[dict],
194 *,
195 min_text_length: int | None = None, # minimum text length to summarize
196 min_audio_duration: float | None = None, # minimum audio duration to summarize
197 min_video_duration: float | None = None, # minimum video duration to summarize
198 max_audio_duration: float | None = None, # maximum audio duration to summarize
199 max_video_duration: float | None = None, # maximum video duration to summarize
200 max_audio_bytes: int | None = None, # maximum audio bytes to summarize
201 max_video_bytes: int | None = None, # maximum video bytes to summarize
202 min_num_image: int | None = None, # minimum number of images to summarize
203 max_num_image: int | None = None, # maximum number of images to summarize
204 min_num_video: int | None = None, # minimum number of videos to summarize
205 max_num_video: int | None = None, # maximum number of videos to summarize
206 min_num_audio: int | None = None, # minimum number of audios to summarize
207 max_num_audio: int | None = None, # maximum number of audios to summarize
208) -> bool:
209 r"""Check if the source is eligible for summarization.
210
211 Args:
212 sources (list[dict] | None): The sources to summary.
213 # text
214 {"type": "system_prompt", "text": "This is system prompt."}
215 {"type": "text", "text": "Hello."}
216 {"type": "text", "path": "/path/to/file.txt"}
217 {"type": "transcripts", "text": "[00:00] Hello\n[00:01] a sentence"}
218
219 # media
220 {"type": "image", "path": "/path/to/image.jpg", "mime_type (optional)": "image/jpeg"}
221 {"type": "video", "path": "/path/to/video.mp4", "mime_type (optional)": "video/mp4", "duration (optional)": 10.0, "size (optional)": 9999}
222 {"type": "audio", "path": "/path/to/audio.mp3", "mime_type (optional)": "audio/mpeg", "duration (optional)": 10.0, "size (optional)": 9999}
223
224 # file
225 {"type": "file", "path": "/path/to/file.pdf", "mime_type (optional)": "application/pdf"}
226
227 """
228 text_length = 0
229 audio_duration = 0
230 video_duration = 0
231 audio_bytes = 0
232 video_bytes = 0
233 num_image = 0
234 num_video = 0
235 num_audio = 0
236 # check text length
237 if isinstance(min_text_length, int):
238 for source in sources:
239 if source["type"] in ["text", "transcripts"] and source.get("text"):
240 text_length += count_subtitles(source["text"])
241 elif source["type"] == "text" and source.get("path"):
242 text_length += len(read_text(source["path"]))
243 if text_length < int(min_text_length):
244 logger.warning(f"Text length is too short: {text_length} < {min_text_length}")
245 return False
246
247 # check duration
248 if any(x is not None for x in [min_audio_duration, max_audio_duration, min_video_duration, max_video_duration]):
249 for source in sources:
250 if source["type"] == "audio" and Path(source["path"]).is_file():
251 audio_duration += get_media_duration(source["path"])
252 elif source["type"] == "video" and Path(source["path"]).is_file():
253 video_duration += get_media_duration(source["path"])
254
255 if min_video_duration is not None and video_duration < min_video_duration:
256 logger.warning(f"Video duration is too short: {video_duration} < {min_video_duration}")
257 return False
258 if max_video_duration is not None and video_duration > max_video_duration:
259 logger.warning(f"Video duration is too long: {video_duration} > {max_video_duration}")
260 return False
261 if min_audio_duration is not None and audio_duration < min_audio_duration:
262 logger.warning(f"Audio duration is too short: {audio_duration} < {min_audio_duration}")
263 return False
264 if max_audio_duration is not None and audio_duration > max_audio_duration:
265 logger.warning(f"Audio duration is too long: {audio_duration} > {max_audio_duration}")
266 return False
267
268 # check size
269 if isinstance(max_audio_bytes, int) or isinstance(max_video_bytes, int):
270 for source in sources:
271 if source["type"] == "audio" and Path(source["path"]).is_file():
272 audio_bytes += Path(source["path"]).stat().st_size
273 elif source["type"] == "video" and Path(source["path"]).is_file():
274 video_bytes += Path(source["path"]).stat().st_size
275
276 if isinstance(max_audio_bytes, int) and audio_bytes > max_audio_bytes:
277 logger.warning(f"Audio bytes is too large: {audio_bytes} > {max_audio_bytes}")
278 return False
279 if isinstance(max_video_bytes, int) and video_bytes > max_video_bytes:
280 logger.warning(f"Video bytes is too large: {video_bytes} > {max_video_bytes}")
281 return False
282
283 # check number of images, videos, and audios
284 if any(x is not None for x in [min_num_image, max_num_image, min_num_video, max_num_video, min_num_audio, max_num_audio]):
285 for source in sources:
286 if source["type"] == "image" and Path(source["path"]).is_file():
287 num_image += 1
288 elif source["type"] == "video" and Path(source["path"]).is_file():
289 num_video += 1
290 elif source["type"] == "audio" and Path(source["path"]).is_file():
291 num_audio += 1
292 if isinstance(min_num_image, int) and num_image < min_num_image:
293 logger.warning(f"Too few images to summarize: {num_image} < {min_num_image}")
294 return False
295 if isinstance(max_num_image, int) and num_image > max_num_image:
296 logger.warning(f"Too many images to summarize: {num_image} > {max_num_image}")
297 return False
298 if isinstance(min_num_video, int) and num_video < min_num_video:
299 logger.warning(f"Too few videos to summarize: {num_video} < {min_num_video}")
300 return False
301 if isinstance(max_num_video, int) and num_video > max_num_video:
302 logger.warning(f"Too many videos to summarize: {num_video} > {max_num_video}")
303 return False
304 if isinstance(min_num_audio, int) and num_audio < min_num_audio:
305 logger.warning(f"Too few audios to summarize: {num_audio} < {min_num_audio}")
306 return False
307 if isinstance(max_num_audio, int) and num_audio > max_num_audio:
308 logger.warning(f"Too many audios to summarize: {num_audio} > {max_num_audio}")
309 return False
310 return True