#!/usr/bin/env python3
# main.py
"""Remote-camera import worker.

Polls an S3 "entrance" prefix for triples of objects per image UUID
(<uuid>.jpg, <uuid>.json metadata, optional <uuid>_categories.json),
then for each complete, sufficiently-old set:

  1. loads metadata JSON into PostgreSQL (remote_cam.metadata),
  2. OCRs the camera's bottom-right timestamp overlay from the JPEG,
  3. patches EXIF date/time tags via the external `exiftool` binary,
  4. uploads the patched JPEG and a thumbnail to "processed"/"thumbnails"
     prefixes,
  5. records optional classification categories,
  6. deletes the entrance objects once the outputs are verified.

Progress per image is tracked in remote_cam.import_job so a crashed or
restarted worker resumes where it left off (see STATUS_ORDER).
"""
import os
import re
import json
import time
import tempfile
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Dict, Tuple

import boto3
import psycopg
from dateutil import tz
from PIL import Image, ImageOps, ImageEnhance
import pytesseract
import yaml

if os.environ.get("DEBUG") == "1":
    import debugpy
    debugpy.listen(("0.0.0.0", 5678))
    print("Waiting for debugger...")
    debugpy.wait_for_client()

# -----------------------
# Config
# -----------------------
UUID_RE = re.compile(r"^([0-9a-fA-F-]{36})\.(jpg|json)$")
CAT_RE = re.compile(r"^([0-9a-fA-F-]{36})_categories\.json$")

# Overlay fixed: "HH:MM:SS DD-MM-YYYY"
# FIX: the named groups had lost their names ("(?P\d{2})"), which is a regex
# syntax error and crashed re.compile at import time. The parser below
# addresses groups by name ("h", "m", "s", "d", "mo", "y"), so those exact
# names are restored here.
OCR_DT_RE = re.compile(
    r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s+(?P<d>\d{2})-(?P<mo>\d{2})-(?P<y>\d{4})"
)
# Tolerant variant: optional whitespace between time and date, '.' or '-' as
# date separator (OCR sometimes confuses the two).
OCR_DT_RE_FLEX = re.compile(
    r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s*(?P<d>\d{2})[-.](?P<mo>\d{2})[-.](?P<y>\d{4})"
)


@dataclass
class AppConfig:
    """Runtime configuration, loaded from YAML with env-var overrides."""
    s3_endpoint: str
    s3_access_key: str
    s3_secret_key: str
    s3_bucket: str
    pg_dsn: str
    entrance_prefix: str = "icu/entrance/"
    processed_prefix: str = "icu/processed/"
    thumb_prefix: str = "icu/thumbnails/"
    # Minimum object age before a set is picked up (lets multi-part uploads
    # settle so we never process a half-written set).
    min_age_seconds: int = 90
    poll_seconds: int = 30
    # OCR crop settings (tune if needed)
    ocr_crop_w_frac: float = 0.42  # right ~42% of width
    ocr_crop_h_frac: float = 0.22  # bottom ~22% of height
    thumb_max: int = 512
    exiftool_timeout_seconds: int = 15
    job_sleep_seconds: float = 1.0


def load_config(path: str) -> AppConfig:
    """Read YAML config at *path* into an AppConfig.

    Environment variables (S3_ENDPOINT, S3_ACCESS_KEY, S3_SECRET_KEY,
    S3_BUCKET, PG_DSN) override the corresponding YAML values so secrets
    can be injected without touching the file.
    """
    with open(path, "r", encoding="utf-8") as f:
        raw = yaml.safe_load(f)

    # allow env override for secret injection if desired
    def env_or(key: str, default=None):
        return os.environ.get(key, default)

    s3 = raw["s3"]
    pg = raw["postgres"]
    app = raw.get("app", {})
    return AppConfig(
        s3_endpoint=env_or("S3_ENDPOINT", s3["endpoint"]),
        s3_access_key=env_or("S3_ACCESS_KEY", s3["access_key"]),
        s3_secret_key=env_or("S3_SECRET_KEY", s3["secret_key"]),
        s3_bucket=env_or("S3_BUCKET", s3["bucket"]),
        pg_dsn=env_or("PG_DSN", pg["dsn"]),
        entrance_prefix=app.get("entrance_prefix", "icu/entrance/"),
        processed_prefix=app.get("processed_prefix", "icu/processed/"),
        thumb_prefix=app.get("thumb_prefix", "icu/thumbnails/"),
        min_age_seconds=int(app.get("min_age_seconds", 90)),
        poll_seconds=int(app.get("poll_seconds", 30)),
        ocr_crop_w_frac=float(app.get("ocr_crop_w_frac", 0.42)),
        ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
        thumb_max=int(app.get("thumb_max", 512)),
        exiftool_timeout_seconds=int(app.get("exiftool_timeout_seconds", 15)),
        job_sleep_seconds=float(app.get("job_sleep_seconds", 1.0)),
    )


