From 2294962d573653ab828ccaaeee8f91afefc8f7d4 Mon Sep 17 00:00:00 2001 From: Dom Date: Sat, 14 Feb 2026 12:17:06 +0000 Subject: [PATCH] Add job completion check and cleanup for duplicate entrance sets --- main.py | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/main.py b/main.py index facea1c..808db15 100644 --- a/main.py +++ b/main.py @@ -670,6 +670,30 @@ def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet): job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e)) +def is_completed_job_materialized(cur, s3: S3, cfg: AppConfig, uuid: str) -> bool: + job = job_get(cur, uuid) + if not job: + return False + if status_rank(job.get("status")) < status_rank("THUMB_OK"): + return False + + processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{uuid}.jpg" + thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{uuid}.jpg" + try: + s3.head(processed_key) + s3.head(thumb_key) + except Exception: + return False + return True + + +def cleanup_entrance_set(s3: S3, s: EntranceSet): + to_delete = [s.jpg_key, s.meta_key] + if s.cat_key: + to_delete.append(s.cat_key) + s3.delete_keys(to_delete) + + def run_once(cfg: AppConfig) -> int: s3c = S3(cfg) ready = collect_ready_sets(s3c, cfg) @@ -679,9 +703,16 @@ def run_once(cfg: AppConfig) -> int: with psycopg.connect(cfg.pg_dsn) as db: for s in ready: try: + skip_duplicate = False with db.transaction(): with db.cursor() as cur: - process_one(cur, s3c, cfg, s) + if is_completed_job_materialized(cur, s3c, cfg, s.uuid): + skip_duplicate = True + else: + process_one(cur, s3c, cfg, s) + if skip_duplicate: + cleanup_entrance_set(s3c, s) + print(f"[SKIP] duplicate already processed: {s.uuid}") except Exception as e: with db.transaction(): with db.cursor() as cur: