main
1#!/venv/bin/python
2# -*- coding: utf-8 -*-
3import asyncio
4import os
5from collections import Counter
6from pathlib import Path
7
8import feedparser
9from glom import flatten, glom
10from loguru import logger
11from pyrogram.client import Client
12from pyrogram.errors import FloodWait
13
14from config import cutter
15from custom.rss import HEADERS
16from database.d1 import insert_d1, query_d1
17from database.turso import insert_statement, turso_exec, turso_parse_resp
18from history.utils import TURSO_KWARGS
19from networking import hx_req
20
21
async def dnkt_attendance(client: Client):
    """Fetch the DNKT check-in feed and forward every not-yet-processed entry.

    The job is a no-op when the environment variable
    ``DNKT_ATTENDANCE_DISABLED`` equals ``"1"`` or when the feed download
    yields no text. Every new entry that has a non-empty caption is logged,
    written to D1 and Turso, and then posted to the forum topic that matches
    its category (topic ``1`` when the category is not in the map).

    Args:
        client: Client used to send the Telegram messages.
    """
    if os.getenv("DNKT_ATTENDANCE_DISABLED", "0") == "1":
        return

    response = await hx_req(
        "https://dl.zydou.me/d/checkin.atom",
        rformat="text",
        headers=HEADERS,
        timeout=60,
        max_retry=0,
        silent=True,
    )
    feed_text = response.get("text")
    if not feed_text:
        return

    # Parse the downloaded text instead of handing feedparser the URL,
    # because feedparser's own fetching does not support a timeout.
    feed = feedparser.parse(feed_text)

    # Category -> message thread (topic) id.
    topic_ids = {"补卡": 967, "请假": 961, "调休": 965, "加班": 963}

    pending = await get_new_entries(feed["entries"])
    for item in pending:  # ordered from oldest to latest
        caption = item["texts"]
        if not caption:
            continue
        logger.info(item["标题"])
        await sync_dnkt_checkin_to_d1(item)
        await sync_dnkt_checkin_to_turso(item)
        await send_with_tid(client, caption, topic_ids.get(item["类型"], 1))
39
40
async def send_with_tid(client: Client, texts: str, tid: int):
    """Send *texts* without notification into message thread *tid*.

    The target chat id is hard-coded. After a successful send the coroutine
    pauses for 3 seconds. When a ``FloodWait`` is raised it is logged, the
    coroutine sleeps for ``e.value`` seconds and the send is attempted
    again; this repeats until a send succeeds.

    Args:
        client: Client used to send the message.
        texts: Message body.
        tid: Message thread id passed as ``message_thread_id``.
    """
    while True:
        try:
            await client.send_message(
                chat_id=-1002820954593,
                text=texts,
                disable_notification=True,
                message_thread_id=tid,
            )
            await asyncio.sleep(3)
        except FloodWait as flood:
            logger.warning(flood)
            await asyncio.sleep(flood.value)  # type: ignore
        else:
            return
54
55
async def get_new_entries(entries: list[dict]) -> list[dict]:
    """Return caption records for feed entries newer than the stored maximum id.

    An entry is new when ``int(entry["id"])`` is greater than the value
    returned by ``get_max_processed_id()``. Each new entry is converted with
    ``gen_caption`` and tagged with a ``db_key``.

    If converting any entry raises, the error is logged and *no* entries are
    returned for this run.

    Args:
        entries: Parsed feed entries; ``id``, ``contributors`` and ``link``
            are read from each one.

    Returns:
        The new records sorted ascending by ``int(record["id"])``.
    """
    last_seen = await get_max_processed_id()
    fresh = []
    try:
        for item in entries:
            guid = int(item["id"])
            if guid <= last_seen:
                continue
            record = await gen_caption(item["contributors"], link=item["link"])
            record["db_key"] = f"TTL/30d/DNKT/考勤/{guid}"
            fresh.append(record)
    except Exception as e:
        logger.error(f"Failed to get new entries: {e}")
        fresh = []
    fresh.sort(key=lambda record: int(record["id"]))
    return fresh