# -----------------------
# Helpers
# -----------------------
def is_old_enough(last_modified, min_age_seconds: int) -> bool:
    """True if *last_modified* (aware datetime) is at least min_age_seconds old."""
    now = datetime.now(timezone.utc)
    return (now - last_modified).total_seconds() >= min_age_seconds


def to_exif_datetime(dt: datetime) -> str:
    """Format *dt* in EXIF's colon-separated "YYYY:MM:DD HH:MM:SS" form."""
    return dt.strftime("%Y:%m:%d %H:%M:%S")


def tz_offset_str(dt: datetime) -> str:
    """Return *dt*'s UTC offset as an EXIF OffsetTime string, e.g. "+02:00".

    Naive datetimes are treated as UTC.
    """
    off = dt.utcoffset()
    if off is None:
        off = timezone.utc.utcoffset(dt)
    total = int(off.total_seconds())
    sign = "+" if total >= 0 else "-"
    total = abs(total)
    hh = total // 3600
    mm = (total % 3600) // 60
    return f"{sign}{hh:02d}:{mm:02d}"


def parse_servertime(meta: dict) -> datetime:
    """Parse the mandatory metadata "servertime" field into an aware datetime.

    Raises ValueError if the field is missing.
    """
    s = meta.get("servertime")
    if not s:
        raise ValueError("metadata.servertime missing")
    # Example: "2025-06-30T04:03:04+0000"
    if re.match(r".*[+-]\d{4}$", s):
        return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")
    # Fallback: ISO-ish
    return datetime.fromisoformat(s)


def parse_customer_tz(meta: dict):
    """Resolve metadata "CustomerTimezone" (default UTC) to a tzinfo.

    Returns (tzinfo, name); raises ValueError on an unknown zone name.
    CustomerTimezone applies to OCR (camera overlay).
    """
    name = meta.get("CustomerTimezone") or "UTC"
    z = tz.gettz(name)
    if z is None:
        raise ValueError(f"Unknown CustomerTimezone: {name}")
    return z, name


def ocr_extract_timestamp(jpg_bytes: bytes, crop_w_frac: float,
                          crop_h_frac: float, customer_tzinfo) -> datetime:
    """OCR the camera's timestamp overlay from the bottom-right image corner.

    Crops the bottom-right (crop_w_frac x crop_h_frac) region, enhances it,
    runs Tesseract twice (with and without hard thresholding) and parses the
    result. Returns an aware datetime in *customer_tzinfo*.

    Raises ValueError when no parseable timestamp is found.
    """
    from io import BytesIO
    im = Image.open(BytesIO(jpg_bytes)).convert("RGB")
    w, h = im.size
    x0 = int(w * (1.0 - crop_w_frac))
    y0 = int(h * (1.0 - crop_h_frac))
    crop = im.crop((x0, y0, w, h))
    gray = ImageOps.grayscale(crop)
    gray = ImageOps.autocontrast(gray)
    gray = ImageEnhance.Contrast(gray).enhance(2.5)
    gray = ImageEnhance.Sharpness(gray).enhance(2.0)
    # Hard threshold: overlay text is near-white on dark background.
    bw = gray.point(lambda p: 255 if p > 180 else 0)
    cfg = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789:- '
    text = pytesseract.image_to_string(bw, config=cfg).strip()
    # retry without thresholding
    text2 = pytesseract.image_to_string(gray, config=cfg).strip()
    dt_local_naive = _parse_ocr_datetime(text, text2)
    # Keep camera-local timezone (no UTC conversion requested)
    return dt_local_naive.replace(tzinfo=customer_tzinfo)


