main
  1#!/venv/bin/python
  2# -*- coding: utf-8 -*-
  3import asyncio
  4import os
  5from collections import Counter
  6from pathlib import Path
  7
  8import feedparser
  9from glom import flatten, glom
 10from loguru import logger
 11from pyrogram.client import Client
 12from pyrogram.errors import FloodWait
 13
 14from config import cutter
 15from custom.rss import HEADERS
 16from database.d1 import insert_d1, query_d1
 17from database.turso import insert_statement, turso_exec, turso_parse_resp
 18from history.utils import TURSO_KWARGS
 19from networking import hx_req
 20
 21
 22async def dnkt_attendance(client: Client):
 23    """DNKT related."""
 24    if os.getenv("DNKT_ATTENDANCE_DISABLED", "0") == "1":
 25        return
 26    remote_content = await hx_req("https://dl.zydou.me/d/checkin.atom", rformat="text", headers=HEADERS, timeout=60, max_retry=0, silent=True)
 27    if not remote_content.get("text"):
 28        return
 29    parsed = feedparser.parse(remote_content["text"])  # do not parse feed url, because it doesn't support timeout.
 30    for entry in await get_new_entries(parsed["entries"]):  # old to latest
 31        if not entry["texts"]:
 32            continue
 33        logger.info(entry["标题"])
 34        await sync_dnkt_checkin_to_d1(entry)
 35        await sync_dnkt_checkin_to_turso(entry)
 36
 37        tids = {"补卡": 967, "请假": 961, "调休": 965, "加班": 963}
 38        await send_with_tid(client, entry["texts"], tids.get(entry["类型"], 1))
 39
 40
 41async def send_with_tid(client: Client, texts: str, tid: int):
 42    try:
 43        await client.send_message(
 44            chat_id=-1002820954593,
 45            text=texts,
 46            disable_notification=True,
 47            message_thread_id=tid,
 48        )
 49        await asyncio.sleep(3)
 50    except FloodWait as e:
 51        logger.warning(e)
 52        await asyncio.sleep(e.value)  # type: ignore
 53        return await send_with_tid(client, texts, tid)
 54
 55
 56async def get_new_entries(entries: list[dict]) -> list[dict]:
 57    """Get new entries from feed."""
 58    max_processed_id = await get_max_processed_id()
 59    try:
 60        new_entries = []
 61        for x in entries:
 62            guid = int(x["id"])
 63            if guid > max_processed_id:
 64                entry = await gen_caption(x["contributors"], link=x["link"])
 65                entry["db_key"] = f"TTL/30d/DNKT/考勤/{guid}"
 66                new_entries.append(entry)
 67    except Exception as e:
 68        logger.error(f"Failed to get new entries: {e}")
 69        new_entries = []
 70    return sorted(new_entries, key=lambda x: int(x["id"]), reverse=False)
 71
 72
 73async def get_max_processed_id() -> int:
 74    resp = await query_d1(sql="SELECT id FROM 考勤管理 ORDER BY id DESC LIMIT 1", db_name="dnkt", silent=True)
 75    max_id = int(glom(resp, "result.0.results.0.id", default=0))
 76    if not max_id:
 77        max_id = 0
 78        params = TURSO_KWARGS | {"db_name": "dnkt"}
 79        resp = await turso_exec(
 80            [
 81                {
 82                    "type": "execute",
 83                    "stmt": {"sql": "SELECT id FROM 考勤管理 ORDER BY id DESC LIMIT 1"},
 84                }
 85            ],
 86            retry=2,
 87            silent=True,
 88            **params,
 89        )
 90        max_id = int(glom(resp, "results.0.response.result.rows.0.0.value", default=0))
 91    return max_id
 92
 93
 94async def gen_caption(info_list: list[dict], link: str) -> dict:
 95    info = {
 96        "id": Path(link).stem,
 97        "类型": "",
 98        "姓名": "",
 99        "部门": "",
100        "审批人": "",
101        "日期": "",
102        "原因": "",
103        "创建日期": "",
104        "开始时间": "",
105        "结束时间": "",
106        "补卡时间": "",
107        "标题": "",
108    }  # employee info
109    info |= {x["href"]: x["name"] for x in info_list}  # merge employee info and checkin info
110
111    # guess department
112    if not info["部门"]:
113        department = await guess_department_from_d1(info["姓名"])
114        if not department:
115            department = await guess_department_from_turso(info["姓名"])
116        info["部门"] = department
117
118    texts = ""
119    category = info["类型"]
120    if "调休" in str(info):
121        category = "调休"
122        info["类型"] = "调休"
123    category_emoji = {"补卡": "💳", "请假": "", "调休": "🔄", "加班": "📚"}.get(category, "")
124    if info["部门"]:
125        texts += f"{category_emoji}{category}: #{info['部门']}"
126    else:
127        texts += f"{category_emoji}{category}"
128    if info["姓名"]:
129        texts += f"\n👤姓名: #{info['姓名']}"
130    if info["审批人"]:
131        texts += f"\n✏️审批: {info['审批人']}"
132    if info["日期"]:
133        texts += f"\n📅日期: [{info['日期']}]({link})"
134    texts += "\n🕒时间: "
135    if info["开始时间"]:
136        texts += f"{info['开始时间']}"
137    if info["补卡时间"] and not info["开始时间"] and not info["结束时间"]:
138        texts += f"{info['补卡时间']}"
139    if info["结束时间"]:
140        texts += f" - {info['结束时间']}"
141    if info["原因"]:
142        texts += f"\n📋原因: {info['原因']}"
143    info["texts"] = texts.strip()
144    return info
145
146
async def sync_dnkt_checkin_to_d1(entry: dict) -> None:
    """Upsert one check-in record into D1 table ``考勤管理`` (database ``dnkt``).

    ``search_column`` holds the space-joined tokens that ``cutter.cutword``
    produces from the title and reason. A conflicting ``id`` updates the row.
    """
    # Schema note: the table is not created here. It was set up with
    #   create_d1_table("考勤管理", columns, idx_cols=["类型", "姓名", "部门", "审批人", "日期"],
    #                   fts_on_col="id", fts_index_col="search_column", db_name="dnkt", silent=True)
    # where columns = "id INTEGER PRIMARY KEY, 类型 TEXT, 姓名 TEXT, 部门 TEXT, 审批人 TEXT,
    #   日期 TEXT, 原因 TEXT, 开始时间 TEXT, 结束时间 TEXT, 补卡时间 TEXT, 创建日期 TEXT,
    #   标题 TEXT, search_column TEXT"
    copied_fields = ("类型", "姓名", "部门", "审批人", "日期", "原因", "创建日期", "开始时间", "结束时间", "补卡时间", "标题")
    row = {"id": int(entry["id"])}
    for field in copied_fields:
        row[field] = entry[field]
    row["search_column"] = " ".join(cutter.cutword(entry["标题"] + "\n" + entry["原因"]))
    statement = insert_d1("考勤管理", row, update_on_conflict="id")
    await query_d1(**statement, db_name="dnkt", silent=True)
