# File: melesICUmover/main.py
# (repository listing: 994 lines, 36 KiB, Python)

# main.py
#!/usr/bin/env python3
import os
import re
import json
import time
import tempfile
import subprocess
import uuid as uuidlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Dict, Tuple
from urllib import error as urlerror
from urllib import request as urlrequest
import boto3
import psycopg
from botocore.config import Config as BotoClientConfig
from dateutil import tz
from PIL import Image, ImageOps, ImageEnhance
import pytesseract
import yaml
# Optional remote-debug hook: when the DEBUG environment variable is "1",
# open a debugpy listener on port 5678 (all interfaces) and block startup
# until a debugger client has attached.
if os.environ.get("DEBUG") == "1":
    # Imported here so debugpy is only needed when debugging is requested.
    import debugpy
    debugpy.listen(("0.0.0.0", 5678))
    print("Waiting for debugger...")
    debugpy.wait_for_client()
# -----------------------
# Config
# -----------------------
# Entrance object names: "<36 hex/dash chars>.jpg" (image) or
# "<36 hex/dash chars>.json" (metadata). Group 1 is the id, group 2 the extension.
UUID_RE = re.compile(r"^([0-9a-fA-F-]{36})\.(jpg|json)$")
# Optional classification sidecar: "<36 hex/dash chars>_categories.json".
CAT_RE = re.compile(r"^([0-9a-fA-F-]{36})_categories\.json$")
# Overlay fixed: "HH:MM:SS DD-MM-YYYY"
OCR_DT_RE = re.compile(
    r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s+(?P<d>\d{2})-(?P<mo>\d{2})-(?P<y>\d{4})"
)
# Tolerant variant of the overlay pattern: the whitespace between time and date
# is optional and the date separators may be "-" or ".".
OCR_DT_RE_FLEX = re.compile(
    r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s*(?P<d>\d{2})[-.](?P<mo>\d{2})[-.](?P<y>\d{4})"
)
@dataclass
class AppConfig:
    """Runtime settings for the importer (S3, Postgres, pipeline switches, API push).

    Instances are normally built by load_config(); the defaults here mirror the
    fallbacks used there.
    """

    # --- required connection settings ---
    s3_endpoint: str
    s3_access_key: str
    s3_secret_key: str
    s3_bucket: str
    pg_dsn: str
    # --- S3 key prefixes ---
    entrance_prefix: str = "icu/entrance/"
    processed_prefix: str = "icu/processed/"
    thumb_prefix: str = "icu/thumbnails/"
    # Objects younger than this (by S3 LastModified) are not picked up yet.
    min_age_seconds: int = 90
    # Sleep between polls when a poll found nothing to process.
    poll_seconds: int = 30
    # OCR crop settings (tune if needed)
    ocr_crop_w_frac: float = 0.42  # right ~42% of width
    ocr_crop_h_frac: float = 0.22  # bottom ~22% of height
    # Maximum thumbnail edge length in pixels.
    thumb_max: int = 512
    exiftool_timeout_seconds: int = 15
    # Optional pause after each processed job (0 disables it).
    job_sleep_seconds: float = 0.0
    parallel_workers: int = 4
    s3_max_pool_connections: int = 32
    # --- pipeline switches ---
    enable_ocr: bool = True
    enable_exif: bool = True
    enable_deermapper_api: bool = False
    # --- DeerMapper API push ---
    deermapper_api_base_url: str = "https://webapp.deermapper.net/api/icu"
    deermapper_api_key: str = ""
    deermapper_api_timeout_seconds: int = 20
    # Name of the multipart form field that carries the JPEG.
    deermapper_api_image_field: str = "image"
def load_config(path: str) -> AppConfig:
    """Build an AppConfig from the YAML file at *path*, with environment overrides.

    The file may contain the sections ``s3``, ``postgres``, ``app`` and
    ``deermapper-api`` (or ``deermapper_api``). The S3/Postgres connection
    values and the DeerMapper URL/key can be overridden through environment
    variables, so secrets do not have to be stored in the file.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        The populated AppConfig.

    Raises:
        KeyError: A required S3/Postgres value is neither in the YAML file nor
            in its environment variable.
    """
    with open(path, "r", encoding="utf-8") as f:
        # An empty YAML document loads as None; treat it like an empty mapping.
        raw = yaml.safe_load(f) or {}

    def section(*names: str) -> dict:
        """Return the first non-empty mapping found under one of *names*."""
        for name in names:
            found = raw.get(name)
            if found:
                return found
        return {}

    def required(env_key: str, values: dict, label: str, key: str):
        """Return the env override if set, otherwise the mandatory YAML value."""
        override = os.environ.get(env_key)
        if override is not None:
            return override
        # Only look at the YAML value when no override exists, so a secret
        # injected purely via the environment does not need a YAML entry.
        if key not in values:
            raise KeyError(f"{label}.{key} missing: set it in the YAML file or via {env_key}")
        return values[key]

    # allow env override for secret injection if desired
    def env_or(key: str, default=None):
        return os.environ.get(key, default)

    def as_bool(value, default: bool = True) -> bool:
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        if isinstance(value, (int, float)):
            return bool(value)
        if isinstance(value, str):
            return value.strip().lower() in ("1", "true", "yes", "on")
        return bool(value)

    s3 = section("s3")
    pg = section("postgres")
    # A section written as "app:" with no content loads as None; fall back to {}.
    app = section("app")
    deermapper = section("deermapper-api", "deermapper_api")

    default_workers = max(1, min(16, (os.cpu_count() or 2) * 2))
    parallel_workers = int(app.get("parallel_workers", default_workers))
    parallel_workers = max(1, parallel_workers)
    s3_pool = int(app.get("s3_max_pool_connections", max(16, parallel_workers * 4)))
    s3_pool = max(10, s3_pool)

    return AppConfig(
        s3_endpoint=required("S3_ENDPOINT", s3, "s3", "endpoint"),
        s3_access_key=required("S3_ACCESS_KEY", s3, "s3", "access_key"),
        s3_secret_key=required("S3_SECRET_KEY", s3, "s3", "secret_key"),
        s3_bucket=required("S3_BUCKET", s3, "s3", "bucket"),
        pg_dsn=required("PG_DSN", pg, "postgres", "dsn"),
        entrance_prefix=app.get("entrance_prefix", "icu/entrance/"),
        processed_prefix=app.get("processed_prefix", "icu/processed/"),
        thumb_prefix=app.get("thumb_prefix", "icu/thumbnails/"),
        min_age_seconds=int(app.get("min_age_seconds", 90)),
        poll_seconds=int(app.get("poll_seconds", 30)),
        ocr_crop_w_frac=float(app.get("ocr_crop_w_frac", 0.42)),
        ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
        thumb_max=int(app.get("thumb_max", 512)),
        exiftool_timeout_seconds=int(app.get("exiftool_timeout_seconds", 15)),
        job_sleep_seconds=float(app.get("job_sleep_seconds", 0.0)),
        parallel_workers=parallel_workers,
        s3_max_pool_connections=s3_pool,
        enable_ocr=as_bool(app.get("enable_ocr", True), True),
        enable_exif=as_bool(app.get("enable_exif", True), True),
        enable_deermapper_api=as_bool(app.get("enable_deermapper_api", False), False),
        deermapper_api_base_url=env_or("DEERMAPPER_API_URL", deermapper.get("base_url", "https://webapp.deermapper.net/api/icu")),
        deermapper_api_key=env_or("DEERMAPPER_API_KEY", deermapper.get("apiKey", "")),
        deermapper_api_timeout_seconds=int(app.get("deermapper_api_timeout_seconds", 20)),
        deermapper_api_image_field=app.get("deermapper_api_image_field", "image"),
    )