def _parse_ocr_datetime(*texts: str) -> datetime:
    """Extract a naive datetime from OCR text candidates, most to least strict.

    Strategies, in order: (1) the full "HH:MM:SS DD-MM-YYYY" patterns,
    (2) a time match and a date match found independently, (3) a digits-only
    scan for any 14-digit HHMMSSDDMMYYYY window. Each candidate is validated
    by actually constructing the datetime. Raises ValueError if all fail.
    """
    def build_dt(h, m, s, d, mo, y):
        return datetime(int(y), int(mo), int(d), int(h), int(m), int(s))

    def valid_dt(h, m, s, d, mo, y) -> bool:
        try:
            build_dt(h, m, s, d, mo, y)
            return True
        except Exception:
            return False

    # 1) direct regex matches
    for t in texts:
        for rx in (OCR_DT_RE, OCR_DT_RE_FLEX):
            m = rx.search(t)
            if m and valid_dt(m.group("h"), m.group("m"), m.group("s"),
                              m.group("d"), m.group("mo"), m.group("y")):
                return build_dt(m.group("h"), m.group("m"), m.group("s"),
                                m.group("d"), m.group("mo"), m.group("y"))

    # 2) find time and date separately (handles missing space between time and date)
    time_rx = re.compile(r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})")
    date_rx = re.compile(r"(?P<d>\d{2})[-.](?P<mo>\d{2})[-.](?P<y>\d{4})")
    for t in texts:
        tm = time_rx.search(t)
        dm = date_rx.search(t)
        if tm and dm and valid_dt(tm.group("h"), tm.group("m"), tm.group("s"),
                                  dm.group("d"), dm.group("mo"), dm.group("y")):
            return build_dt(tm.group("h"), tm.group("m"), tm.group("s"),
                            dm.group("d"), dm.group("mo"), dm.group("y"))

    # 3) digits-only fallback: scan for HHMMSSDDMMYYYY
    for t in texts:
        digits = re.sub(r"\D", "", t)
        for i in range(0, max(0, len(digits) - 13)):
            chunk = digits[i:i + 14]
            if len(chunk) < 14:
                continue
            h, m, s = chunk[0:2], chunk[2:4], chunk[4:6]
            d, mo, y = chunk[6:8], chunk[8:10], chunk[10:14]
            if valid_dt(h, m, s, d, mo, y):
                return build_dt(h, m, s, d, mo, y)

    raise ValueError(f"OCR timestamp not found. OCR1='{texts[0] if texts else ''}' OCR2='{texts[1] if len(texts) > 1 else ''}'")


def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime,
                             dt_digitized_utc: datetime, *,
                             timeout_seconds: int) -> bytes:
    """
    DateTimeOriginal = OCR time (camera local) + OffsetTimeOriginal
    DateTimeDigitized = servertime (UTC) + OffsetTimeDigitized=+00:00

    Runs the external `exiftool` binary on a temp copy and returns the
    patched JPEG bytes. Raises CalledProcessError / TimeoutExpired on
    exiftool failure.
    """
    dto = to_exif_datetime(dt_original_local)
    oto = tz_offset_str(dt_original_local)
    dtd = to_exif_datetime(dt_digitized_utc.astimezone(timezone.utc))
    otd = "+00:00"
    with tempfile.TemporaryDirectory() as td:
        in_path = os.path.join(td, "in.jpg")
        with open(in_path, "wb") as f:
            f.write(jpg_in)
        cmd = [
            "exiftool",
            "-overwrite_original",
            f"-DateTimeOriginal={dto}",
            f"-OffsetTimeOriginal={oto}",
            f"-DateTimeDigitized={dtd}",
            f"-OffsetTimeDigitized={otd}",
            f"-ModifyDate={dtd}",
            f"-OffsetTime={otd}",
            in_path,
        ]
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE, timeout=timeout_seconds)
        with open(in_path, "rb") as f:
            return f.read()