177
178
async def sync_dnkt_checkin_to_turso(entry: dict) -> None:
    """Upsert one check-in record into Turso table ``考勤管理`` (database ``dnkt``).

    ``search_column`` holds the space-joined tokens that ``cutter.cutword``
    produces from the title and reason. A conflicting ``id`` updates the row.
    """
    # Schema note: the table is not created here. It was set up with
    #   turso_create_table("考勤管理", columns, idx_cols=["类型", "姓名", "部门", "审批人", "日期"],
    #                      fts_on_col="id", fts_index_col="search_column", **params)
    # using the same column list as the D1 table.
    turso_params = TURSO_KWARGS | {"db_name": "dnkt"}
    copied_fields = ("类型", "姓名", "部门", "审批人", "日期", "原因", "创建日期", "开始时间", "结束时间", "补卡时间", "标题")
    row = {"id": int(entry["id"])}
    for field in copied_fields:
        row[field] = entry[field]
    row["search_column"] = " ".join(cutter.cutword(entry["标题"] + "\n" + entry["原因"]))
    upsert = insert_statement("考勤管理", row, update_on_conflict="id")
    await turso_exec([upsert], retry=2, silent=True, **turso_params)
209
210
async def guess_department_from_turso(name: str) -> str:
    """Guess an employee's department from their past records in Turso.

    Looks at the 100 most recent ``考勤管理`` rows for ``name`` and returns the
    most frequent non-empty ``部门``.

    Returns:
        The guessed department, or "" when ``name`` is empty or nothing is found.
    """
    if not name:
        logger.warning("姓名为空")
        return ""
    # ``name`` comes from the remote feed: double single quotes (SQLite string
    # literal escaping) so a quote cannot break or inject into the query.
    quoted_name = name.replace("'", "''")
    params = TURSO_KWARGS | {"db_name": "dnkt"}
    resp = await turso_exec(
        [
            {
                "type": "execute",
                "stmt": {"sql": f"SELECT 部门 FROM 考勤管理 WHERE 姓名 = '{quoted_name}' ORDER BY id DESC LIMIT 100"},
            }
        ],
        retry=2,
        silent=True,
        **params,
    )
    values = glom(resp, "results.0.response.result.rows.**.value", default=[])
    for department, _count in Counter(values).most_common():
        if department:
            logger.success(f"猜测【{name}】的部门为【{department}】")
            return department
    logger.warning(f"未找到【{name}】的部门")
    return ""