# -----------------------
# Helpers
# -----------------------
def is_old_enough(last_modified, min_age_seconds: int) -> bool:
    """Return True when *last_modified* is at least *min_age_seconds* in the past.

    *last_modified* must be timezone-aware: it is subtracted from an aware
    "now" in UTC.
    """
    age = datetime.now(timezone.utc) - last_modified
    return age.total_seconds() >= min_age_seconds
def to_exif_datetime(dt: datetime) -> str:
    """Format *dt* the EXIF way, "YYYY:MM:DD HH:MM:SS" (no zone information)."""
    return format(dt, "%Y:%m:%d %H:%M:%S")
def tz_offset_str(dt: datetime) -> str:
    """Return the UTC offset of *dt* as "+HH:MM" / "-HH:MM".

    A naive datetime (no offset available) is reported as "+00:00".
    """
    offset = dt.utcoffset()
    seconds = 0 if offset is None else int(offset.total_seconds())
    sign = "-" if seconds < 0 else "+"
    hours, remainder = divmod(abs(seconds), 3600)
    minutes = remainder // 60
    return f"{sign}{hours:02d}:{minutes:02d}"
def parse_servertime(meta: dict) -> datetime:
    """Parse ``meta["servertime"]`` into a timezone-aware datetime.

    Accepted forms:
      * "2025-06-30T04:03:04+0000"  (numeric offset without a colon)
      * ISO 8601 strings understood by ``datetime.fromisoformat``
      * the same with a trailing "Z" (normalised here, because
        ``fromisoformat`` only accepts it from Python 3.11 on)

    A value without any offset is taken to be UTC, so later
    ``astimezone`` calls do not silently apply the host's local zone.

    Raises:
        ValueError: servertime is missing/empty or cannot be parsed.
    """
    s = meta.get("servertime")
    if not s:
        raise ValueError("metadata.servertime missing")
    # Example: "2025-06-30T04:03:04+0000"
    if re.match(r".*[+-]\d{4}$", s):
        return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")
    # Fallback: ISO-ish
    if s[-1:] in ("Z", "z"):
        s = s[:-1] + "+00:00"
    parsed = datetime.fromisoformat(s)
    if parsed.tzinfo is None:
        # No offset given: servertime is treated as UTC throughout the pipeline.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def parse_customer_tz(meta: dict):
    """Resolve the camera's time zone from ``meta["CustomerTimezone"]``.

    The zone applies to the OCR'd overlay timestamp. A missing or empty
    value falls back to "UTC".

    Returns:
        ``(tzinfo, name)`` - the zone object from ``tz.gettz`` and the name used.

    Raises:
        ValueError: the name cannot be resolved to a zone.
    """
    tz_name = meta.get("CustomerTimezone") or "UTC"
    zone = tz.gettz(tz_name)
    if zone is not None:
        return zone, tz_name
    raise ValueError(f"Unknown CustomerTimezone: {tz_name}")
def ocr_extract_timestamp(jpg_bytes: bytes, crop_w_frac: float, crop_h_frac: float, customer_tzinfo) -> datetime:
    """Read the timestamp overlay from the bottom-right corner of a JPEG.

    The corner (right *crop_w_frac* of the width, bottom *crop_h_frac* of the
    height) is converted to grayscale, contrast-boosted and sharpened. OCR is
    run on a black/white thresholded copy and on the grayscale image; both
    texts are handed to ``_parse_ocr_datetime``.

    Returns:
        The parsed wall-clock time with *customer_tzinfo* attached (no
        conversion to UTC).

    Raises:
        ValueError: no timestamp could be recognised in either OCR text.
    """
    from io import BytesIO

    image = Image.open(BytesIO(jpg_bytes)).convert("RGB")
    width, height = image.size
    left = int(width * (1.0 - crop_w_frac))
    top = int(height * (1.0 - crop_h_frac))

    overlay = ImageOps.grayscale(image.crop((left, top, width, height)))
    overlay = ImageOps.autocontrast(overlay)
    overlay = ImageEnhance.Contrast(overlay).enhance(2.5)
    overlay = ImageEnhance.Sharpness(overlay).enhance(2.0)
    binarized = overlay.point(lambda px: 255 if px > 180 else 0)

    tess_cfg = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789:- '
    # First the thresholded image, then the un-thresholded grayscale one.
    candidates = [
        pytesseract.image_to_string(variant, config=tess_cfg).strip()
        for variant in (binarized, overlay)
    ]

    local_naive = _parse_ocr_datetime(*candidates)
    # Keep camera-local timezone (no UTC conversion requested)
    return local_naive.replace(tzinfo=customer_tzinfo)
def _parse_ocr_datetime(*texts: str) -> datetime:
    """Extract a naive datetime from one or more OCR texts.

    Three strategies are tried in order, each over all *texts* before the
    next one starts:
      1. the full overlay patterns OCR_DT_RE / OCR_DT_RE_FLEX,
      2. an independent time match and date match in the same text,
      3. a sliding 14-digit window (HHMMSSDDMMYYYY) over the digits only.
    The first candidate that forms a valid calendar date/time wins.

    Raises:
        ValueError: nothing parseable was found.
    """
    def try_build(h, m, s, d, mo, y) -> Optional[datetime]:
        # None signals "not a valid date/time" (e.g. month 13, hour 25).
        try:
            return datetime(int(y), int(mo), int(d), int(h), int(m), int(s))
        except Exception:
            return None

    # 1) direct regex matches
    for text in texts:
        for pattern in (OCR_DT_RE, OCR_DT_RE_FLEX):
            hit = pattern.search(text)
            if not hit:
                continue
            found = try_build(
                hit.group("h"), hit.group("m"), hit.group("s"),
                hit.group("d"), hit.group("mo"), hit.group("y"),
            )
            if found is not None:
                return found

    # 2) find time and date separately (handles missing space between time and date)
    time_rx = re.compile(r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})")
    date_rx = re.compile(r"(?P<d>\d{2})[-.](?P<mo>\d{2})[-.](?P<y>\d{4})")
    for text in texts:
        time_hit = time_rx.search(text)
        date_hit = date_rx.search(text)
        if not (time_hit and date_hit):
            continue
        found = try_build(
            time_hit.group("h"), time_hit.group("m"), time_hit.group("s"),
            date_hit.group("d"), date_hit.group("mo"), date_hit.group("y"),
        )
        if found is not None:
            return found

    # 3) digits-only fallback: scan for HHMMSSDDMMYYYY
    for text in texts:
        digits = re.sub(r"\D", "", text)
        # Every start index below yields a full 14-character window.
        for start in range(len(digits) - 13):
            window = digits[start:start + 14]
            found = try_build(
                window[0:2], window[2:4], window[4:6],
                window[6:8], window[8:10], window[10:14],
            )
            if found is not None:
                return found

    raise ValueError(f"OCR timestamp not found. OCR1='{texts[0] if texts else ''}' OCR2='{texts[1] if len(texts) > 1 else ''}'")