def make_thumbnail_bytes(jpg_bytes: bytes, thumb_max: int) -> bytes:
    """Return JPEG thumbnail bytes, bounded by thumb_max on both axes."""
    from io import BytesIO
    im = Image.open(BytesIO(jpg_bytes)).convert("RGB")
    im.thumbnail((thumb_max, thumb_max))
    out = BytesIO()
    im.save(out, format="JPEG", quality=85, optimize=True)
    return out.getvalue()


# -----------------------
# DB ops
# -----------------------
def db_init(cur):
    """Create the import_job tracking table, indexes and updated_ts trigger.

    Idempotent; assumes schema remote_cam already exists.
    """
    cur.execute("""
        CREATE TABLE IF NOT EXISTS remote_cam.import_job (
            image_uuid text PRIMARY KEY,
            status text NOT NULL DEFAULT 'NEW',
            attempts integer NOT NULL DEFAULT 0,
            has_categories boolean,
            last_error text,
            entrance_jpg_key text,
            entrance_meta_key text,
            entrance_cat_key text,
            processed_jpg_key text,
            thumbnail_key text,
            created_ts timestamptz NOT NULL DEFAULT now(),
            updated_ts timestamptz NOT NULL DEFAULT now()
        );
    """)
    cur.execute("CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);")
    cur.execute("CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);")
    cur.execute("""
        CREATE OR REPLACE FUNCTION remote_cam.set_updated_ts() RETURNS trigger AS $$
        BEGIN
            NEW.updated_ts = now();
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
    """)
    cur.execute("""
        DO $$
        BEGIN
            IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'import_job_set_updated_ts') THEN
                CREATE TRIGGER import_job_set_updated_ts
                BEFORE UPDATE ON remote_cam.import_job
                FOR EACH ROW EXECUTE FUNCTION remote_cam.set_updated_ts();
            END IF;
        END$$;
    """)


def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str,
                   cat_key: Optional[str]):
    """Register (or refresh the entrance keys of) an import job.

    Deliberately does not touch status on conflict, so a resumed job keeps
    its progress.
    """
    cur.execute("""
        INSERT INTO remote_cam.import_job
            (image_uuid, status, entrance_jpg_key, entrance_meta_key, entrance_cat_key, has_categories)
        VALUES (%s, 'NEW', %s, %s, %s, %s)
        ON CONFLICT (image_uuid) DO UPDATE SET
            entrance_jpg_key = EXCLUDED.entrance_jpg_key,
            entrance_meta_key = EXCLUDED.entrance_meta_key,
            entrance_cat_key = EXCLUDED.entrance_cat_key,
            has_categories = EXCLUDED.has_categories;
    """, (uuid, jpg_key, meta_key, cat_key, cat_key is not None))


def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None,
                   processed_key: Optional[str] = None,
                   thumb_key: Optional[str] = None):
    """Update a job's status; attempts increments only on 'ERROR'.

    last_error / key columns are only overwritten when a non-NULL value is
    supplied (COALESCE keeps the previous value otherwise).
    """
    cur.execute("""
        UPDATE remote_cam.import_job
        SET status = %s,
            attempts = attempts + CASE WHEN %s = 'ERROR' THEN 1 ELSE 0 END,
            last_error = COALESCE(%s::text, last_error),
            processed_jpg_key = COALESCE(%s, processed_jpg_key),
            thumbnail_key = COALESCE(%s, thumbnail_key)
        WHERE image_uuid = %s;
    """, (status, status, err, processed_key, thumb_key, uuid))


def job_get(cur, uuid: str):
    """Fetch a job's resume-relevant columns as a dict, or None if absent."""
    cur.execute("""
        SELECT status, processed_jpg_key, thumbnail_key, has_categories
        FROM remote_cam.import_job WHERE image_uuid = %s;
    """, (uuid,))
    row = cur.fetchone()
    if not row:
        return None
    return {
        "status": row[0],
        "processed_jpg_key": row[1],
        "thumbnail_key": row[2],
        "has_categories": row[3],
    }