237
238
async def guess_department_from_d1(name: str) -> str:
    """Guess an employee's department from their past records in D1.

    Looks at the 100 most recent ``考勤管理`` rows for ``name`` and returns the
    most frequent non-empty ``部门``.

    Returns:
        The guessed department, or "" when ``name`` is empty or nothing is found.
    """
    if not name:
        logger.warning("姓名为空")
        return ""
    # ``name`` comes from the remote feed: double single quotes (SQLite string
    # literal escaping) so a quote cannot break or inject into the query.
    quoted_name = name.replace("'", "''")
    resp = await query_d1(
        sql=f"SELECT 部门 FROM 考勤管理 WHERE 姓名 = '{quoted_name}' ORDER BY id DESC LIMIT 100",
        db_name="dnkt",
        silent=True,
    )
    values = glom(resp, "result.0.results.*.部门", default=[])
    for department, _count in Counter(values).most_common():
        if department:
            logger.success(f"猜测【{name}】的部门为【{department}】")
            return department
    logger.warning(f"未找到【{name}】的部门")
    return ""
254
255
async def sync_d1_to_turso(batch_size: int = 1024):
    """Copy ``考勤管理`` rows that exist in D1 but not in Turso into Turso.

    Rows are inserted in ascending id order, ``batch_size`` statements per
    ``turso_exec`` call; existing ids are upserted (``update_on_conflict="id"``).

    Args:
        batch_size: Maximum number of insert statements per Turso request.
    """
    params = TURSO_KWARGS | {"db_name": "dnkt"}
    resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": "SELECT id FROM 考勤管理"}}],  # only ids are needed
        retry=2,
        silent=True,
        **params,
    )
    # Compare ids as ints: Turso's HTTP API encodes integer cells as text, so
    # they may arrive as strings while D1 returns ints — TODO confirm what
    # turso_parse_resp yields; int() is harmless either way.
    turso_ids = {int(row["id"]) for row in turso_parse_resp(resp)}

    resp = await query_d1(sql="SELECT * FROM 考勤管理", db_name="dnkt", silent=True)
    d1_rows = glom(resp, "result.0.results", default=None)
    if d1_rows is None:
        logger.warning("Failed to read 考勤管理 from D1; nothing synced to Turso")
        return

    statements = [
        insert_statement("考勤管理", row, update_on_conflict="id")
        for row in sorted(d1_rows, key=lambda x: x["id"])
        if int(row["id"]) not in turso_ids
    ]
    for start in range(0, len(statements), batch_size):
        await turso_exec(statements[start : start + batch_size], silent=True, retry=2, **params)
    logger.success(f"Synced {len(statements)} rows from D1 to Turso")