def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime, *, timeout_seconds: int) -> bytes:
    """Return a copy of *jpg_in* with date/time EXIF tags written by ``exiftool``.

    DateTimeOriginal  = OCR time (camera local) + OffsetTimeOriginal
    DateTimeDigitized = servertime (UTC) + OffsetTimeDigitized=+00:00
    ModifyDate / OffsetTime are set to the same UTC values as DateTimeDigitized.

    The image is written to a temporary directory, patched in place and read
    back; the directory is removed afterwards.

    Raises:
        subprocess.CalledProcessError: exiftool exited with a non-zero status.
        subprocess.TimeoutExpired: exiftool ran longer than *timeout_seconds*.
    """
    original_stamp = to_exif_datetime(dt_original_local)
    original_offset = tz_offset_str(dt_original_local)
    digitized_stamp = to_exif_datetime(dt_digitized_utc.astimezone(timezone.utc))
    utc_offset = "+00:00"

    tag_args = [
        f"-DateTimeOriginal={original_stamp}",
        f"-OffsetTimeOriginal={original_offset}",
        f"-DateTimeDigitized={digitized_stamp}",
        f"-OffsetTimeDigitized={utc_offset}",
        f"-ModifyDate={digitized_stamp}",
        f"-OffsetTime={utc_offset}",
    ]

    with tempfile.TemporaryDirectory() as workdir:
        jpg_path = os.path.join(workdir, "in.jpg")
        with open(jpg_path, "wb") as fh:
            fh.write(jpg_in)

        subprocess.run(
            ["exiftool", "-overwrite_original", *tag_args, jpg_path],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout_seconds,
        )

        with open(jpg_path, "rb") as fh:
            return fh.read()
def make_thumbnail_bytes(jpg_bytes: bytes, thumb_max: int) -> bytes:
    """Return a JPEG thumbnail of *jpg_bytes* that fits in *thumb_max* x *thumb_max*.

    The image is converted to RGB, shrunk with ``Image.thumbnail`` and encoded
    as JPEG with quality 85 and ``optimize=True``.
    """
    from io import BytesIO

    picture = Image.open(BytesIO(jpg_bytes)).convert("RGB")
    picture.thumbnail((thumb_max, thumb_max))
    encoded = BytesIO()
    picture.save(encoded, format="JPEG", quality=85, optimize=True)
    return encoded.getvalue()
def _multipart_form_data(fields: Dict[str, str], files: list[tuple[str, str, bytes, str]]) -> tuple[str, bytes]:
    """Encode text fields and file parts as a multipart/form-data request body.

    Args:
        fields: form field name -> value (values are passed through ``str``).
        files: tuples of ``(field_name, filename, content, content_type)``.

    Returns:
        ``(content_type_header_value, body_bytes)``; the header value carries
        the randomly generated boundary.
    """
    boundary = f"----meles-{uuidlib.uuid4().hex}"
    delimiter = b"--" + boundary.encode("ascii")

    lines: list[bytes] = []
    for name, value in fields.items():
        lines += [
            delimiter,
            f'Content-Disposition: form-data; name="{name}"'.encode("utf-8"),
            b"",  # blank line separating part headers from the part body
            str(value).encode("utf-8"),
        ]
    for field_name, filename, content, content_type in files:
        lines += [
            delimiter,
            f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"'.encode("utf-8"),
            f"Content-Type: {content_type}".encode("utf-8"),
            b"",
            content,
        ]
    # Closing delimiter, followed by a final CRLF after the join.
    lines.append(delimiter + b"--")

    body = b"\r\n".join(lines) + b"\r\n"
    return f"multipart/form-data; boundary={boundary}", body
def push_to_deermapper_api(cfg: AppConfig, image_uuid: str, imei: str, metadata: dict, jpg_bytes: bytes) -> tuple[bool, str]:
    """POST one image plus its metadata to the DeerMapper API as multipart form data.

    The form carries ``apiKey``, ``imei`` and ``metadata`` (JSON text) and the
    JPEG under the configured image field, named ``<image_uuid>.jpg``.

    Returns:
        ``(ok, message)``. ``ok`` is True only when the JSON response has
        ``status`` equal to "1". Any failure (HTTP error, non-JSON body,
        other exception) is reported as ``(False, description)`` and never
        raised.
    """
    form_fields = {
        "apiKey": cfg.deermapper_api_key,
        "imei": imei,
        "metadata": json.dumps(metadata, ensure_ascii=False),
    }
    image_part = (cfg.deermapper_api_image_field, f"{image_uuid}.jpg", jpg_bytes, "image/jpeg")
    content_type, body = _multipart_form_data(form_fields, [image_part])

    request = urlrequest.Request(
        cfg.deermapper_api_base_url,
        data=body,
        method="POST",
        headers={
            "Content-Type": content_type,
            "Accept": "application/json",
        },
    )

    try:
        with urlrequest.urlopen(request, timeout=cfg.deermapper_api_timeout_seconds) as response:
            payload = response.read().decode("utf-8", errors="replace")

        try:
            data = json.loads(payload) if payload else {}
        except Exception:
            return False, f"API response not JSON: {payload[:300]}"

        status_val = str(data.get("status", "")).strip()
        msg = str(data.get("msg", "")).strip()
        if status_val != "1":
            return False, msg or f"status={status_val or 'unknown'}"
        return True, msg or "ok"
    except urlerror.HTTPError as http_err:
        error_body = http_err.read().decode("utf-8", errors="replace")
        return False, f"HTTP {http_err.code}: {error_body[:300]}"
    except Exception as exc:
        # Includes network errors and a JSON payload that is not an object.
        return False, str(exc)