71
72
async def get_max_processed_id() -> int:
    """Return the largest ``id`` already stored in table 考勤管理.

    D1 is queried first. Turso is consulted only as a fallback, when the D1
    response yields no id (the lookup falls back to ``0``), so a usable D1
    answer is never discarded.

    Returns:
        The highest stored id, or ``0`` when neither database yields one.
    """
    latest_id_sql = "SELECT id FROM 考勤管理 ORDER BY id DESC LIMIT 1"

    d1_resp = await query_d1(sql=latest_id_sql, db_name="dnkt", silent=True)
    max_id = int(glom(d1_resp, "result.0.results.0.id", default=0))
    if max_id:
        return max_id

    # D1 gave nothing usable: ask Turso instead.
    params = TURSO_KWARGS | {"db_name": "dnkt"}
    turso_resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": latest_id_sql},
            }
        ],
        retry=2,
        silent=True,
        **params,
    )
    return int(glom(turso_resp, "results.0.response.result.rows.0.0.value", default=0))
92
93
async def gen_caption(info_list: list[dict], link: str) -> dict:
    """Build the stored record and the message caption for one feed entry.

    Args:
        info_list: Dicts whose ``href`` value is used as the field name and
            whose ``name`` value is used as the field value.
        link: Entry URL. Its last path component without the suffix becomes
            the default ``id``; it is also the target of the date hyperlink.

    Returns:
        A dict holding every record field plus ``texts``, the caption. When
        no department is supplied it is guessed from D1 and, failing that,
        from Turso. If the text "调休" occurs anywhere in the record, the
        category is forced to "调休".
    """
    info = {
        "id": Path(link).stem,
        "类型": "",
        "姓名": "",
        "部门": "",
        "审批人": "",
        "日期": "",
        "原因": "",
        "创建日期": "",
        "开始时间": "",
        "结束时间": "",
        "补卡时间": "",
        "标题": "",
    }  # defaults, overridden by whatever the feed entry supplies
    info |= {item["href"]: item["name"] for item in info_list}

    # Guess the department from history when the entry does not carry one.
    if not info["部门"]:
        department = await guess_department_from_d1(info["姓名"])
        if not department:
            department = await guess_department_from_turso(info["姓名"])
        info["部门"] = department

    category = info["类型"]
    if "调休" in str(info):
        category = "调休"
        info["类型"] = "调休"
    category_emoji = {"补卡": "💳", "请假": "❌", "调休": "🔄", "加班": "📚"}.get(category, "")

    if info["部门"]:
        header = f"{category_emoji}{category}: #{info['部门']}"
    else:
        header = f"{category_emoji}{category}"
    lines = [header]

    if info["姓名"]:
        lines.append(f"👤姓名: #{info['姓名']}")
    if info["审批人"]:
        lines.append(f"✏️审批: {info['审批人']}")
    if info["日期"]:
        lines.append(f"📅日期: [{info['日期']}]({link})")

    # Assemble the time value first so the label is only emitted when there
    # is something to show, and always on its own line.
    time_text = ""
    if info["开始时间"]:
        time_text += f"{info['开始时间']}"
    if info["补卡时间"] and not info["开始时间"] and not info["结束时间"]:
        time_text += f"{info['补卡时间']}"
    if info["结束时间"]:
        time_text += f" - {info['结束时间']}"
    if time_text:
        lines.append(f"🕒时间: {time_text}")

    if info["原因"]:
        lines.append(f"📋原因: {info['原因']}")

    info["texts"] = "\n".join(lines).strip()
    return info
145
146
async def sync_dnkt_checkin_to_d1(entry: dict) -> None:
    """Write one check-in record to table 考勤管理 in the D1 database "dnkt".

    The statement is built by ``insert_d1`` with ``update_on_conflict="id"``.

    Args:
        entry: Record as produced by ``gen_caption``; ``id`` is stored as an
            int, the remaining fields are copied unchanged.
    """
    # Table layout, per the one-off creation snippet that used to live here
    # (presumably still current - verify against the live database):
    #   id INTEGER PRIMARY KEY, 类型/姓名/部门/审批人/日期/原因/开始时间/
    #   结束时间/补卡时间/创建日期/标题/search_column TEXT;
    #   indexes on 类型, 姓名, 部门, 审批人, 日期; FTS index column search_column.
    copied_columns = (
        "类型",
        "姓名",
        "部门",
        "审批人",
        "日期",
        "原因",
        "创建日期",
        "开始时间",
        "结束时间",
        "补卡时间",
        "标题",
    )
    records = {"id": int(entry["id"])}
    records.update((column, entry[column]) for column in copied_columns)
    # Search text: words cut from title + reason, joined by single spaces.
    search_source = entry["标题"] + "\n" + entry["原因"]
    records["search_column"] = " ".join(cutter.cutword(search_source))

    statement_kwargs = insert_d1("考勤管理", records, update_on_conflict="id")
    await query_d1(**statement_kwargs, db_name="dnkt", silent=True)