# Pipeline stages in completion order; rank comparisons drive resume logic.
STATUS_ORDER = [
    "NEW",
    "META_OK",
    "OCR_OK",
    "EXIF_OK",
    "MOVED",
    "THUMB_OK",
    "CATEGORIES_OK",
    "CLEANED",
]


def status_rank(status: Optional[str]) -> int:
    """Rank of *status* in STATUS_ORDER; -1 for None or failure statuses."""
    if status in STATUS_ORDER:
        return STATUS_ORDER.index(status)
    return -1


def insert_resource(cur, uuid: str, typ: str):
    """Record a resource row (idempotent via ON CONFLICT DO NOTHING)."""
    cur.execute("""
        INSERT INTO remote_cam.resource(resource_uuid, type, import_ts)
        VALUES (%s, %s, now())
        ON CONFLICT DO NOTHING;
    """, (uuid, typ))


def insert_metadata(cur, uuid: str, meta: dict):
    """Upsert the camera metadata JSON into remote_cam.metadata.

    Timestamp fields in "HH:MM:SS DD.MM.YYYY" form are parsed as UTC
    (best-effort: unparseable values become NULL). ocr_ts is inserted NULL
    and preserved on conflict; it is filled in later by update_ocr_ts.
    """
    def get(k):
        return meta.get(k)

    servertime = parse_servertime(meta)
    icd_raw = get("ImageCreationDate")
    icd = None
    if icd_raw:
        try:
            # "06:03:01 30.06.2025" -> assume UTC if no tz info
            icd = datetime.strptime(icd_raw, "%H:%M:%S %d.%m.%Y").replace(tzinfo=timezone.utc)
        except Exception:
            icd = None
    lat = get("Latitude")
    lon = get("Longitude")
    lat_f = float(lat) if lat not in (None, "") else None
    lon_f = float(lon) if lon not in (None, "") else None
    dls_raw = get("DateTimeLastSettings")
    dls = None
    if dls_raw:
        try:
            dls = datetime.strptime(dls_raw, "%H:%M:%S %d.%m.%Y").replace(tzinfo=timezone.utc)
        except Exception:
            dls = None
    cur.execute("""
        INSERT INTO remote_cam.metadata(
            metadata_uuid, import_ts, "ImageCreationDate", "SizeInByte", "ImageFormat",
            "ImageNight", "DateTimeLastSettings", "BatteryStatus", "FirmwareVersion",
            "SignalStrength", "Temperature", "CoordinateSwitch", "Latitude", "Longitude",
            "WorkPeriod", "WorkStart", "WorkEnd", "ThumbnailSize", "PIRInterval",
            "TimeScan", imei, iccid, "INITKey", "CameraName", "CameraModel",
            "CameraLanguage", "CameraNetwork", "MobileNetworkCode", "MobileCountryCode",
            "LocationAreaCode", "CustomerTimezone", "CustomerLanguage", servertime, ocr_ts
        ) VALUES (
            %s, now(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NULL
        )
        ON CONFLICT (metadata_uuid) DO UPDATE SET
            import_ts = EXCLUDED.import_ts,
            "ImageCreationDate" = EXCLUDED."ImageCreationDate",
            "SizeInByte" = EXCLUDED."SizeInByte",
            "ImageFormat" = EXCLUDED."ImageFormat",
            "ImageNight" = EXCLUDED."ImageNight",
            "DateTimeLastSettings" = EXCLUDED."DateTimeLastSettings",
            "BatteryStatus" = EXCLUDED."BatteryStatus",
            "FirmwareVersion" = EXCLUDED."FirmwareVersion",
            "SignalStrength" = EXCLUDED."SignalStrength",
            "Temperature" = EXCLUDED."Temperature",
            "CoordinateSwitch" = EXCLUDED."CoordinateSwitch",
            "Latitude" = EXCLUDED."Latitude",
            "Longitude" = EXCLUDED."Longitude",
            "WorkPeriod" = EXCLUDED."WorkPeriod",
            "WorkStart" = EXCLUDED."WorkStart",
            "WorkEnd" = EXCLUDED."WorkEnd",
            "ThumbnailSize" = EXCLUDED."ThumbnailSize",
            "PIRInterval" = EXCLUDED."PIRInterval",
            "TimeScan" = EXCLUDED."TimeScan",
            imei = EXCLUDED.imei,
            iccid = EXCLUDED.iccid,
            "INITKey" = EXCLUDED."INITKey",
            "CameraName" = EXCLUDED."CameraName",
            "CameraModel" = EXCLUDED."CameraModel",
            "CameraLanguage" = EXCLUDED."CameraLanguage",
            "CameraNetwork" = EXCLUDED."CameraNetwork",
            "MobileNetworkCode" = EXCLUDED."MobileNetworkCode",
            "MobileCountryCode" = EXCLUDED."MobileCountryCode",
            "LocationAreaCode" = EXCLUDED."LocationAreaCode",
            "CustomerTimezone" = EXCLUDED."CustomerTimezone",
            "CustomerLanguage" = EXCLUDED."CustomerLanguage",
            servertime = EXCLUDED.servertime;
    """, (
        uuid, icd, get("SizeInByte"), get("ImageFormat"), get("ImageNight"), dls,
        get("BatteryStatus"), get("FirmwareVersion"), get("SignalStrength"),
        get("Temperature"), get("CoordinateSwitch"), lat_f, lon_f,
        get("WorkPeriod"), get("WorkStart"), get("WorkEnd"), get("ThumbnailSize"),
        get("PIRInterval"), get("TimeScan"), get("imei"), get("iccid"),
        get("INITKey"), get("CameraName"), get("CameraModel"), get("CameraLanguage"),
        get("CameraNetwork"), get("MobileNetworkCode"), get("MobileCountryCode"),
        get("LocationAreaCode"), get("CustomerTimezone"), get("CustomerLanguage"),
        servertime
    ))


