diff --git a/config.yaml b/config.yaml index b2ae806..656146a 100644 --- a/config.yaml +++ b/config.yaml @@ -8,6 +8,10 @@ s3: postgres: dsn: "postgresql://postgres:DFGk5s9H21lKao2K@136.243.41.58:7777/meles" +deermapper-api: + base_url: "https://webapp.deermapper.net/api/icu" + apiKey: "695bc217-3b40-48bb-bb12-17fc5b08b320" + app: entrance_prefix: "icu/entrance/" processed_prefix: "icu/processed/" @@ -15,8 +19,17 @@ app: min_age_seconds: 90 poll_seconds: 30 # OCR crop tunables (defaults are usually fine) - ocr_crop_w_frac: 0.42 - ocr_crop_h_frac: 0.22 + ocr_crop_w_frac: 0.63 + ocr_crop_h_frac: 0.05 thumb_max: 512 exiftool_timeout_seconds: 15 - job_sleep_seconds: 1.0 + job_sleep_seconds: 0.0 + parallel_workers: 6 + s3_max_pool_connections: 48 + # Optional processing switches: + # - false => image still gets moved + thumbnail, and backfill flag is set in import_job + enable_ocr: true + enable_exif: true + enable_deermapper_api: false + deermapper_api_timeout_seconds: 20 + deermapper_api_image_field: "image" diff --git a/main.py b/main.py index 808db15..4e97580 100644 --- a/main.py +++ b/main.py @@ -6,12 +6,17 @@ import json import time import tempfile import subprocess +import uuid as uuidlib +from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime, timezone from typing import Optional, Dict, Tuple +from urllib import error as urlerror +from urllib import request as urlrequest import boto3 import psycopg +from botocore.config import Config as BotoClientConfig from dateutil import tz from PIL import Image, ImageOps, ImageEnhance import pytesseract @@ -65,7 +70,16 @@ class AppConfig: thumb_max: int = 512 exiftool_timeout_seconds: int = 15 - job_sleep_seconds: float = 1.0 + job_sleep_seconds: float = 0.0 + parallel_workers: int = 4 + s3_max_pool_connections: int = 32 + enable_ocr: bool = True + enable_exif: bool = True + enable_deermapper_api: bool = False + deermapper_api_base_url: str = 
"https://webapp.deermapper.net/api/icu" + deermapper_api_key: str = "" + deermapper_api_timeout_seconds: int = 20 + deermapper_api_image_field: str = "image" def load_config(path: str) -> AppConfig: @@ -76,9 +90,26 @@ def load_config(path: str) -> AppConfig: def env_or(key: str, default=None): return os.environ.get(key, default) + def as_bool(value, default: bool = True) -> bool: + if value is None: + return default + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + if isinstance(value, str): + return value.strip().lower() in ("1", "true", "yes", "on") + return bool(value) + s3 = raw["s3"] pg = raw["postgres"] app = raw.get("app", {}) + deermapper = raw.get("deermapper-api", raw.get("deermapper_api", {})) + default_workers = max(1, min(16, (os.cpu_count() or 2) * 2)) + parallel_workers = int(app.get("parallel_workers", default_workers)) + parallel_workers = max(1, parallel_workers) + s3_pool = int(app.get("s3_max_pool_connections", max(16, parallel_workers * 4))) + s3_pool = max(10, s3_pool) return AppConfig( s3_endpoint=env_or("S3_ENDPOINT", s3["endpoint"]), @@ -95,7 +126,16 @@ def load_config(path: str) -> AppConfig: ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)), thumb_max=int(app.get("thumb_max", 512)), exiftool_timeout_seconds=int(app.get("exiftool_timeout_seconds", 15)), - job_sleep_seconds=float(app.get("job_sleep_seconds", 1.0)), + job_sleep_seconds=float(app.get("job_sleep_seconds", 0.0)), + parallel_workers=parallel_workers, + s3_max_pool_connections=s3_pool, + enable_ocr=as_bool(app.get("enable_ocr", True), True), + enable_exif=as_bool(app.get("enable_exif", True), True), + enable_deermapper_api=as_bool(app.get("enable_deermapper_api", False), False), + deermapper_api_base_url=env_or("DEERMAPPER_API_URL", deermapper.get("base_url", "https://webapp.deermapper.net/api/icu")), + deermapper_api_key=env_or("DEERMAPPER_API_KEY", deermapper.get("apiKey", "")), + 
deermapper_api_timeout_seconds=int(app.get("deermapper_api_timeout_seconds", 20)), + deermapper_api_image_field=app.get("deermapper_api_image_field", "image"), ) @@ -256,6 +296,71 @@ def make_thumbnail_bytes(jpg_bytes: bytes, thumb_max: int) -> bytes: return out.getvalue() +def _multipart_form_data(fields: Dict[str, str], files: list[tuple[str, str, bytes, str]]) -> tuple[str, bytes]: + boundary = f"----meles-{uuidlib.uuid4().hex}" + boundary_bytes = boundary.encode("ascii") + crlf = b"\r\n" + + parts: list[bytes] = [] + for name, value in fields.items(): + parts.extend([ + b"--" + boundary_bytes, + f'Content-Disposition: form-data; name="{name}"'.encode("utf-8"), + b"", + str(value).encode("utf-8"), + ]) + for field_name, filename, content, content_type in files: + parts.extend([ + b"--" + boundary_bytes, + f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"'.encode("utf-8"), + f"Content-Type: {content_type}".encode("utf-8"), + b"", + content, + ]) + parts.append(b"--" + boundary_bytes + b"--") + body = crlf.join(parts) + crlf + return f"multipart/form-data; boundary={boundary}", body + + +def push_to_deermapper_api(cfg: AppConfig, image_uuid: str, imei: str, metadata: dict, jpg_bytes: bytes) -> tuple[bool, str]: + fields = { + "apiKey": cfg.deermapper_api_key, + "imei": imei, + "metadata": json.dumps(metadata, ensure_ascii=False), + } + content_type, body = _multipart_form_data( + fields, + [(cfg.deermapper_api_image_field, f"{image_uuid}.jpg", jpg_bytes, "image/jpeg")], + ) + req = urlrequest.Request( + cfg.deermapper_api_base_url, + data=body, + method="POST", + headers={ + "Content-Type": content_type, + "Accept": "application/json", + }, + ) + + try: + with urlrequest.urlopen(req, timeout=cfg.deermapper_api_timeout_seconds) as resp: + payload = resp.read().decode("utf-8", errors="replace") + try: + data = json.loads(payload) if payload else {} + except Exception: + return False, f"API response not JSON: {payload[:300]}" + status_val = 
str(data.get("status", "")).strip() + msg = str(data.get("msg", "")).strip() + if status_val == "1": + return True, msg or "ok" + return False, msg or f"status={status_val or 'unknown'}" + except urlerror.HTTPError as e: + body_txt = e.read().decode("utf-8", errors="replace") + return False, f"HTTP {e.code}: {body_txt[:300]}" + except Exception as e: + return False, str(e) + + # ----------------------- # DB ops # ----------------------- @@ -272,10 +377,22 @@ def db_init(cur): entrance_cat_key text, processed_jpg_key text, thumbnail_key text, + needs_ocr_backfill boolean NOT NULL DEFAULT false, + needs_exif_backfill boolean NOT NULL DEFAULT false, created_ts timestamptz NOT NULL DEFAULT now(), updated_ts timestamptz NOT NULL DEFAULT now() ); """) + cur.execute(""" + ALTER TABLE remote_cam.import_job + ADD COLUMN IF NOT EXISTS needs_ocr_backfill boolean NOT NULL DEFAULT false; + """) + cur.execute(""" + ALTER TABLE remote_cam.import_job + ADD COLUMN IF NOT EXISTS needs_exif_backfill boolean NOT NULL DEFAULT false; + """) + cur.execute("CREATE INDEX IF NOT EXISTS import_job_needs_ocr_idx ON remote_cam.import_job(needs_ocr_backfill);") + cur.execute("CREATE INDEX IF NOT EXISTS import_job_needs_exif_idx ON remote_cam.import_job(needs_exif_backfill);") cur.execute("CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);") cur.execute("CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);") @@ -298,6 +415,14 @@ def db_init(cur): END IF; END$$; """) + cur.execute(""" + CREATE TABLE IF NOT EXISTS remote_cam.api_response ( + push_ts timestamp without time zone, + message character(512), + image_uuid uuid, + success boolean + ); + """) def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]): @@ -324,9 +449,20 @@ def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None, pr """, (status, status, err, processed_key, thumb_key, uuid)) +def job_set_backfill(cur, uuid: 
str, *, needs_ocr_backfill: Optional[bool] = None, needs_exif_backfill: Optional[bool] = None): + if needs_ocr_backfill is None and needs_exif_backfill is None: + return + cur.execute(""" + UPDATE remote_cam.import_job + SET needs_ocr_backfill = COALESCE(%s, needs_ocr_backfill), + needs_exif_backfill = COALESCE(%s, needs_exif_backfill) + WHERE image_uuid = %s; + """, (needs_ocr_backfill, needs_exif_backfill, uuid)) + + def job_get(cur, uuid: str): cur.execute(""" - SELECT status, processed_jpg_key, thumbnail_key, has_categories + SELECT status, processed_jpg_key, thumbnail_key, has_categories, needs_ocr_backfill, needs_exif_backfill FROM remote_cam.import_job WHERE image_uuid = %s; """, (uuid,)) @@ -338,6 +474,8 @@ def job_get(cur, uuid: str): "processed_jpg_key": row[1], "thumbnail_key": row[2], "has_categories": row[3], + "needs_ocr_backfill": row[4], + "needs_exif_backfill": row[5], } @@ -345,10 +483,14 @@ STATUS_ORDER = [ "NEW", "META_OK", "OCR_OK", + "OCR_SKIPPED", "EXIF_OK", + "EXIF_SKIPPED", "MOVED", "THUMB_OK", "CATEGORIES_OK", + "API_FAIL", + "API_OK", "CLEANED", ] @@ -359,6 +501,16 @@ def status_rank(status: Optional[str]) -> int: return -1 +def is_terminal_success(status: Optional[str], cfg: AppConfig) -> bool: + if status is None: + return False + if status == "CLEANED": + return True + if cfg.enable_deermapper_api: + return status_rank(status) >= status_rank("API_OK") + return status_rank(status) >= status_rank("THUMB_OK") + + def insert_resource(cur, uuid: str, typ: str): cur.execute(""" INSERT INTO remote_cam.resource(resource_uuid, type, import_ts) @@ -484,6 +636,14 @@ def insert_categories(cur, uuid: str, cat_json: dict): """, (uuid, c.get("category"), c.get("score"), version)) +def insert_api_response(cur, uuid: str, success: bool, message: str): + msg = (message or "")[:512] + cur.execute(""" + INSERT INTO remote_cam.api_response(push_ts, message, image_uuid, success) + VALUES (now(), %s, %s::uuid, %s); + """, (msg, uuid, success)) + + # 
----------------------- # S3 ops wrapper # ----------------------- @@ -495,6 +655,7 @@ class S3: endpoint_url=cfg.s3_endpoint, aws_access_key_id=cfg.s3_access_key, aws_secret_access_key=cfg.s3_secret_key, + config=BotoClientConfig(max_pool_connections=cfg.s3_max_pool_connections), ) def list_entrance_objects(self): @@ -513,13 +674,21 @@ class S3: def head(self, key: str): return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key) - def delete_keys(self, keys): + def delete_keys(self, keys) -> int: if not keys: - return - self.client.delete_objects( + return 0 + resp = self.client.delete_objects( Bucket=self.cfg.s3_bucket, - Delete={"Objects": [{"Key": k} for k in keys], "Quiet": True}, + Delete={"Objects": [{"Key": k} for k in keys], "Quiet": False}, ) + errors = resp.get("Errors") or [] + if errors: + details = "; ".join( + f"{e.get('Key')}:{e.get('Code')}:{e.get('Message')}" for e in errors[:3] + ) + raise RuntimeError(f"S3 delete failed for {len(errors)} object(s): {details}") + deleted = resp.get("Deleted") or [] + return len(deleted) # ----------------------- @@ -590,27 +759,42 @@ def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet): # 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone jpg_bytes = s3.get_bytes(s.jpg_key) - ocr_ts = None - if status_rank(status_now) < status_rank("OCR_OK"): + ocr_ts = get_ocr_ts(cur, s.uuid) + ocr_needs_run = cfg.enable_ocr and ( + status_rank(status_now) < status_rank("OCR_OK") or status_now == "OCR_SKIPPED" + ) + + if ocr_needs_run: try: customer_tzinfo, _ = parse_customer_tz(meta) ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, customer_tzinfo) update_ocr_ts(cur, s.uuid, ocr_ts) insert_resource(cur, s.uuid, "image") + job_set_backfill(cur, s.uuid, needs_ocr_backfill=False) job_set_status(cur, s.uuid, "OCR_OK") status_now = "OCR_OK" except Exception as e: + job_set_backfill(cur, s.uuid, needs_ocr_backfill=True) job_set_status(cur, s.uuid, "OCR_FAIL", err=str(e)) 
status_now = "OCR_FAIL" - elif status_rank(status_now) >= status_rank("OCR_OK"): - ocr_ts = get_ocr_ts(cur, s.uuid) + elif not cfg.enable_ocr: + job_set_backfill(cur, s.uuid, needs_ocr_backfill=True) + if status_rank(status_now) < status_rank("OCR_SKIPPED"): + job_set_status(cur, s.uuid, "OCR_SKIPPED") + status_now = "OCR_SKIPPED" # 3) EXIF write: # DateTimeOriginal = ocr_ts (local, keep offset tags) # DateTimeDigitized = servertime (UTC) jpg_exif = jpg_bytes - if status_rank(status_now) < status_rank("MOVED"): + if not cfg.enable_exif: + job_set_backfill(cur, s.uuid, needs_exif_backfill=True) + if status_rank(status_now) < status_rank("EXIF_SKIPPED"): + job_set_status(cur, s.uuid, "EXIF_SKIPPED") + status_now = "EXIF_SKIPPED" + elif status_rank(status_now) < status_rank("MOVED"): if ocr_ts is None: + job_set_backfill(cur, s.uuid, needs_exif_backfill=True) job_set_status(cur, s.uuid, "EXIF_SKIPPED", err="OCR missing") status_now = "EXIF_SKIPPED" else: @@ -622,9 +806,11 @@ def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet): servertime, timeout_seconds=cfg.exiftool_timeout_seconds, ) + job_set_backfill(cur, s.uuid, needs_exif_backfill=False) job_set_status(cur, s.uuid, "EXIF_OK") status_now = "EXIF_OK" except Exception as e: + job_set_backfill(cur, s.uuid, needs_exif_backfill=True) job_set_status(cur, s.uuid, "EXIF_FAIL", err=str(e)) status_now = "EXIF_FAIL" @@ -655,8 +841,32 @@ def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet): job_set_status(cur, s.uuid, "CATEGORIES_OK") status_now = "CATEGORIES_OK" - # 7) Cleanup entrance only after verify - if status_rank(status_now) >= status_rank("THUMB_OK"): + # 7) Optional DeerMapper API push (image + metadata) + if cfg.enable_deermapper_api and status_rank(status_now) < status_rank("API_OK"): + api_key = (cfg.deermapper_api_key or "").strip() + api_url = (cfg.deermapper_api_base_url or "").strip() + if not api_key or not api_url: + err_msg = "DeerMapper API config missing" + job_set_status(cur, s.uuid, 
"API_FAIL", err=err_msg) + insert_api_response(cur, s.uuid, False, err_msg) + status_now = "API_FAIL" + else: + if status_rank(status_now) >= status_rank("MOVED") and jpg_exif is jpg_bytes: + jpg_exif = s3.get_bytes(processed_key) + imei = str(meta.get("imei") or "").strip() + ok, msg = push_to_deermapper_api(cfg, s.uuid, imei, meta, jpg_exif) + if ok: + insert_api_response(cur, s.uuid, True, msg) + job_set_status(cur, s.uuid, "API_OK") + status_now = "API_OK" + else: + insert_api_response(cur, s.uuid, False, msg) + job_set_status(cur, s.uuid, "API_FAIL", err=msg) + status_now = "API_FAIL" + + # 8) Cleanup entrance only after verify (+ API when enabled) + api_ready_for_cleanup = (not cfg.enable_deermapper_api) or status_rank(status_now) >= status_rank("API_OK") + if status_rank(status_now) >= status_rank("THUMB_OK") and api_ready_for_cleanup: try: s3.head(processed_key) s3.head(thumb_key) @@ -687,21 +897,22 @@ def is_completed_job_materialized(cur, s3: S3, cfg: AppConfig, uuid: str) -> boo return True -def cleanup_entrance_set(s3: S3, s: EntranceSet): +def cleanup_entrance_set(s3: S3, s: EntranceSet) -> int: to_delete = [s.jpg_key, s.meta_key] if s.cat_key: to_delete.append(s.cat_key) - s3.delete_keys(to_delete) + return s3.delete_keys(to_delete) -def run_once(cfg: AppConfig) -> int: +def _process_ready_batch(cfg: AppConfig, batch: list[EntranceSet]): s3c = S3(cfg) - ready = collect_ready_sets(s3c, cfg) - if not ready: - return 0 - with psycopg.connect(cfg.pg_dsn) as db: - for s in ready: + for s in batch: + start = time.perf_counter() + result = "failure" + final_status = "UNKNOWN" + err_text = None + cleanup_deleted = None try: skip_duplicate = False with db.transaction(): @@ -710,17 +921,56 @@ def run_once(cfg: AppConfig) -> int: skip_duplicate = True else: process_one(cur, s3c, cfg, s) + final_status = (job_get(cur, s.uuid) or {}).get("status", "UNKNOWN") + result = "success" if is_terminal_success(final_status, cfg) else "failure" if skip_duplicate: - 
cleanup_entrance_set(s3c, s) - print(f"[SKIP] duplicate already processed: {s.uuid}") + cleanup_deleted = cleanup_entrance_set(s3c, s) + final_status = "DUPLICATE_CLEANED" + result = "success" except Exception as e: with db.transaction(): with db.cursor() as cur: job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key) job_set_status(cur, s.uuid, "ERROR", err=str(e)) - print(f"[ERROR] {s.uuid}: {e}") + final_status = "ERROR" + err_text = str(e) + elapsed_ms = int((time.perf_counter() - start) * 1000) + if err_text: + print( + f"[RESULT] uuid={s.uuid} result={result} status={final_status} " + f"process_ms={elapsed_ms} error={err_text}" + ) + elif cleanup_deleted is not None: + print( + f"[RESULT] uuid={s.uuid} result={result} status={final_status} " + f"process_ms={elapsed_ms} cleanup_deleted={cleanup_deleted}" + ) + else: + print(f"[RESULT] uuid={s.uuid} result={result} status={final_status} process_ms={elapsed_ms}") if cfg.job_sleep_seconds > 0: time.sleep(cfg.job_sleep_seconds) + + +def run_once(cfg: AppConfig) -> int: + s3c = S3(cfg) + ready = collect_ready_sets(s3c, cfg) + if not ready: + return 0 + + workers = max(1, min(cfg.parallel_workers, len(ready))) + if workers == 1: + _process_ready_batch(cfg, ready) + return len(ready) + + batches: list[list[EntranceSet]] = [[] for _ in range(workers)] + for i, s in enumerate(ready): + batches[i % workers].append(s) + + with ThreadPoolExecutor(max_workers=workers) as pool: + futures = [pool.submit(_process_ready_batch, cfg, batch) for batch in batches if batch] + for future in as_completed(futures): + future.result() + return len(ready)