177
178
async def sync_dnkt_checkin_to_turso(entry: dict) -> None:
    """Write one check-in record to table 考勤管理 in the Turso database "dnkt".

    The statement is built by ``insert_statement`` with
    ``update_on_conflict="id"`` and executed with up to 2 retries.

    Args:
        entry: Record as produced by ``gen_caption``; ``id`` is stored as an
            int, the remaining fields are copied unchanged.
    """
    # Table layout, per the one-off creation snippet that used to live here
    # (presumably still current - verify against the live database):
    #   id INTEGER PRIMARY KEY, 类型/姓名/部门/审批人/日期/原因/开始时间/
    #   结束时间/补卡时间/创建日期/标题/search_column TEXT;
    #   indexes on 类型, 姓名, 部门, 审批人, 日期; FTS index column search_column.
    copied_columns = (
        "类型",
        "姓名",
        "部门",
        "审批人",
        "日期",
        "原因",
        "创建日期",
        "开始时间",
        "结束时间",
        "补卡时间",
        "标题",
    )
    records = {"id": int(entry["id"])}
    records.update((column, entry[column]) for column in copied_columns)
    # Search text: words cut from title + reason, joined by single spaces.
    search_source = entry["标题"] + "\n" + entry["原因"]
    records["search_column"] = " ".join(cutter.cutword(search_source))

    params = TURSO_KWARGS | {"db_name": "dnkt"}
    statement = insert_statement("考勤管理", records, update_on_conflict="id")
    await turso_exec([statement], retry=2, silent=True, **params)
209
210
async def guess_department_from_turso(name: str) -> str:
    """Guess a person's department from their stored rows in Turso.

    Reads the 部门 column of up to 100 rows (highest ids first) of table
    考勤管理 whose 姓名 equals *name* and picks the most frequent non-empty
    value.

    Args:
        name: Person name to look up.

    Returns:
        The guessed department, or ``""`` when *name* is empty or no
        non-empty department was found.
    """
    if not name:
        logger.warning("姓名为空")
        return ""

    # The name originates from external data (feed entries / stored rows)
    # and is embedded in a SQL string literal: double any single quote so
    # the value cannot terminate the literal.
    quoted_name = str(name).replace("'", "''")
    params = TURSO_KWARGS | {"db_name": "dnkt"}
    resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": f"SELECT 部门 FROM 考勤管理 WHERE 姓名 = '{quoted_name}' ORDER BY id DESC LIMIT 100"},
            }
        ],
        retry=2,
        silent=True,
        **params,
    )
    values = glom(resp, "results.0.response.result.rows.**.value", default=[])

    # most_common() yields values by descending count; take the first truthy one.
    department = next((value for value, _count in Counter(values).most_common() if value), "")
    if department:
        logger.success(f"猜测【{name}】的部门为【{department}】")
    else:
        logger.warning(f"未找到【{name}】的部门")
    return department
237
238
async def guess_department_from_d1(name: str) -> str:
    """Guess a person's department from their stored rows in D1.

    Reads the 部门 column of up to 100 rows (highest ids first) of table
    考勤管理 whose 姓名 equals *name* and picks the most frequent non-empty
    value.

    Args:
        name: Person name to look up.

    Returns:
        The guessed department, or ``""`` when *name* is empty or no
        non-empty department was found.
    """
    if not name:
        logger.warning("姓名为空")
        return ""

    # The name originates from external data (feed entries / stored rows)
    # and is embedded in a SQL string literal: double any single quote so
    # the value cannot terminate the literal.
    quoted_name = str(name).replace("'", "''")
    resp = await query_d1(
        sql=f"SELECT 部门 FROM 考勤管理 WHERE 姓名 = '{quoted_name}' ORDER BY id DESC LIMIT 100",
        db_name="dnkt",
        silent=True,
    )
    values = glom(resp, "result.0.results.*.部门", default=[])

    # most_common() yields values by descending count; take the first truthy one.
    department = next((value for value, _count in Counter(values).most_common() if value), "")
    if department:
        logger.success(f"猜测【{name}】的部门为【{department}】")
    else:
        logger.warning(f"未找到【{name}】的部门")
    return department