284
285
async def fill_missing_department_in_turso(batch_size: int = 100):
    """Backfill empty ``部门`` values in Turso's ``考勤管理`` table.

    For every row without a department (ascending id), a department is guessed
    with ``guess_department_from_turso``; rows with a guess are upserted back,
    ``batch_size`` requests gathered concurrently per batch.

    Args:
        batch_size: Number of upsert requests awaited together per batch.
    """
    params = TURSO_KWARGS | {"db_name": "dnkt"}

    async def _flush(pending: list) -> None:
        # Await one batch; failed requests are logged instead of crashing the count.
        results = await asyncio.gather(*pending, return_exceptions=True)
        num_ok = 0
        for result in results:
            if isinstance(result, BaseException):
                logger.warning(f"Failed to update 部门 in Turso: {result}")
                continue
            num_ok += sum(1 for t in glom(result, "results.*.type", default=[]) if t == "ok")
        # Halved because presumably each call yields two "ok" results
        # (execute + close) — TODO confirm against turso_exec.
        logger.success(f"Synced {num_ok // 2} messages to Turso")

    resp = await turso_exec(
        [{"type": "execute", "stmt": {"sql": "SELECT * FROM 考勤管理"}}],
        retry=2,
        silent=True,
        **params,
    )
    rows = turso_parse_resp(resp)
    tasks = []
    for row in sorted(rows, key=lambda x: x["id"]):
        if row["部门"]:
            continue
        row["部门"] = await guess_department_from_turso(row["姓名"])
        if not row["部门"]:
            continue
        tasks.append(turso_exec([insert_statement("考勤管理", row, update_on_conflict="id")], retry=2, silent=True, **params))
        if len(tasks) >= batch_size:
            await _flush(tasks)
            tasks = []

    if tasks:
        await _flush(tasks)
318
319
async def fill_missing_department_in_d1(batch_size: int = 32):
    """Backfill empty ``部门`` values in D1's ``考勤管理`` table.

    For every row without a department (ascending id), a department is guessed
    with ``guess_department_from_d1``; rows with a guess are upserted back,
    ``batch_size`` requests gathered concurrently per batch.

    Args:
        batch_size: Number of upsert requests awaited together per batch.
    """

    async def _flush(pending: list) -> None:
        # Await one batch; failed requests are logged instead of crashing the count.
        results = await asyncio.gather(*pending, return_exceptions=True)
        num_ok = 0
        for result in results:
            if isinstance(result, BaseException):
                logger.warning(f"Failed to update 部门 in D1: {result}")
                continue
            if glom(result, "success", default=False) is True:
                num_ok += 1
        logger.success(f"Synced {num_ok} messages to D1")

    resp = await query_d1(sql="SELECT * FROM 考勤管理", db_name="dnkt", silent=True)
    rows = glom(resp, "result.0.results", default=None)
    if rows is None:
        logger.warning("Failed to read 考勤管理 from D1")
        return
    tasks = []
    for row in sorted(rows, key=lambda x: x["id"]):
        if row["部门"]:
            continue
        row["部门"] = await guess_department_from_d1(row["姓名"])
        if not row["部门"]:
            continue
        tasks.append(query_d1(**insert_d1("考勤管理", row, update_on_conflict="id"), db_name="dnkt", silent=True))
        if len(tasks) >= batch_size:
            await _flush(tasks)
            tasks = []

    if tasks:
        await _flush(tasks)