# -----------------------
# DB ops
# -----------------------
def db_init(cur):
    """Create or upgrade the importer's own tables, indexes and trigger.

    Every statement is written to be re-runnable (IF NOT EXISTS / OR REPLACE /
    guarded DO block). The statements run in the listed order on *cur*; the
    caller commits. The ``remote_cam`` schema itself is not created here.
    """
    ddl_statements = (
        # Job bookkeeping table.
        """
        CREATE TABLE IF NOT EXISTS remote_cam.import_job (
        image_uuid text PRIMARY KEY,
        status text NOT NULL DEFAULT 'NEW',
        attempts integer NOT NULL DEFAULT 0,
        has_categories boolean,
        last_error text,
        entrance_jpg_key text,
        entrance_meta_key text,
        entrance_cat_key text,
        processed_jpg_key text,
        thumbnail_key text,
        needs_ocr_backfill boolean NOT NULL DEFAULT false,
        needs_exif_backfill boolean NOT NULL DEFAULT false,
        created_ts timestamptz NOT NULL DEFAULT now(),
        updated_ts timestamptz NOT NULL DEFAULT now()
        );
        """,
        # Backfill flags for tables created before these columns existed.
        """
        ALTER TABLE remote_cam.import_job
        ADD COLUMN IF NOT EXISTS needs_ocr_backfill boolean NOT NULL DEFAULT false;
        """,
        """
        ALTER TABLE remote_cam.import_job
        ADD COLUMN IF NOT EXISTS needs_exif_backfill boolean NOT NULL DEFAULT false;
        """,
        "CREATE INDEX IF NOT EXISTS import_job_needs_ocr_idx ON remote_cam.import_job(needs_ocr_backfill);",
        "CREATE INDEX IF NOT EXISTS import_job_needs_exif_idx ON remote_cam.import_job(needs_exif_backfill);",
        "CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);",
        "CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);",
        # Trigger function that stamps updated_ts on every UPDATE.
        """
        CREATE OR REPLACE FUNCTION remote_cam.set_updated_ts()
        RETURNS trigger AS $$
        BEGIN
        NEW.updated_ts = now();
        RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
        """,
        # Install the trigger only when no trigger of that name exists yet.
        """
        DO $$
        BEGIN
        IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'import_job_set_updated_ts') THEN
        CREATE TRIGGER import_job_set_updated_ts
        BEFORE UPDATE ON remote_cam.import_job
        FOR EACH ROW EXECUTE FUNCTION remote_cam.set_updated_ts();
        END IF;
        END$$;
        """,
        # Log of DeerMapper API push attempts.
        """
        CREATE TABLE IF NOT EXISTS remote_cam.api_response (
        push_ts timestamp without time zone,
        message character(512),
        image_uuid uuid,
        success boolean
        );
        """,
    )
    for statement in ddl_statements:
        cur.execute(statement)
def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]):
    """Register an entrance set in ``import_job`` or refresh its entrance keys.

    A new row starts with status 'NEW'. For an existing row only the three
    entrance keys and ``has_categories`` are updated; status is left alone.
    ``has_categories`` is True exactly when *cat_key* is given.
    """
    has_categories = cat_key is not None
    sql = """
        INSERT INTO remote_cam.import_job (image_uuid, status, entrance_jpg_key, entrance_meta_key, entrance_cat_key, has_categories)
        VALUES (%s, 'NEW', %s, %s, %s, %s)
        ON CONFLICT (image_uuid) DO UPDATE
        SET entrance_jpg_key = EXCLUDED.entrance_jpg_key,
        entrance_meta_key = EXCLUDED.entrance_meta_key,
        entrance_cat_key = EXCLUDED.entrance_cat_key,
        has_categories = EXCLUDED.has_categories;
        """
    cur.execute(sql, (uuid, jpg_key, meta_key, cat_key, has_categories))
def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None, processed_key: Optional[str] = None, thumb_key: Optional[str] = None):
    """Set the job status and optionally record an error text and output keys.

    ``attempts`` is incremented only when *status* is 'ERROR'. *err*,
    *processed_key* and *thumb_key* overwrite the stored value only when they
    are not None (COALESCE keeps the previous value otherwise).
    """
    sql = """
        UPDATE remote_cam.import_job
        SET status = %s,
        attempts = attempts + CASE WHEN %s = 'ERROR' THEN 1 ELSE 0 END,
        last_error = COALESCE(%s::text, last_error),
        processed_jpg_key = COALESCE(%s, processed_jpg_key),
        thumbnail_key = COALESCE(%s, thumbnail_key)
        WHERE image_uuid = %s;
        """
    # status is bound twice: once for the column, once for the CASE test.
    params = (status, status, err, processed_key, thumb_key, uuid)
    cur.execute(sql, params)
def job_set_backfill(cur, uuid: str, *, needs_ocr_backfill: Optional[bool] = None, needs_exif_backfill: Optional[bool] = None):
    """Update the OCR/EXIF backfill flags of a job.

    A flag passed as None keeps its stored value; when both are None no
    statement is executed at all.
    """
    nothing_to_change = needs_ocr_backfill is None and needs_exif_backfill is None
    if nothing_to_change:
        return
    sql = """
        UPDATE remote_cam.import_job
        SET needs_ocr_backfill = COALESCE(%s, needs_ocr_backfill),
        needs_exif_backfill = COALESCE(%s, needs_exif_backfill)
        WHERE image_uuid = %s;
        """
    cur.execute(sql, (needs_ocr_backfill, needs_exif_backfill, uuid))
def job_get(cur, uuid: str):
    """Fetch the bookkeeping row of a job as a dict, or None when there is no row.

    Keys: status, processed_jpg_key, thumbnail_key, has_categories,
    needs_ocr_backfill, needs_exif_backfill.
    """
    cur.execute("""
        SELECT status, processed_jpg_key, thumbnail_key, has_categories, needs_ocr_backfill, needs_exif_backfill
        FROM remote_cam.import_job
        WHERE image_uuid = %s;
        """, (uuid,))
    row = cur.fetchone()
    if not row:
        return None
    # Column names in SELECT order; only the first six values are used.
    columns = (
        "status",
        "processed_jpg_key",
        "thumbnail_key",
        "has_categories",
        "needs_ocr_backfill",
        "needs_exif_backfill",
    )
    return {name: row[position] for position, name in enumerate(columns)}
# Job statuses in pipeline order; a status's list position is its rank (see
# status_rank). Statuses that are written elsewhere but not listed here
# (e.g. "OCR_FAIL", "EXIF_FAIL", "ERROR", "CLEANUP_SKIPPED") rank as -1.
STATUS_ORDER = [
    "NEW",
    "META_OK",
    "OCR_OK",
    "OCR_SKIPPED",
    "EXIF_OK",
    "EXIF_SKIPPED",
    "MOVED",
    "THUMB_OK",
    "CATEGORIES_OK",
    "API_FAIL",
    "API_OK",
    "CLEANED",
]
def status_rank(status: Optional[str]) -> int:
    """Return the position of *status* in STATUS_ORDER, or -1 if it is not listed (or None)."""
    try:
        return STATUS_ORDER.index(status)
    except ValueError:
        return -1