def update_ocr_ts(cur, uuid: str, ocr_ts: datetime):
    """Store the OCR-extracted overlay timestamp for *uuid*."""
    cur.execute("""
        UPDATE remote_cam.metadata SET ocr_ts = %s WHERE metadata_uuid = %s;
    """, (ocr_ts, uuid))


def get_ocr_ts(cur, uuid: str) -> Optional[datetime]:
    """Fetch the previously-stored OCR timestamp, or None."""
    cur.execute("""
        SELECT ocr_ts FROM remote_cam.metadata WHERE metadata_uuid = %s;
    """, (uuid,))
    row = cur.fetchone()
    return row[0] if row else None


def insert_categories(cur, uuid: str, cat_json: dict):
    """Replace the classification rows for *uuid* with those in *cat_json*."""
    version = cat_json.get("version")
    cats = cat_json.get("categories") or []
    # Delete-then-insert keeps the category set exactly in sync with the JSON.
    cur.execute("DELETE FROM remote_cam.category WHERE image_uuid = %s;", (uuid,))
    for c in cats:
        cur.execute("""
            INSERT INTO remote_cam.category(image_uuid, category, score, version)
            VALUES (%s, %s, %s, %s);
        """, (uuid, c.get("category"), c.get("score"), version))


# -----------------------
# S3 ops wrapper
# -----------------------
class S3:
    """Thin boto3 wrapper bound to one bucket/endpoint from AppConfig."""

    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
        self.client = boto3.client(
            "s3",
            endpoint_url=cfg.s3_endpoint,
            aws_access_key_id=cfg.s3_access_key,
            aws_secret_access_key=cfg.s3_secret_key,
        )

    def list_entrance_objects(self):
        """Yield object dicts under the entrance prefix (paginated)."""
        paginator = self.client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=self.cfg.s3_bucket, Prefix=self.cfg.entrance_prefix):
            for obj in page.get("Contents", []):
                yield obj

    def get_bytes(self, key: str) -> bytes:
        resp = self.client.get_object(Bucket=self.cfg.s3_bucket, Key=key)
        return resp["Body"].read()

    def put_bytes(self, key: str, data: bytes, content_type: str):
        self.client.put_object(Bucket=self.cfg.s3_bucket, Key=key, Body=data, ContentType=content_type)

    def head(self, key: str):
        """HEAD the object; raises if it does not exist (used for verification)."""
        return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key)

    def delete_keys(self, keys):
        """Batch-delete the given keys; no-op on an empty list."""
        if not keys:
            return
        self.client.delete_objects(
            Bucket=self.cfg.s3_bucket,
            Delete={"Objects": [{"Key": k} for k in keys], "Quiet": True},
        )


