Add job completion check and cleanup for duplicate entrance sets
This commit is contained in:
33
main.py
33
main.py
@@ -670,6 +670,30 @@ def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
|
|||||||
job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e))
|
job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
def is_completed_job_materialized(cur, s3: S3, cfg: AppConfig, uuid: str) -> bool:
    """Report whether the job for *uuid* is finished and its outputs exist.

    Returns True only when ``job_get`` finds a job, its status ranks at or
    above "THUMB_OK", and ``s3.head`` succeeds for both the processed JPEG
    and the thumbnail. Any exception raised by a ``head`` call is treated as
    "not materialized" and yields False.
    """
    record = job_get(cur, uuid)
    if not record or status_rank(record.get("status")) < status_rank("THUMB_OK"):
        return False

    # Use the keys stored on the job; otherwise derive them from the
    # configured prefixes and the uuid.
    expected_keys = (
        record.get("processed_jpg_key") or f"{cfg.processed_prefix}{uuid}.jpg",
        record.get("thumbnail_key") or f"{cfg.thumb_prefix}{uuid}.jpg",
    )

    try:
        for key in expected_keys:
            s3.head(key)
    except Exception:
        # A failed head (for whatever reason) means we cannot confirm the
        # outputs, so report the job as not materialized.
        return False
    else:
        return True
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_entrance_set(s3: S3, s: EntranceSet):
    """Delete the objects of entrance set *s* with one ``delete_keys`` call.

    The JPEG and metadata keys are always included; ``cat_key`` is added only
    when it is truthy.
    """
    doomed = [s.jpg_key, s.meta_key]
    doomed += [s.cat_key] if s.cat_key else []
    s3.delete_keys(doomed)
|
||||||
|
|
||||||
|
|
||||||
def run_once(cfg: AppConfig) -> int:
|
def run_once(cfg: AppConfig) -> int:
|
||||||
s3c = S3(cfg)
|
s3c = S3(cfg)
|
||||||
ready = collect_ready_sets(s3c, cfg)
|
ready = collect_ready_sets(s3c, cfg)
|
||||||
@@ -679,9 +703,16 @@ def run_once(cfg: AppConfig) -> int:
|
|||||||
with psycopg.connect(cfg.pg_dsn) as db:
|
with psycopg.connect(cfg.pg_dsn) as db:
|
||||||
for s in ready:
|
for s in ready:
|
||||||
try:
|
try:
|
||||||
|
skip_duplicate = False
|
||||||
with db.transaction():
|
with db.transaction():
|
||||||
with db.cursor() as cur:
|
with db.cursor() as cur:
|
||||||
process_one(cur, s3c, cfg, s)
|
if is_completed_job_materialized(cur, s3c, cfg, s.uuid):
|
||||||
|
skip_duplicate = True
|
||||||
|
else:
|
||||||
|
process_one(cur, s3c, cfg, s)
|
||||||
|
if skip_duplicate:
|
||||||
|
cleanup_entrance_set(s3c, s)
|
||||||
|
print(f"[SKIP] duplicate already processed: {s.uuid}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
with db.transaction():
|
with db.transaction():
|
||||||
with db.cursor() as cur:
|
with db.cursor() as cur:
|
||||||
|
|||||||
Reference in New Issue
Block a user