def is_terminal_success(status: Optional[str], cfg: AppConfig) -> bool:
    """Tell whether *status* counts as a finished, successful job.

    "CLEANED" always does. Otherwise the status must rank at least "API_OK"
    when the DeerMapper push is enabled, or at least "THUMB_OK" when it is not.
    None is never successful.
    """
    if status is None:
        return False
    if status == "CLEANED":
        return True
    required = "API_OK" if cfg.enable_deermapper_api else "THUMB_OK"
    return status_rank(status) >= status_rank(required)
def insert_resource(cur, uuid: str, typ: str):
    """Insert a ``remote_cam.resource`` row of type *typ* for *uuid*.

    ``import_ts`` is set to now(); a conflicting existing row is left as is.
    """
    sql = """
        INSERT INTO remote_cam.resource(resource_uuid, type, import_ts)
        VALUES (%s, %s, now())
        ON CONFLICT DO NOTHING;
        """
    cur.execute(sql, (uuid, typ))
def insert_metadata(cur, uuid: str, meta: dict):
    """Upsert the camera metadata for *uuid* into ``remote_cam.metadata``.

    ``servertime`` is mandatory (parsed with parse_servertime). The two
    camera timestamps ("ImageCreationDate", "DateTimeLastSettings") use the
    format "HH:MM:SS DD.MM.YYYY" and are stored as UTC; unparseable values
    become NULL. Latitude/Longitude are converted to float unless missing or
    empty. All other fields are passed through unchanged.

    A new row gets ``ocr_ts`` = NULL; on conflict every column except
    ``ocr_ts`` is overwritten, so an existing OCR timestamp is kept.

    Raises:
        ValueError: servertime is missing/unparseable, or a coordinate is not numeric.
    """
    def camera_ts(field: str) -> Optional[datetime]:
        # "06:03:01 30.06.2025" -> assume UTC if no tz info
        text = meta.get(field)
        if not text:
            return None
        try:
            return datetime.strptime(text, "%H:%M:%S %d.%m.%Y").replace(tzinfo=timezone.utc)
        except Exception:
            return None

    def coordinate(field: str) -> Optional[float]:
        value = meta.get(field)
        return None if value in (None, "") else float(value)

    def pick(*keys: str) -> tuple:
        return tuple(meta.get(key) for key in keys)

    servertime = parse_servertime(meta)
    image_created = camera_ts("ImageCreationDate")
    latitude = coordinate("Latitude")
    longitude = coordinate("Longitude")
    last_settings = camera_ts("DateTimeLastSettings")

    # Same order as the column list of the INSERT below (32 placeholders).
    params = (
        uuid,
        image_created,
        *pick("SizeInByte", "ImageFormat", "ImageNight"),
        last_settings,
        *pick("BatteryStatus", "FirmwareVersion", "SignalStrength", "Temperature", "CoordinateSwitch"),
        latitude,
        longitude,
        *pick(
            "WorkPeriod", "WorkStart", "WorkEnd", "ThumbnailSize",
            "PIRInterval", "TimeScan", "imei", "iccid", "INITKey",
            "CameraName", "CameraModel", "CameraLanguage", "CameraNetwork",
            "MobileNetworkCode", "MobileCountryCode", "LocationAreaCode",
            "CustomerTimezone", "CustomerLanguage",
        ),
        servertime,
    )

    cur.execute("""
        INSERT INTO remote_cam.metadata(
        metadata_uuid, import_ts, "ImageCreationDate", "SizeInByte", "ImageFormat", "ImageNight",
        "DateTimeLastSettings", "BatteryStatus", "FirmwareVersion", "SignalStrength", "Temperature",
        "CoordinateSwitch", "Latitude", "Longitude", "WorkPeriod", "WorkStart", "WorkEnd", "ThumbnailSize",
        "PIRInterval", "TimeScan", imei, iccid, "INITKey", "CameraName", "CameraModel", "CameraLanguage",
        "CameraNetwork", "MobileNetworkCode", "MobileCountryCode", "LocationAreaCode", "CustomerTimezone",
        "CustomerLanguage", servertime, ocr_ts
        ) VALUES (
        %s, now(), %s, %s, %s, %s,
        %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s,
        %s, %s, NULL
        )
        ON CONFLICT (metadata_uuid) DO UPDATE SET
        import_ts = EXCLUDED.import_ts,
        "ImageCreationDate" = EXCLUDED."ImageCreationDate",
        "SizeInByte" = EXCLUDED."SizeInByte",
        "ImageFormat" = EXCLUDED."ImageFormat",
        "ImageNight" = EXCLUDED."ImageNight",
        "DateTimeLastSettings" = EXCLUDED."DateTimeLastSettings",
        "BatteryStatus" = EXCLUDED."BatteryStatus",
        "FirmwareVersion" = EXCLUDED."FirmwareVersion",
        "SignalStrength" = EXCLUDED."SignalStrength",
        "Temperature" = EXCLUDED."Temperature",
        "CoordinateSwitch" = EXCLUDED."CoordinateSwitch",
        "Latitude" = EXCLUDED."Latitude",
        "Longitude" = EXCLUDED."Longitude",
        "WorkPeriod" = EXCLUDED."WorkPeriod",
        "WorkStart" = EXCLUDED."WorkStart",
        "WorkEnd" = EXCLUDED."WorkEnd",
        "ThumbnailSize" = EXCLUDED."ThumbnailSize",
        "PIRInterval" = EXCLUDED."PIRInterval",
        "TimeScan" = EXCLUDED."TimeScan",
        imei = EXCLUDED.imei,
        iccid = EXCLUDED.iccid,
        "INITKey" = EXCLUDED."INITKey",
        "CameraName" = EXCLUDED."CameraName",
        "CameraModel" = EXCLUDED."CameraModel",
        "CameraLanguage" = EXCLUDED."CameraLanguage",
        "CameraNetwork" = EXCLUDED."CameraNetwork",
        "MobileNetworkCode" = EXCLUDED."MobileNetworkCode",
        "MobileCountryCode" = EXCLUDED."MobileCountryCode",
        "LocationAreaCode" = EXCLUDED."LocationAreaCode",
        "CustomerTimezone" = EXCLUDED."CustomerTimezone",
        "CustomerLanguage" = EXCLUDED."CustomerLanguage",
        servertime = EXCLUDED.servertime;
        """, params)
def update_ocr_ts(cur, uuid: str, ocr_ts: datetime):
    """Store *ocr_ts* as the OCR timestamp of the metadata row for *uuid*."""
    sql = """
        UPDATE remote_cam.metadata
        SET ocr_ts = %s
        WHERE metadata_uuid = %s;
        """
    cur.execute(sql, (ocr_ts, uuid))