# -----------------------
# Main processing
# -----------------------
@dataclass
class EntranceSet:
    """One complete entrance upload: jpg + metadata (+ optional categories)."""
    uuid: str
    jpg_key: str
    meta_key: str
    cat_key: Optional[str]
    lm_jpg: datetime
    lm_meta: datetime
    lm_cat: Optional[datetime]


def collect_ready_sets(s3: S3, cfg: AppConfig) -> list[EntranceSet]:
    """Scan the entrance prefix and return complete, settled upload sets.

    A set is ready when both the .jpg and .json exist and every member
    (including the optional categories file) is at least min_age_seconds
    old — this avoids racing an upload still in progress.
    """
    groups: Dict[str, Dict[str, Tuple[str, datetime]]] = {}
    for obj in s3.list_entrance_objects():
        key = obj["Key"]
        name = key.split("/")[-1]
        lm = obj["LastModified"]
        m1 = UUID_RE.match(name)
        if m1:
            uuid = m1.group(1)
            ext = m1.group(2)
            typ = "jpg" if ext == "jpg" else "meta"
            groups.setdefault(uuid, {})[typ] = (key, lm)
            continue
        m2 = CAT_RE.match(name)
        if m2:
            uuid = m2.group(1)
            groups.setdefault(uuid, {})["cat"] = (key, lm)
            continue
    ready: list[EntranceSet] = []
    for uuid, parts in groups.items():
        if "jpg" in parts and "meta" in parts:
            jpg_key, lm_j = parts["jpg"]
            meta_key, lm_m = parts["meta"]
            cat = parts.get("cat")
            cat_key = cat[0] if cat else None
            lm_c = cat[1] if cat else None
            if not (is_old_enough(lm_j, cfg.min_age_seconds) and is_old_enough(lm_m, cfg.min_age_seconds)):
                continue
            if cat and not is_old_enough(lm_c, cfg.min_age_seconds):
                continue
            ready.append(EntranceSet(uuid, jpg_key, meta_key, cat_key, lm_j, lm_m, lm_c))
    return ready