254
255
async def sync_d1_to_turso():
    """Copy rows of table 考勤管理 that exist in D1 but not in Turso to Turso.

    All rows are read from both databases; D1 rows whose ``id`` is not among
    the Turso ids are turned into insert statements (ascending id order) and
    executed against Turso in batches of 1024.
    """
    params = TURSO_KWARGS | {"db_name": "dnkt"}
    turso_resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": "SELECT * FROM 考勤管理"},
            }
        ],
        retry=2,
        silent=True,
        **params,
    )
    known_ids = {row["id"] for row in turso_parse_resp(turso_resp)}

    d1_resp = await query_d1(sql="SELECT * FROM 考勤管理", db_name="dnkt", silent=True)
    d1_rows = d1_resp["result"][0]["results"]

    batch = []
    for row in sorted(d1_rows, key=lambda item: item["id"]):
        if row["id"] not in known_ids:
            batch.append(insert_statement("考勤管理", row, update_on_conflict="id"))
            if len(batch) == 1024:
                await turso_exec(batch, silent=True, retry=2, **params)
                batch = []

    # Send whatever is left over after the last full batch.
    if batch:
        await turso_exec(batch, silent=True, retry=2, **params)
284
285
async def fill_missing_department_in_turso():
    """Fill in empty 部门 values of table 考勤管理 in Turso.

    Every row without a department gets one guessed through
    ``guess_department_from_turso``; rows for which no guess is available
    are left alone. Updated rows are written back concurrently in groups of
    100, and a success count is logged after each group.
    """
    params = TURSO_KWARGS | {"db_name": "dnkt"}

    async def _flush(batch: list) -> None:
        # Run the queued writes together and log how many reported "ok".
        outcomes = await asyncio.gather(*batch, return_exceptions=True)
        ok_types = [kind for kind in flatten(glom(outcomes, "*.results.*.type")) if kind == "ok"]
        # Halved - presumably each write yields two "ok" results; TODO confirm.
        num_success = len(ok_types) // 2
        logger.success(f"Synced {num_success} messages to Turso")

    select_resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": "SELECT * FROM 考勤管理"},
            }
        ],
        retry=2,
        silent=True,
        **params,
    )
    all_rows = turso_parse_resp(select_resp)

    pending = []
    for row in sorted(all_rows, key=lambda item: item["id"]):
        if row["部门"]:
            continue
        guessed = await guess_department_from_turso(row["姓名"])
        row["部门"] = guessed
        if not guessed:
            continue
        pending.append(turso_exec([insert_statement("考勤管理", row, update_on_conflict="id")], retry=2, silent=True, **params))
        if len(pending) == 100:
            await _flush(pending)
            pending = []

    if pending:
        await _flush(pending)
318
319
async def fill_missing_department_in_d1():
    """Fill in empty 部门 values of table 考勤管理 in D1.

    Every row without a department gets one guessed through
    ``guess_department_from_d1``; rows for which no guess is available are
    left alone. Updated rows are written back concurrently in groups of 32,
    and a success count is logged after each group.
    """

    async def _flush(batch: list) -> None:
        # Run the queued writes together and log how many reported success.
        outcomes = await asyncio.gather(*batch, return_exceptions=True)
        num_success = sum(1 for flag in glom(outcomes, "*.success") if flag is True)
        # These writes go to D1, so the log line names D1 (not Turso).
        logger.success(f"Synced {num_success} messages to D1")

    resp = await query_d1(sql="SELECT * FROM 考勤管理", db_name="dnkt", silent=True)
    rows = resp["result"][0]["results"]

    tasks = []
    for row in sorted(rows, key=lambda item: item["id"]):
        if row["部门"]:
            continue
        row["部门"] = await guess_department_from_d1(row["姓名"])
        if not row["部门"]:
            continue
        tasks.append(query_d1(**insert_d1("考勤管理", row, update_on_conflict="id"), db_name="dnkt", silent=True))
        if len(tasks) == 32:
            await _flush(tasks)
            tasks = []

    if tasks:
        await _flush(tasks)