commit c8f1ca4f4e9ef91523e1b54adc8cab852149424d
Author: dom
Date:   Fri Feb 6 11:44:32 2026 +0100

    starting out

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0d1a15e
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.pyc
+*.pyo
+__pycache__/
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..64cc1b1
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# melesICUmover
\ No newline at end of file
diff --git a/config.yaml b/config.yaml
new file mode 100644
index 0000000..431bdea
--- /dev/null
+++ b/config.yaml
@@ -0,0 +1,20 @@
+# config.yaml (example)
+s3:
+  endpoint: "https://"
+  access_key: ""
+  secret_key: ""
+  bucket: ""
+
+postgres:
+  dsn: "postgresql://user:pass@host:5432/dbname"
+
+app:
+  entrance_prefix: "icu/entrance/"
+  processed_prefix: "icu/processed/"
+  thumb_prefix: "icu/thumbnails/"
+  min_age_seconds: 90
+  poll_seconds: 30
+  # OCR crop tunables (defaults are usually fine)
+  ocr_crop_w_frac: 0.42
+  ocr_crop_h_frac: 0.22
+  thumb_max: 512
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..48068cd
--- /dev/null
+++ b/main.py
@@ -0,0 +1,568 @@
+#!/usr/bin/env python3
+# main.py — ICU camera ingest worker.
+# Polls an S3 "entrance" prefix for image/metadata pairs, OCRs the camera's
+# burned-in timestamp, patches EXIF, writes thumbnails, records everything in
+# Postgres, then cleans up the entrance objects.
+# BUGFIX: shebang must be the very first line of the file to take effect; the
+# original had a "# ingest.py" comment above it.
+import os
+import re
+import json
+import time
+import tempfile
+import subprocess
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Optional, Dict, Tuple
+
+import boto3
+import psycopg
+from dateutil import tz
+from PIL import Image, ImageOps, ImageEnhance
+import pytesseract
+import yaml
+
+
+# -----------------------
+# Config
+# -----------------------
+# Entrance object basenames: "<uuid>.jpg" / "<uuid>.json" / "<uuid>_categories.json"
+UUID_RE = re.compile(r"^([0-9a-fA-F-]{36})\.(jpg|json)$")
+CAT_RE = re.compile(r"^([0-9a-fA-F-]{36})_categories\.json$")
+
+# Overlay fixed: "HH:MM:SS DD-MM-YYYY"
+# BUGFIX: the named groups had been stripped (invalid "(?P\d{2})" syntax);
+# restored to the names consumed by ocr_extract_timestamp() via m.group(...).
+OCR_DT_RE = re.compile(
+    r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s+(?P<d>\d{2})-(?P<mo>\d{2})-(?P<y>\d{4})"
+)
+
+
+@dataclass
+class AppConfig:
+    """Runtime configuration, loaded from config.yaml with env-var overrides."""
+    s3_endpoint: str
+    s3_access_key: str
+    s3_secret_key: str
+    s3_bucket: str
+
+    pg_dsn: str
+
+    entrance_prefix: str = "icu/entrance/"
+    processed_prefix: str = "icu/processed/"
+    thumb_prefix: str = "icu/thumbnails/"
+
+    min_age_seconds: int = 90
+    poll_seconds: int = 30
+
+    # OCR crop settings (tune if needed)
+    ocr_crop_w_frac: float = 0.42  # right ~42% of width
+    ocr_crop_h_frac: float = 0.22  # bottom ~22% of height
+
+    thumb_max: int = 512
+
+
+def load_config(path: str) -> AppConfig:
+    """Read the YAML config at *path* into an AppConfig.
+
+    Secrets and connection settings may be overridden through environment
+    variables (S3_ENDPOINT, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET, PG_DSN).
+    Raises KeyError when a required s3/postgres key is missing.
+    """
+    with open(path, "r", encoding="utf-8") as f:
+        raw = yaml.safe_load(f)
+
+    # allow env override for secret injection if desired
+    def env_or(key: str, default=None):
+        return os.environ.get(key, default)
+
+    s3 = raw["s3"]
+    pg = raw["postgres"]
+    app = raw.get("app", {})
+
+    return AppConfig(
+        s3_endpoint=env_or("S3_ENDPOINT", s3["endpoint"]),
+        s3_access_key=env_or("S3_ACCESS_KEY", s3["access_key"]),
+        s3_secret_key=env_or("S3_SECRET_KEY", s3["secret_key"]),
+        s3_bucket=env_or("S3_BUCKET", s3["bucket"]),
+        pg_dsn=env_or("PG_DSN", pg["dsn"]),
+        entrance_prefix=app.get("entrance_prefix", "icu/entrance/"),
+        processed_prefix=app.get("processed_prefix", "icu/processed/"),
+        thumb_prefix=app.get("thumb_prefix", "icu/thumbnails/"),
+        min_age_seconds=int(app.get("min_age_seconds", 90)),
+        poll_seconds=int(app.get("poll_seconds", 30)),
+        ocr_crop_w_frac=float(app.get("ocr_crop_w_frac", 0.42)),
+        ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
+        thumb_max=int(app.get("thumb_max", 512)),
+    )
+
+
+# -----------------------
+# Helpers
+# -----------------------
+def is_old_enough(last_modified, min_age_seconds: int) -> bool:
+    """True when an S3 object's LastModified (aware datetime) is at least
+    min_age_seconds in the past — i.e. its upload has likely settled."""
+    now = datetime.now(timezone.utc)
+    return (now - last_modified).total_seconds() >= min_age_seconds
+
+
+def to_exif_datetime(dt: datetime) -> str:
+    """Format *dt* in the EXIF datetime layout 'YYYY:MM:DD HH:MM:SS'."""
+    return dt.strftime("%Y:%m:%d %H:%M:%S")
+
+
+def tz_offset_str(dt: datetime) -> str:
+    """Return dt's UTC offset as '+HH:MM'/'-HH:MM' (EXIF OffsetTime* format).
+
+    Naive datetimes are treated as UTC ('+00:00').
+    """
+    off = dt.utcoffset()
+    if off is None:
+        off = timezone.utc.utcoffset(dt)
+    total = int(off.total_seconds())
+    sign = "+" if total >= 0 else "-"
+    total = abs(total)
+    hh = total // 3600
+    mm = (total % 3600) // 60
+    return f"{sign}{hh:02d}:{mm:02d}"
+
+def parse_servertime(meta: dict) -> datetime:
+    """Parse metadata['servertime'] into an aware datetime.
+
+    Raises ValueError when the field is missing or unparseable.
+    """
+    s = meta.get("servertime")
+    if not s:
+        raise ValueError("metadata.servertime missing")
+
+    # Example: "2025-06-30T04:03:04+0000" — basic-format offset, which
+    # strptime's %z accepts but (pre-3.11 style) fromisoformat may not.
+    if re.match(r".*[+-]\d{4}$", s):
+        return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")
+    # Fallback: ISO-ish
+    return datetime.fromisoformat(s)
+
+
+def parse_customer_tz(meta: dict):
+    """Resolve metadata['CustomerTimezone'] (default 'UTC') to (tzinfo, name).
+
+    CustomerTimezone applies to OCR (camera overlay).
+    Raises ValueError when the zone name is unknown to dateutil.
+    """
+    name = meta.get("CustomerTimezone") or "UTC"
+    z = tz.gettz(name)
+    if z is None:
+        raise ValueError(f"Unknown CustomerTimezone: {name}")
+    return z, name
+
+
+def ocr_extract_timestamp(jpg_bytes: bytes, crop_w_frac: float, crop_h_frac: float, customer_tzinfo) -> datetime:
+    """OCR the camera's burned-in timestamp from the bottom-right image corner.
+
+    Returns an aware datetime carrying *customer_tzinfo* (camera-local time,
+    deliberately not converted to UTC). Raises ValueError when no timestamp
+    matching OCR_DT_RE can be read.
+    """
+    from io import BytesIO
+    im = Image.open(BytesIO(jpg_bytes)).convert("RGB")
+
+    # Crop the bottom-right region where the overlay lives.
+    w, h = im.size
+    x0 = int(w * (1.0 - crop_w_frac))
+    y0 = int(h * (1.0 - crop_h_frac))
+    crop = im.crop((x0, y0, w, h))
+
+    # Pre-process for tesseract: grayscale, contrast/sharpness boost.
+    gray = ImageOps.grayscale(crop)
+    gray = ImageOps.autocontrast(gray)
+    gray = ImageEnhance.Contrast(gray).enhance(2.5)
+    gray = ImageEnhance.Sharpness(gray).enhance(2.0)
+
+    bw = gray.point(lambda p: 255 if p > 180 else 0)
+
+    # Whitelist digits and the overlay's separators (trailing space intended).
+    cfg = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789:- '
+    text = pytesseract.image_to_string(bw, config=cfg).strip()
+
+    m = OCR_DT_RE.search(text)
+    if not m:
+        # retry without thresholding
+        text2 = pytesseract.image_to_string(gray, config=cfg).strip()
+        m = OCR_DT_RE.search(text2)
+        if not m:
+            raise ValueError(f"OCR timestamp not found. OCR1='{text}' OCR2='{text2}'")
+
+    dt_local_naive = datetime(
+        int(m.group("y")),
+        int(m.group("mo")),
+        int(m.group("d")),
+        int(m.group("h")),
+        int(m.group("m")),
+        int(m.group("s")),
+    )
+
+    # Keep camera-local timezone (no UTC conversion requested)
+    return dt_local_naive.replace(tzinfo=customer_tzinfo)
+
+
+def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime) -> bytes:
+    """
+    DateTimeOriginal = OCR time (camera local) + OffsetTimeOriginal
+    DateTimeDigitized = servertime (UTC) + OffsetTimeDigitized=+00:00
+
+    Shells out to the external `exiftool` binary; raises
+    subprocess.CalledProcessError when it fails. Returns the patched JPEG bytes.
+    """
+    dto = to_exif_datetime(dt_original_local)
+    oto = tz_offset_str(dt_original_local)
+
+    dtd = to_exif_datetime(dt_digitized_utc.astimezone(timezone.utc))
+    otd = "+00:00"
+
+    with tempfile.TemporaryDirectory() as td:
+        in_path = os.path.join(td, "in.jpg")
+        with open(in_path, "wb") as f:
+            f.write(jpg_in)
+
+        # NOTE(review): exiftool resolves "DateTimeDigitized" to an XMP-exif
+        # tag; the native EXIF tag is CreateDate — confirm which one the
+        # downstream consumers actually read.
+        cmd = [
+            "exiftool",
+            "-overwrite_original",
+            f"-DateTimeOriginal={dto}",
+            f"-OffsetTimeOriginal={oto}",
+            f"-DateTimeDigitized={dtd}",
+            f"-OffsetTimeDigitized={otd}",
+            f"-ModifyDate={dtd}",
+            f"-OffsetTime={otd}",
+            in_path,
+        ]
+        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+        with open(in_path, "rb") as f:
+            return f.read()
+
+
+def make_thumbnail_bytes(jpg_bytes: bytes, thumb_max: int) -> bytes:
+    """Return JPEG bytes of a thumbnail whose longest side is <= thumb_max."""
+    from io import BytesIO
+    im = Image.open(BytesIO(jpg_bytes)).convert("RGB")
+    im.thumbnail((thumb_max, thumb_max))
+    out = BytesIO()
+    im.save(out, format="JPEG", quality=85, optimize=True)
+    return out.getvalue()
+
+
+# -----------------------
+# DB ops
+# -----------------------
+def db_init(cur):
+    """Create the job-tracking table, its indexes, and the updated_ts trigger.
+
+    Idempotent — safe to run on every startup.
+    NOTE(review): assumes schema remote_cam already exists (resource/metadata/
+    category tables are also expected there but never created here) — confirm
+    provisioning creates the schema.
+    """
+    cur.execute("""
+        CREATE TABLE IF NOT EXISTS remote_cam.import_job (
+            image_uuid text PRIMARY KEY,
+            status text NOT NULL DEFAULT 'NEW',
+            attempts integer NOT NULL DEFAULT 0,
+            has_categories boolean,
+            last_error text,
+            entrance_jpg_key text,
+            entrance_meta_key text,
+            entrance_cat_key text,
+            processed_jpg_key text,
+            thumbnail_key text,
+            created_ts timestamptz NOT NULL DEFAULT now(),
+            updated_ts timestamptz NOT NULL DEFAULT now()
+        );
+    """)
+    cur.execute("CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);")
+    cur.execute("CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);")
+
+    cur.execute("""
+        CREATE OR REPLACE FUNCTION remote_cam.set_updated_ts()
+        RETURNS trigger AS $$
+        BEGIN
+            NEW.updated_ts = now();
+            RETURN NEW;
+        END;
+        $$ LANGUAGE plpgsql;
+    """)
+    cur.execute("""
+        DO $$
+        BEGIN
+            IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'import_job_set_updated_ts') THEN
+                CREATE TRIGGER import_job_set_updated_ts
+                BEFORE UPDATE ON remote_cam.import_job
+                FOR EACH ROW EXECUTE FUNCTION remote_cam.set_updated_ts();
+            END IF;
+        END$$;
+    """)
+
+
+def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]):
+    """Insert (or refresh the entrance keys of) a job row for *uuid*."""
+    cur.execute("""
+        INSERT INTO remote_cam.import_job (image_uuid, status, entrance_jpg_key, entrance_meta_key, entrance_cat_key, has_categories)
+        VALUES (%s, 'NEW', %s, %s, %s, %s)
+        ON CONFLICT (image_uuid) DO UPDATE
+        SET entrance_jpg_key = EXCLUDED.entrance_jpg_key,
+            entrance_meta_key = EXCLUDED.entrance_meta_key,
+            entrance_cat_key = EXCLUDED.entrance_cat_key,
+            has_categories = EXCLUDED.has_categories;
+    """, (uuid, jpg_key, meta_key, cat_key, cat_key is not None))
+
+
+def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None, processed_key: Optional[str] = None, thumb_key: Optional[str] = None):
+    """Advance a job's status.
+
+    Bumps attempts when status is 'ERROR', keeps the previous last_error /
+    processed_jpg_key / thumbnail_key when the corresponding argument is None.
+    BUGFIX: the SQL has seven %s placeholders but the original passed eight
+    parameters (err three times), which makes psycopg raise on every call.
+    """
+    cur.execute("""
+        UPDATE remote_cam.import_job
+        SET status = %s,
+            attempts = attempts + CASE WHEN %s = 'ERROR' THEN 1 ELSE 0 END,
+            last_error = CASE WHEN %s IS NULL THEN last_error ELSE %s END,
+            processed_jpg_key = COALESCE(%s, processed_jpg_key),
+            thumbnail_key = COALESCE(%s, thumbnail_key)
+        WHERE image_uuid = %s;
+    """, (status, status, err, err, processed_key, thumb_key, uuid))
+
+
+def insert_resource(cur, uuid: str, typ: str):
+    """Record that a resource of type *typ* was imported (no-op on repeat)."""
+    cur.execute("""
+        INSERT INTO remote_cam.resource(resource_uuid, type, import_ts)
+        VALUES (%s, %s, now())
+        ON CONFLICT DO NOTHING;
+    """, (uuid, typ))
+
+
+def insert_metadata(cur, uuid: str, meta: dict):
+    """Upsert one remote_cam.metadata row keyed by the image uuid.
+
+    servertime is required (parse_servertime raises otherwise);
+    ImageCreationDate, Latitude and Longitude are parsed best-effort and
+    stored as NULL on failure. ocr_ts is filled in later by update_ocr_ts().
+    """
+    def get(k): return meta.get(k)
+
+    servertime = parse_servertime(meta)
+
+    icd_raw = get("ImageCreationDate")
+    icd = None
+    if icd_raw:
+        try:
+            # "06:03:01 30.06.2025" -> assume UTC if no tz info
+            icd = datetime.strptime(icd_raw, "%H:%M:%S %d.%m.%Y").replace(tzinfo=timezone.utc)
+        except Exception:
+            icd = None
+
+    lat = get("Latitude")
+    lon = get("Longitude")
+    lat_f = float(lat) if lat not in (None, "") else None
+    lon_f = float(lon) if lon not in (None, "") else None
+
+    # NOTE: DateTimeLastSettings looks like "05:45:35 30.06.2025" in sample; parse if needed later.
+    cur.execute("""
+        INSERT INTO remote_cam.metadata(
+            metadata_uuid, import_ts, ImageCreationDate, SizeInByte, ImageFormat, ImageNight,
+            DateTimeLastSettings, BatteryStatus, FirmwareVersion, SignalStrength, Temperature,
+            CoordinateSwitch, Latitude, Longitude, WorkPeriod, WorkStart, WorkEnd, ThumbnailSize,
+            PIRInterval, TimeScan, imei, iccid, INITKey, CameraName, CameraModel, CameraLanguage,
+            CameraNetwork, MobileNetworkCode, MobileCountryCode, LocationAreaCode, CustomerTimezone,
+            CustomerLanguage, servertime, ocr_ts
+        ) VALUES (
+            %s, now(), %s, %s, %s, %s,
+            %s, %s, %s, %s, %s,
+            %s, %s, %s, %s, %s, %s, %s,
+            %s, %s, %s, %s, %s, %s, %s, %s,
+            %s, %s, %s, %s, %s,
+            %s, %s, NULL
+        )
+        ON CONFLICT (metadata_uuid) DO UPDATE SET
+            import_ts = EXCLUDED.import_ts,
+            ImageCreationDate = EXCLUDED.ImageCreationDate,
+            SizeInByte = EXCLUDED.SizeInByte,
+            ImageFormat = EXCLUDED.ImageFormat,
+            ImageNight = EXCLUDED.ImageNight,
+            DateTimeLastSettings = EXCLUDED.DateTimeLastSettings,
+            BatteryStatus = EXCLUDED.BatteryStatus,
+            FirmwareVersion = EXCLUDED.FirmwareVersion,
+            SignalStrength = EXCLUDED.SignalStrength,
+            Temperature = EXCLUDED.Temperature,
+            CoordinateSwitch = EXCLUDED.CoordinateSwitch,
+            Latitude = EXCLUDED.Latitude,
+            Longitude = EXCLUDED.Longitude,
+            WorkPeriod = EXCLUDED.WorkPeriod,
+            WorkStart = EXCLUDED.WorkStart,
+            WorkEnd = EXCLUDED.WorkEnd,
+            ThumbnailSize = EXCLUDED.ThumbnailSize,
+            PIRInterval = EXCLUDED.PIRInterval,
+            TimeScan = EXCLUDED.TimeScan,
+            imei = EXCLUDED.imei,
+            iccid = EXCLUDED.iccid,
+            INITKey = EXCLUDED.INITKey,
+            CameraName = EXCLUDED.CameraName,
+            CameraModel = EXCLUDED.CameraModel,
+            CameraLanguage = EXCLUDED.CameraLanguage,
+            CameraNetwork = EXCLUDED.CameraNetwork,
+            MobileNetworkCode = EXCLUDED.MobileNetworkCode,
+            MobileCountryCode = EXCLUDED.MobileCountryCode,
+            LocationAreaCode = EXCLUDED.LocationAreaCode,
+            CustomerTimezone = EXCLUDED.CustomerTimezone,
+            CustomerLanguage = EXCLUDED.CustomerLanguage,
+            servertime = EXCLUDED.servertime;
+    """, (
+        uuid, icd, get("SizeInByte"), get("ImageFormat"), get("ImageNight"),
+        get("DateTimeLastSettings"), get("BatteryStatus"), get("FirmwareVersion"),
+        get("SignalStrength"), get("Temperature"),
+        get("CoordinateSwitch"), lat_f, lon_f, get("WorkPeriod"),
+        get("WorkStart"), get("WorkEnd"), get("ThumbnailSize"),
+        get("PIRInterval"), get("TimeScan"), get("imei"), get("iccid"), get("INITKey"),
+        get("CameraName"), get("CameraModel"), get("CameraLanguage"), get("CameraNetwork"),
+        get("MobileNetworkCode"), get("MobileCountryCode"), get("LocationAreaCode"),
+        get("CustomerTimezone"), get("CustomerLanguage"), servertime
+    ))
+
+
+def update_ocr_ts(cur, uuid: str, ocr_ts: datetime):
+    """Store the OCR'd overlay timestamp on the metadata row."""
+    cur.execute("""
+        UPDATE remote_cam.metadata
+        SET ocr_ts = %s
+        WHERE metadata_uuid = %s;
+    """, (ocr_ts, uuid))
+
+
+def insert_categories(cur, uuid: str, cat_json: dict):
+    """Replace the classification rows for *uuid* with those in *cat_json*."""
+    version = cat_json.get("version")
+    cats = cat_json.get("categories") or []
+    # delete-then-insert keeps the set exactly in sync with the latest file
+    cur.execute("DELETE FROM remote_cam.category WHERE image_uuid = %s;", (uuid,))
+    for c in cats:
+        cur.execute("""
+            INSERT INTO remote_cam.category(image_uuid, category, score, version)
+            VALUES (%s, %s, %s, %s);
+        """, (uuid, c.get("category"), c.get("score"), version))
+
+
+# -----------------------
+# S3 ops wrapper
+# -----------------------
+class S3:
+    """Thin boto3 S3 client bound to the single bucket from AppConfig."""
+
+    def __init__(self, cfg: AppConfig):
+        self.cfg = cfg
+        self.client = boto3.client(
+            "s3",
+            endpoint_url=cfg.s3_endpoint,
+            aws_access_key_id=cfg.s3_access_key,
+            aws_secret_access_key=cfg.s3_secret_key,
+        )
+
+    def list_entrance_objects(self):
+        """Yield every object under the entrance prefix (handles pagination)."""
+        paginator = self.client.get_paginator("list_objects_v2")
+        for page in paginator.paginate(Bucket=self.cfg.s3_bucket, Prefix=self.cfg.entrance_prefix):
+            for obj in page.get("Contents", []):
+                yield obj
+
+    def get_bytes(self, key: str) -> bytes:
+        """Download an object fully into memory."""
+        resp = self.client.get_object(Bucket=self.cfg.s3_bucket, Key=key)
+        return resp["Body"].read()
+
+    def put_bytes(self, key: str, data: bytes, content_type: str):
+        """Upload *data* to *key* with the given Content-Type."""
+        self.client.put_object(Bucket=self.cfg.s3_bucket, Key=key, Body=data, ContentType=content_type)
+
+    def head(self, key: str):
+        """HEAD the object — used to verify an upload before entrance cleanup."""
+        return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key)
+
+    def delete_keys(self, keys):
+        """Batch-delete the given keys (no-op for an empty list)."""
+        if not keys:
+            return
+        self.client.delete_objects(
+            Bucket=self.cfg.s3_bucket,
+            Delete={"Objects": [{"Key": k} for k in keys], "Quiet": True},
+        )
+
+
+# -----------------------
+# Main processing
+# -----------------------
+@dataclass
+class EntranceSet:
+    """One complete entrance upload: image + metadata (+ optional categories)."""
+    uuid: str
+    jpg_key: str
+    meta_key: str
+    cat_key: Optional[str]
+    lm_jpg: datetime
+    lm_meta: datetime
+    lm_cat: Optional[datetime]
+
+
+def collect_ready_sets(s3: S3, cfg: AppConfig) -> list[EntranceSet]:
+    """Group entrance objects by uuid and return the sets that are complete
+    (jpg + metadata present) and whose files are all at least min_age_seconds
+    old — i.e. no longer being uploaded."""
+    groups: Dict[str, Dict[str, Tuple[str, datetime]]] = {}
+    for obj in s3.list_entrance_objects():
+        key = obj["Key"]
+        name = key.split("/")[-1]
+        lm = obj["LastModified"]
+
+        m1 = UUID_RE.match(name)
+        if m1:
+            uuid = m1.group(1)
+            ext = m1.group(2)
+            typ = "jpg" if ext == "jpg" else "meta"
+            groups.setdefault(uuid, {})[typ] = (key, lm)
+            continue
+
+        m2 = CAT_RE.match(name)
+        if m2:
+            uuid = m2.group(1)
+            groups.setdefault(uuid, {})["cat"] = (key, lm)
+            continue
+
+    ready: list[EntranceSet] = []
+    for uuid, parts in groups.items():
+        if "jpg" in parts and "meta" in parts:
+            jpg_key, lm_j = parts["jpg"]
+            meta_key, lm_m = parts["meta"]
+            cat = parts.get("cat")
+            cat_key = cat[0] if cat else None
+            lm_c = cat[1] if cat else None
+
+            if not (is_old_enough(lm_j, cfg.min_age_seconds) and is_old_enough(lm_m, cfg.min_age_seconds)):
+                continue
+            if cat and not is_old_enough(lm_c, cfg.min_age_seconds):
+                continue
+
+            ready.append(EntranceSet(uuid, jpg_key, meta_key, cat_key, lm_j, lm_m, lm_c))
+    return ready
+
+
+def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
+    """Run the full pipeline for one entrance set inside the caller's
+    transaction; job status steps are recorded along the way (they only
+    become visible when the surrounding transaction commits)."""
+    job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
+
+    # 1) metadata.json -> DB
+    meta = json.loads(s3.get_bytes(s.meta_key))
+    insert_metadata(cur, s.uuid, meta)
+    insert_resource(cur, s.uuid, "metadata")
+    job_set_status(cur, s.uuid, "META_OK")
+
+    # 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone
+    jpg_bytes = s3.get_bytes(s.jpg_key)
+    customer_tzinfo, _ = parse_customer_tz(meta)
+    ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, customer_tzinfo)
+    update_ocr_ts(cur, s.uuid, ocr_ts)
+    insert_resource(cur, s.uuid, "image")
+    job_set_status(cur, s.uuid, "OCR_OK")
+
+    # 3) EXIF write:
+    #    DateTimeOriginal = ocr_ts (local, keep offset tags)
+    #    DateTimeDigitized = servertime (UTC)
+    servertime = parse_servertime(meta)
+    jpg_exif = exif_write_with_exiftool(jpg_bytes, ocr_ts, servertime)
+    job_set_status(cur, s.uuid, "EXIF_OK")
+
+    # 4) Upload processed JPG (EXIF patched); head() verifies it landed
+    processed_key = f"{cfg.processed_prefix}{s.uuid}.jpg"
+    s3.put_bytes(processed_key, jpg_exif, "image/jpeg")
+    s3.head(processed_key)
+    job_set_status(cur, s.uuid, "MOVED", processed_key=processed_key)
+
+    # 5) Thumbnail
+    thumb_bytes = make_thumbnail_bytes(jpg_exif, cfg.thumb_max)
+    thumb_key = f"{cfg.thumb_prefix}{s.uuid}.jpg"
+    s3.put_bytes(thumb_key, thumb_bytes, "image/jpeg")
+    s3.head(thumb_key)
+    job_set_status(cur, s.uuid, "THUMB_OK", thumb_key=thumb_key)
+
+    # 6) Optional categories
+    if s.cat_key:
+        cat_json = json.loads(s3.get_bytes(s.cat_key))
+        insert_categories(cur, s.uuid, cat_json)
+        insert_resource(cur, s.uuid, "classification")
+        job_set_status(cur, s.uuid, "CATEGORIES_OK")
+
+    # 7) Cleanup entrance only after verify
+    to_delete = [s.jpg_key, s.meta_key]
+    if s.cat_key:
+        to_delete.append(s.cat_key)
+    s3.delete_keys(to_delete)
+    job_set_status(cur, s.uuid, "CLEANED")
+
+
+def run_once(cfg: AppConfig) -> int:
+    """One polling pass: process every ready entrance set, each in its own
+    transaction. Returns the number of sets attempted (0 = nothing to do)."""
+    s3c = S3(cfg)
+    ready = collect_ready_sets(s3c, cfg)
+    if not ready:
+        return 0
+
+    with psycopg.connect(cfg.pg_dsn) as db:
+        with db.cursor() as cur:
+            db_init(cur)
+            db.commit()
+
+        for s in ready:
+            try:
+                with db.transaction():
+                    with db.cursor() as cur:
+                        process_one(cur, s3c, cfg, s)
+            except Exception as e:
+                # Record the failure in a fresh transaction so the ERROR row
+                # survives the rollback of the failed pipeline.
+                with db.transaction():
+                    with db.cursor() as cur:
+                        job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
+                        job_set_status(cur, s.uuid, "ERROR", err=str(e))
+                print(f"[ERROR] {s.uuid}: {e}")
+    return len(ready)
+
+
+def main():
+    """Entry point: load config, then poll forever (sleep only when idle)."""
+    cfg_path = os.environ.get("CONFIG_YAML", "/app/config.yaml")
+    cfg = load_config(cfg_path)
+
+    while True:
+        n = run_once(cfg)
+        if n == 0:
+            time.sleep(cfg.poll_seconds)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..4d16b42
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,13 @@
+[project]
+name = "melesICUmover"
+version = "0.1.0"
+requires-python = ">= 3.11.9"
+# BUGFIX: was empty — these are the third-party packages main.py imports.
+dependencies = [
+  "boto3",
+  "psycopg",
+  "python-dateutil",
+  "Pillow",
+  "pytesseract",
+  "PyYAML",
+]