def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
    """Run (or resume) the full pipeline for one entrance set.

    Each stage is skipped when the recorded job status shows it already
    completed; OCR/EXIF failures are recorded but do not abort the later
    upload stages (best-effort: the image still moves to processed).
    """
    job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
    job = job_get(cur, s.uuid) or {}
    status_now = job.get("status", "NEW")

    # 1) metadata.json -> DB
    meta = json.loads(s3.get_bytes(s.meta_key))
    insert_metadata(cur, s.uuid, meta)
    insert_resource(cur, s.uuid, "metadata")
    if status_rank("META_OK") >= status_rank(status_now):
        job_set_status(cur, s.uuid, "META_OK")
        status_now = "META_OK"

    # 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone
    jpg_bytes = s3.get_bytes(s.jpg_key)
    ocr_ts = None
    if status_rank(status_now) < status_rank("OCR_OK"):
        try:
            customer_tzinfo, _ = parse_customer_tz(meta)
            ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac,
                                           cfg.ocr_crop_h_frac, customer_tzinfo)
            update_ocr_ts(cur, s.uuid, ocr_ts)
            insert_resource(cur, s.uuid, "image")
            job_set_status(cur, s.uuid, "OCR_OK")
            status_now = "OCR_OK"
        except Exception as e:
            job_set_status(cur, s.uuid, "OCR_FAIL", err=str(e))
            status_now = "OCR_FAIL"
    elif status_rank(status_now) >= status_rank("OCR_OK"):
        ocr_ts = get_ocr_ts(cur, s.uuid)

    # 3) EXIF write:
    #    DateTimeOriginal = ocr_ts (local, keep offset tags)
    #    DateTimeDigitized = servertime (UTC)
    jpg_exif = jpg_bytes
    if status_rank(status_now) < status_rank("MOVED"):
        if ocr_ts is None:
            job_set_status(cur, s.uuid, "EXIF_SKIPPED", err="OCR missing")
            status_now = "EXIF_SKIPPED"
        else:
            try:
                servertime = parse_servertime(meta)
                jpg_exif = exif_write_with_exiftool(
                    jpg_bytes, ocr_ts, servertime,
                    timeout_seconds=cfg.exiftool_timeout_seconds,
                )
                job_set_status(cur, s.uuid, "EXIF_OK")
                status_now = "EXIF_OK"
            except Exception as e:
                job_set_status(cur, s.uuid, "EXIF_FAIL", err=str(e))
                status_now = "EXIF_FAIL"

    # 4) Upload processed JPG (EXIF patched)
    processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{s.uuid}.jpg"
    if status_rank(status_now) < status_rank("MOVED"):
        s3.put_bytes(processed_key, jpg_exif, "image/jpeg")
        s3.head(processed_key)
        job_set_status(cur, s.uuid, "MOVED", processed_key=processed_key)
        status_now = "MOVED"

    # 5) Thumbnail
    thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{s.uuid}.jpg"
    if status_rank(status_now) < status_rank("THUMB_OK"):
        # On resume after MOVED, the local bytes are still the raw entrance
        # JPG; fetch the processed (EXIF-patched) copy for the thumbnail.
        if status_rank(status_now) >= status_rank("MOVED") and jpg_exif is jpg_bytes:
            jpg_exif = s3.get_bytes(processed_key)
        thumb_bytes = make_thumbnail_bytes(jpg_exif, cfg.thumb_max)
        s3.put_bytes(thumb_key, thumb_bytes, "image/jpeg")
        s3.head(thumb_key)
        job_set_status(cur, s.uuid, "THUMB_OK", thumb_key=thumb_key)
        status_now = "THUMB_OK"

    # 6) Optional categories
    if s.cat_key and status_rank(status_now) < status_rank("CATEGORIES_OK"):
        cat_json = json.loads(s3.get_bytes(s.cat_key))
        insert_categories(cur, s.uuid, cat_json)
        insert_resource(cur, s.uuid, "classification")
        job_set_status(cur, s.uuid, "CATEGORIES_OK")
        status_now = "CATEGORIES_OK"

    # 7) Cleanup entrance only after verify
    if status_rank(status_now) >= status_rank("THUMB_OK"):
        try:
            s3.head(processed_key)
            s3.head(thumb_key)
            to_delete = [s.jpg_key, s.meta_key]
            if s.cat_key:
                to_delete.append(s.cat_key)
            s3.delete_keys(to_delete)
            job_set_status(cur, s.uuid, "CLEANED")
            status_now = "CLEANED"
        except Exception as e:
            job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e))


def run_once(cfg: AppConfig) -> int:
    """Process all currently-ready entrance sets once; return how many.

    Each set runs in its own DB transaction; a failure rolls that set back,
    records an ERROR status in a fresh transaction, and processing continues
    with the next set.
    """
    s3c = S3(cfg)
    ready = collect_ready_sets(s3c, cfg)
    if not ready:
        return 0
    with psycopg.connect(cfg.pg_dsn) as db:
        for s in ready:
            try:
                with db.transaction():
                    with db.cursor() as cur:
                        process_one(cur, s3c, cfg, s)
            except Exception as e:
                with db.transaction():
                    with db.cursor() as cur:
                        job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
                        job_set_status(cur, s.uuid, "ERROR", err=str(e))
                print(f"[ERROR] {s.uuid}: {e}")
            if cfg.job_sleep_seconds > 0:
                time.sleep(cfg.job_sleep_seconds)
    return len(ready)


def main():
    """Entry point: init DB schema objects, then poll forever."""
    cfg_path = os.environ.get("CONFIG_YAML", "/app/config.yaml")
    cfg = load_config(cfg_path)
    with psycopg.connect(cfg.pg_dsn) as db:
        with db.cursor() as cur:
            db_init(cur)
        db.commit()
    while True:
        n = run_once(cfg)
        if n == 0:
            time.sleep(cfg.poll_seconds)


if __name__ == "__main__":
    main()