def get_ocr_ts(cur, uuid: str) -> Optional[datetime]:
    """Return the stored OCR timestamp for *uuid*.

    None is returned both when there is no metadata row and when the row's
    ``ocr_ts`` is NULL.
    """
    sql = """
        SELECT ocr_ts
        FROM remote_cam.metadata
        WHERE metadata_uuid = %s;
        """
    cur.execute(sql, (uuid,))
    row = cur.fetchone()
    if not row:
        return None
    return row[0]
def insert_categories(cur, uuid: str, cat_json: dict):
    """Replace the classification rows of *uuid* with those from *cat_json*.

    Existing ``remote_cam.category`` rows for the image are deleted first;
    then one row per entry of ``cat_json["categories"]`` is inserted, each
    tagged with ``cat_json["version"]``. A missing or empty list leaves the
    image without category rows.
    """
    model_version = cat_json.get("version")
    entries = cat_json.get("categories") or []

    cur.execute("DELETE FROM remote_cam.category WHERE image_uuid = %s;", (uuid,))

    insert_sql = """
        INSERT INTO remote_cam.category(image_uuid, category, score, version)
        VALUES (%s, %s, %s, %s);
        """
    for entry in entries:
        cur.execute(insert_sql, (uuid, entry.get("category"), entry.get("score"), model_version))
def insert_api_response(cur, uuid: str, success: bool, message: str):
    """Log one DeerMapper push attempt in ``remote_cam.api_response``.

    *message* is cut to 512 characters (None becomes ""); *uuid* is cast to
    the SQL ``uuid`` type by the statement.
    """
    trimmed = (message or "")[:512]
    sql = """
        INSERT INTO remote_cam.api_response(push_ts, message, image_uuid, success)
        VALUES (now(), %s, %s::uuid, %s);
        """
    cur.execute(sql, (trimmed, uuid, success))
# -----------------------
# S3 ops wrapper
# -----------------------
class S3:
    """Thin convenience wrapper around a boto3 S3 client bound to one bucket."""

    def __init__(self, cfg: AppConfig):
        """Create the client from the endpoint, credentials and pool size in *cfg*."""
        self.cfg = cfg
        client_config = BotoClientConfig(max_pool_connections=cfg.s3_max_pool_connections)
        self.client = boto3.client(
            "s3",
            endpoint_url=cfg.s3_endpoint,
            aws_access_key_id=cfg.s3_access_key,
            aws_secret_access_key=cfg.s3_secret_key,
            config=client_config,
        )

    def list_entrance_objects(self):
        """Yield every object entry found under the entrance prefix (all pages)."""
        pages = self.client.get_paginator("list_objects_v2").paginate(
            Bucket=self.cfg.s3_bucket,
            Prefix=self.cfg.entrance_prefix,
        )
        for page in pages:
            yield from page.get("Contents", [])

    def get_bytes(self, key: str) -> bytes:
        """Download the object at *key* and return its full content."""
        response = self.client.get_object(Bucket=self.cfg.s3_bucket, Key=key)
        return response["Body"].read()

    def put_bytes(self, key: str, data: bytes, content_type: str):
        """Upload *data* to *key* with the given Content-Type."""
        self.client.put_object(
            Bucket=self.cfg.s3_bucket,
            Key=key,
            Body=data,
            ContentType=content_type,
        )

    def head(self, key: str):
        """Return the HEAD response for *key*; the client raises if the request fails."""
        return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key)

    def delete_keys(self, keys) -> int:
        """Delete *keys* in one batch request and return how many were deleted.

        An empty *keys* performs no request and returns 0.

        Raises:
            RuntimeError: the response reports per-object errors (the first
                three are included in the message).
        """
        if not keys:
            return 0
        targets = [{"Key": key} for key in keys]
        response = self.client.delete_objects(
            Bucket=self.cfg.s3_bucket,
            Delete={"Objects": targets, "Quiet": False},
        )
        errors = response.get("Errors") or []
        if errors:
            details = "; ".join(
                f"{e.get('Key')}:{e.get('Code')}:{e.get('Message')}" for e in errors[:3]
            )
            raise RuntimeError(f"S3 delete failed for {len(errors)} object(s): {details}")
        return len(response.get("Deleted") or [])
# -----------------------
# Main processing
# -----------------------
@dataclass
class EntranceSet:
    """The objects in the entrance prefix that belong to one image id.

    A set always has a JPEG and a metadata JSON; the categories JSON is
    optional (cat_key / lm_cat are None when it is absent).
    """

    uuid: str
    # S3 keys of the set's objects.
    jpg_key: str
    meta_key: str
    cat_key: Optional[str]
    # S3 LastModified timestamps of the corresponding objects.
    lm_jpg: datetime
    lm_meta: datetime
    lm_cat: Optional[datetime]
def collect_ready_sets(s3: S3, cfg: AppConfig) -> list[EntranceSet]:
    """List the entrance prefix and return the sets that are ready to import.

    Objects are grouped by the id in their file name (UUID_RE for
    ``.jpg``/``.json``, CAT_RE for ``_categories.json``); other names are
    ignored. A set is ready when it has both a JPEG and a metadata JSON and
    every object it contains, including an optional categories file, is at
    least ``cfg.min_age_seconds`` old.
    """
    grouped: Dict[str, Dict[str, Tuple[str, datetime]]] = {}
    for obj in s3.list_entrance_objects():
        key = obj["Key"]
        modified = obj["LastModified"]
        filename = key.rsplit("/", 1)[-1]

        base_match = UUID_RE.match(filename)
        if base_match:
            kind = "jpg" if base_match.group(2) == "jpg" else "meta"
            grouped.setdefault(base_match.group(1), {})[kind] = (key, modified)
            continue

        cat_match = CAT_RE.match(filename)
        if cat_match:
            grouped.setdefault(cat_match.group(1), {})["cat"] = (key, modified)

    def aged(stamp) -> bool:
        return is_old_enough(stamp, cfg.min_age_seconds)

    ready: list[EntranceSet] = []
    for uuid, parts in grouped.items():
        # Incomplete sets wait until both mandatory objects have arrived.
        if "jpg" not in parts or "meta" not in parts:
            continue
        jpg_key, jpg_modified = parts["jpg"]
        meta_key, meta_modified = parts["meta"]
        cat_key, cat_modified = parts.get("cat") or (None, None)

        if not (aged(jpg_modified) and aged(meta_modified)):
            continue
        if cat_key is not None and not aged(cat_modified):
            continue
        ready.append(
            EntranceSet(uuid, jpg_key, meta_key, cat_key, jpg_modified, meta_modified, cat_modified)
        )
    return ready
def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
    """Run the import pipeline for one entrance set, resuming from its stored status.

    Steps: metadata -> OCR timestamp -> EXIF patch -> upload processed JPG ->
    thumbnail -> optional categories -> optional DeerMapper push -> cleanup of
    the entrance objects. Progress is recorded with job_set_status after each
    step, and most steps are skipped when the status loaded at the start
    already ranks at or beyond them (see STATUS_ORDER / status_rank).

    OCR, EXIF, API and cleanup failures are caught and recorded as a status;
    other exceptions (S3, JSON, DB, ...) propagate to the caller.

    Args:
        cur: Open DB cursor; the caller owns the surrounding transaction.
        s3: S3 wrapper used for all object reads/writes.
        cfg: Application configuration.
        s: The entrance set to process.
    """
    job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
    job = job.get if False else job_get(cur, s.uuid) or {}
    # NOTE: statuses that are not in STATUS_ORDER (OCR_FAIL, EXIF_FAIL, ERROR,
    # CLEANUP_SKIPPED) rank as -1, so every rank check below treats them like
    # "before NEW".
    status_now = job.get("status", "NEW")
    # 1) metadata.json -> DB
    # The metadata upsert runs on every call, whatever the current status is.
    meta = json.loads(s3.get_bytes(s.meta_key))
    insert_metadata(cur, s.uuid, meta)
    insert_resource(cur, s.uuid, "metadata")
    # Only move the status to META_OK when that is not a step backwards.
    if status_rank("META_OK") >= status_rank(status_now):
        job_set_status(cur, s.uuid, "META_OK")
        status_now = "META_OK"
    # 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone
    jpg_bytes = s3.get_bytes(s.jpg_key)
    # A timestamp stored by an earlier run is reused when OCR is not repeated.
    ocr_ts = get_ocr_ts(cur, s.uuid)
    # OCR runs when it has not succeeded yet, or was skipped earlier and is enabled now.
    ocr_needs_run = cfg.enable_ocr and (
        status_rank(status_now) < status_rank("OCR_OK") or status_now == "OCR_SKIPPED"
    )
    if ocr_needs_run:
        try:
            customer_tzinfo, _ = parse_customer_tz(meta)
            ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, customer_tzinfo)
            update_ocr_ts(cur, s.uuid, ocr_ts)
            insert_resource(cur, s.uuid, "image")
            job_set_backfill(cur, s.uuid, needs_ocr_backfill=False)
            job_set_status(cur, s.uuid, "OCR_OK")
            status_now = "OCR_OK"
        except Exception as e:
            # OCR failure does not abort the job; it is flagged for backfill.
            job_set_backfill(cur, s.uuid, needs_ocr_backfill=True)
            job_set_status(cur, s.uuid, "OCR_FAIL", err=str(e))
            status_now = "OCR_FAIL"
    elif not cfg.enable_ocr:
        job_set_backfill(cur, s.uuid, needs_ocr_backfill=True)
        if status_rank(status_now) < status_rank("OCR_SKIPPED"):
            job_set_status(cur, s.uuid, "OCR_SKIPPED")
            status_now = "OCR_SKIPPED"
    # 3) EXIF write:
    # DateTimeOriginal = ocr_ts (local, keep offset tags)
    # DateTimeDigitized = servertime (UTC)
    # jpg_exif stays the very same object as jpg_bytes unless EXIF is written;
    # steps 5 and 7 rely on that identity check.
    jpg_exif = jpg_bytes
    if not cfg.enable_exif:
        job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
        if status_rank(status_now) < status_rank("EXIF_SKIPPED"):
            job_set_status(cur, s.uuid, "EXIF_SKIPPED")
            status_now = "EXIF_SKIPPED"
    elif status_rank(status_now) < status_rank("MOVED"):
        if ocr_ts is None:
            # Without an OCR timestamp there is nothing to write as DateTimeOriginal.
            job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
            job_set_status(cur, s.uuid, "EXIF_SKIPPED", err="OCR missing")
            status_now = "EXIF_SKIPPED"
        else:
            try:
                servertime = parse_servertime(meta)
                jpg_exif = exif_write_with_exiftool(
                    jpg_bytes,
                    ocr_ts,
                    servertime,
                    timeout_seconds=cfg.exiftool_timeout_seconds,
                )
                job_set_backfill(cur, s.uuid, needs_exif_backfill=False)
                job_set_status(cur, s.uuid, "EXIF_OK")
                status_now = "EXIF_OK"
            except Exception as e:
                # The unpatched JPEG is uploaded below; EXIF is flagged for backfill.
                job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
                job_set_status(cur, s.uuid, "EXIF_FAIL", err=str(e))
                status_now = "EXIF_FAIL"
    # 4) Upload processed JPG (EXIF patched)
    processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{s.uuid}.jpg"
    if status_rank(status_now) < status_rank("MOVED"):
        s3.put_bytes(processed_key, jpg_exif, "image/jpeg")
        # HEAD right after the upload verifies the object exists (raises otherwise).
        s3.head(processed_key)
        job_set_status(cur, s.uuid, "MOVED", processed_key=processed_key)
        status_now = "MOVED"
    # 5) Thumbnail
    thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{s.uuid}.jpg"
    if status_rank(status_now) < status_rank("THUMB_OK"):
        # No EXIF-patched bytes in memory: use the stored processed object instead.
        if status_rank(status_now) >= status_rank("MOVED") and jpg_exif is jpg_bytes:
            jpg_exif = s3.get_bytes(processed_key)
        thumb_bytes = make_thumbnail_bytes(jpg_exif, cfg.thumb_max)
        s3.put_bytes(thumb_key, thumb_bytes, "image/jpeg")
        s3.head(thumb_key)
        job_set_status(cur, s.uuid, "THUMB_OK", thumb_key=thumb_key)
        status_now = "THUMB_OK"
    # 6) Optional categories
    if s.cat_key and status_rank(status_now) < status_rank("CATEGORIES_OK"):
        cat_json = json.loads(s3.get_bytes(s.cat_key))
        insert_categories(cur, s.uuid, cat_json)
        insert_resource(cur, s.uuid, "classification")
        job_set_status(cur, s.uuid, "CATEGORIES_OK")
        status_now = "CATEGORIES_OK"
    # 7) Optional DeerMapper API push (image + metadata)
    # A previous API_FAIL ranks below API_OK, so the push is retried here.
    if cfg.enable_deermapper_api and status_rank(status_now) < status_rank("API_OK"):
        api_key = (cfg.deermapper_api_key or "").strip()
        api_url = (cfg.deermapper_api_base_url or "").strip()
        if not api_key or not api_url:
            err_msg = "DeerMapper API config missing"
            job_set_status(cur, s.uuid, "API_FAIL", err=err_msg)
            insert_api_response(cur, s.uuid, False, err_msg)
            status_now = "API_FAIL"
        else:
            # Push the processed image rather than the raw entrance JPEG.
            if status_rank(status_now) >= status_rank("MOVED") and jpg_exif is jpg_bytes:
                jpg_exif = s3.get_bytes(processed_key)
            imei = str(meta.get("imei") or "").strip()
            ok, msg = push_to_deermapper_api(cfg, s.uuid, imei, meta, jpg_exif)
            if ok:
                insert_api_response(cur, s.uuid, True, msg)
                job_set_status(cur, s.uuid, "API_OK")
                status_now = "API_OK"
            else:
                insert_api_response(cur, s.uuid, False, msg)
                job_set_status(cur, s.uuid, "API_FAIL", err=msg)
                status_now = "API_FAIL"
    # 8) Cleanup entrance only after verify (+ API when enabled)
    api_ready_for_cleanup = (not cfg.enable_deermapper_api) or status_rank(status_now) >= status_rank("API_OK")
    if status_rank(status_now) >= status_rank("THUMB_OK") and api_ready_for_cleanup:
        try:
            # Re-check both outputs before the entrance objects are removed.
            s3.head(processed_key)
            s3.head(thumb_key)
            to_delete = [s.jpg_key, s.meta_key]
            if s.cat_key:
                to_delete.append(s.cat_key)
            s3.delete_keys(to_delete)
            job_set_status(cur, s.uuid, "CLEANED")
            status_now = "CLEANED"
        except Exception as e:
            # NOTE(review): CLEANUP_SKIPPED is not in STATUS_ORDER, so a later
            # run ranks it -1 and appears to redo every step, including the
            # API push - confirm this is intended.
            job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e))
def is_completed_job_materialized(cur, s3: S3, cfg: AppConfig, uuid: str) -> bool:
    """Tell whether *uuid* was already imported completely and its outputs still exist.

    Returns True only when
      * a job row exists,
      * its status is a terminal success according to is_terminal_success
        (so with the DeerMapper push enabled, a job that stopped at
        THUMB_OK / CATEGORIES_OK / API_FAIL is NOT complete and will be
        resumed instead of having its entrance files deleted), and
      * both the processed JPEG and the thumbnail answer a HEAD request.

    The caller uses a True result to delete the entrance objects as
    duplicates without reprocessing them.
    """
    job = job_get(cur, uuid)
    if not job:
        return False
    # Same completion rule as the batch result reporting; a plain
    # "rank >= THUMB_OK" test would drop pending API pushes.
    if not is_terminal_success(job.get("status"), cfg):
        return False
    processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{uuid}.jpg"
    thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{uuid}.jpg"
    try:
        s3.head(processed_key)
        s3.head(thumb_key)
    except Exception:
        # Any HEAD failure is treated as "not materialized" so the set gets reprocessed.
        return False
    return True
def cleanup_entrance_set(s3: S3, s: EntranceSet) -> int:
    """Delete the entrance objects of *s* and return the number deleted.

    The JPEG and metadata keys are always included; the categories key only
    when the set has one.
    """
    keys = [s.jpg_key, s.meta_key] + ([s.cat_key] if s.cat_key else [])
    return s3.delete_keys(keys)
def _process_ready_batch(cfg: AppConfig, batch: list[EntranceSet]):
    """Process the entrance sets of *batch* one after another.

    The function creates its own S3 client and DB connection. Each set is
    handled in its own DB transaction, and one ``[RESULT]`` line is printed
    per set with the outcome, final status and processing time.

    Args:
        cfg: Application configuration.
        batch: Entrance sets to process sequentially.
    """
    s3c = S3(cfg)
    with psycopg.connect(cfg.pg_dsn) as db:
        for s in batch:
            start = time.perf_counter()
            result = "failure"
            final_status = "UNKNOWN"
            err_text = None
            cleanup_deleted = None
            try:
                skip_duplicate = False
                with db.transaction():
                    with db.cursor() as cur:
                        # Already imported and outputs present: only the
                        # leftover entrance objects need to be removed.
                        if is_completed_job_materialized(cur, s3c, cfg, s.uuid):
                            skip_duplicate = True
                        else:
                            process_one(cur, s3c, cfg, s)
                            final_status = (job_get(cur, s.uuid) or {}).get("status", "UNKNOWN")
                            result = "success" if is_terminal_success(final_status, cfg) else "failure"
                if skip_duplicate:
                    cleanup_deleted = cleanup_entrance_set(s3c, s)
                    final_status = "DUPLICATE_CLEANED"
                    result = "success"
            except Exception as e:
                # The failed transaction block has ended by now; record the
                # error in a fresh transaction. job_upsert_new makes sure the
                # row exists even if the failure happened before it was stored.
                with db.transaction():
                    with db.cursor() as cur:
                        job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
                        job_set_status(cur, s.uuid, "ERROR", err=str(e))
                final_status = "ERROR"
                err_text = str(e)
            elapsed_ms = int((time.perf_counter() - start) * 1000)
            if err_text:
                print(
                    f"[RESULT] uuid={s.uuid} result={result} status={final_status} "
                    f"process_ms={elapsed_ms} error={err_text}"
                )
            elif cleanup_deleted is not None:
                print(
                    f"[RESULT] uuid={s.uuid} result={result} status={final_status} "
                    f"process_ms={elapsed_ms} cleanup_deleted={cleanup_deleted}"
                )
            else:
                print(f"[RESULT] uuid={s.uuid} result={result} status={final_status} process_ms={elapsed_ms}")
            # Optional throttle between jobs.
            if cfg.job_sleep_seconds > 0:
                time.sleep(cfg.job_sleep_seconds)
def run_once(cfg: AppConfig) -> int:
    """Process everything that is currently ready and return the number of sets.

    With a single worker the sets are processed in the calling thread.
    Otherwise they are dealt round-robin into one batch per worker and the
    batches run in a thread pool; an exception raised by a batch is re-raised
    here.

    Returns:
        The number of ready sets found (0 when there was nothing to do).
    """
    ready = collect_ready_sets(S3(cfg), cfg)
    if not ready:
        return 0

    workers = max(1, min(cfg.parallel_workers, len(ready)))
    if workers == 1:
        _process_ready_batch(cfg, ready)
        return len(ready)

    # Slice k takes elements k, k + workers, k + 2 * workers, ... (round-robin).
    batches: list[list[EntranceSet]] = [ready[offset::workers] for offset in range(workers)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = [pool.submit(_process_ready_batch, cfg, batch) for batch in batches if batch]
        for finished in as_completed(pending):
            finished.result()
    return len(ready)
def main():
    """Service entry point: load the config, prepare the DB, then poll forever.

    The config path comes from the CONFIG_YAML environment variable (default
    "/app/config.yaml"). After a poll that found nothing, the loop sleeps for
    ``poll_seconds``; after a poll that processed sets it polls again at once.
    """
    cfg = load_config(os.environ.get("CONFIG_YAML", "/app/config.yaml"))

    with psycopg.connect(cfg.pg_dsn) as db:
        with db.cursor() as cur:
            db_init(cur)
        db.commit()

    while True:
        processed = run_once(cfg)
        if processed == 0:
            time.sleep(cfg.poll_seconds)
# Start the polling service only when the file is executed as a script.
if __name__ == "__main__":
    main()