Compare commits

..

3 Commits

5 changed files with 325 additions and 589 deletions

View File

@@ -8,6 +8,10 @@ s3:
postgres: postgres:
dsn: "postgresql://postgres:DFGk5s9H21lKao2K@136.243.41.58:7777/meles" dsn: "postgresql://postgres:DFGk5s9H21lKao2K@136.243.41.58:7777/meles"
deermapper-api:
base_url: "https://webapp.deermapper.net/api/icu"
apiKey: "695bc217-3b40-48bb-bb12-17fc5b08b320"
app: app:
entrance_prefix: "icu/entrance/" entrance_prefix: "icu/entrance/"
processed_prefix: "icu/processed/" processed_prefix: "icu/processed/"
@@ -15,8 +19,17 @@ app:
min_age_seconds: 90 min_age_seconds: 90
poll_seconds: 30 poll_seconds: 30
# OCR crop tunables (defaults are usually fine) # OCR crop tunables (defaults are usually fine)
ocr_crop_w_frac: 0.42 ocr_crop_w_frac: 0.63
ocr_crop_h_frac: 0.22 ocr_crop_h_frac: 0.05
thumb_max: 512 thumb_max: 512
exiftool_timeout_seconds: 15 exiftool_timeout_seconds: 15
job_sleep_seconds: 1.0 job_sleep_seconds: 0.0
parallel_workers: 6
s3_max_pool_connections: 48
# Optional processing switches:
# - false => image still gets moved + thumbnail, and backfill flag is set in import_job
enable_ocr: true
enable_exif: true
enable_deermapper_api: false
deermapper_api_timeout_seconds: 20
deermapper_api_image_field: "image"

View File

@@ -11,22 +11,3 @@ services:
volumes: volumes:
- ./:/app - ./:/app
- ./config.yaml:/app/config.yaml:ro - ./config.yaml:/app/config.yaml:ro
melesicu_shiny:
build: ./shiny-app
container_name: melesicu_shiny
restart: unless-stopped
environment:
PG_DSN: ${PG_DSN}
S3_BUCKET: ${S3_BUCKET}
ENTRANCE_PREFIX: "icu/entrance/"
PROCESSED_PREFIX: "icu/processed/"
THUMB_PREFIX: "icu/thumbnails/"
INVENTORY_TABLE: "cam_inventory"
DEPLOYMENT_TABLE: "cam_deployments"
AWS_ACCESS_KEY_ID: ${S3_ACCESS_KEY}
AWS_SECRET_ACCESS_KEY: ${S3_SECRET_KEY}
AWS_DEFAULT_REGION: "us-east-1"
AWS_S3_ENDPOINT: ${S3_ENDPOINT}
AWS_S3_ENDPOINT_URL: ${S3_ENDPOINT}
ports:
- "3838:3838"

337
main.py
View File

@@ -6,12 +6,17 @@ import json
import time import time
import tempfile import tempfile
import subprocess import subprocess
import uuid as uuidlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional, Dict, Tuple from typing import Optional, Dict, Tuple
from urllib import error as urlerror
from urllib import request as urlrequest
import boto3 import boto3
import psycopg import psycopg
from botocore.config import Config as BotoClientConfig
from dateutil import tz from dateutil import tz
from PIL import Image, ImageOps, ImageEnhance from PIL import Image, ImageOps, ImageEnhance
import pytesseract import pytesseract
@@ -65,7 +70,16 @@ class AppConfig:
thumb_max: int = 512 thumb_max: int = 512
exiftool_timeout_seconds: int = 15 exiftool_timeout_seconds: int = 15
job_sleep_seconds: float = 1.0 job_sleep_seconds: float = 0.0
parallel_workers: int = 4
s3_max_pool_connections: int = 32
enable_ocr: bool = True
enable_exif: bool = True
enable_deermapper_api: bool = False
deermapper_api_base_url: str = "https://webapp.deermapper.net/api/icu"
deermapper_api_key: str = ""
deermapper_api_timeout_seconds: int = 20
deermapper_api_image_field: str = "image"
def load_config(path: str) -> AppConfig: def load_config(path: str) -> AppConfig:
@@ -76,9 +90,26 @@ def load_config(path: str) -> AppConfig:
def env_or(key: str, default=None): def env_or(key: str, default=None):
return os.environ.get(key, default) return os.environ.get(key, default)
def as_bool(value, default: bool = True) -> bool:
if value is None:
return default
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
return value.strip().lower() in ("1", "true", "yes", "on")
return bool(value)
s3 = raw["s3"] s3 = raw["s3"]
pg = raw["postgres"] pg = raw["postgres"]
app = raw.get("app", {}) app = raw.get("app", {})
deermapper = raw.get("deermapper-api", raw.get("deermapper_api", {}))
default_workers = max(1, min(16, (os.cpu_count() or 2) * 2))
parallel_workers = int(app.get("parallel_workers", default_workers))
parallel_workers = max(1, parallel_workers)
s3_pool = int(app.get("s3_max_pool_connections", max(16, parallel_workers * 4)))
s3_pool = max(10, s3_pool)
return AppConfig( return AppConfig(
s3_endpoint=env_or("S3_ENDPOINT", s3["endpoint"]), s3_endpoint=env_or("S3_ENDPOINT", s3["endpoint"]),
@@ -95,7 +126,16 @@ def load_config(path: str) -> AppConfig:
ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)), ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
thumb_max=int(app.get("thumb_max", 512)), thumb_max=int(app.get("thumb_max", 512)),
exiftool_timeout_seconds=int(app.get("exiftool_timeout_seconds", 15)), exiftool_timeout_seconds=int(app.get("exiftool_timeout_seconds", 15)),
job_sleep_seconds=float(app.get("job_sleep_seconds", 1.0)), job_sleep_seconds=float(app.get("job_sleep_seconds", 0.0)),
parallel_workers=parallel_workers,
s3_max_pool_connections=s3_pool,
enable_ocr=as_bool(app.get("enable_ocr", True), True),
enable_exif=as_bool(app.get("enable_exif", True), True),
enable_deermapper_api=as_bool(app.get("enable_deermapper_api", False), False),
deermapper_api_base_url=env_or("DEERMAPPER_API_URL", deermapper.get("base_url", "https://webapp.deermapper.net/api/icu")),
deermapper_api_key=env_or("DEERMAPPER_API_KEY", deermapper.get("apiKey", "")),
deermapper_api_timeout_seconds=int(app.get("deermapper_api_timeout_seconds", 20)),
deermapper_api_image_field=app.get("deermapper_api_image_field", "image"),
) )
@@ -256,6 +296,71 @@ def make_thumbnail_bytes(jpg_bytes: bytes, thumb_max: int) -> bytes:
return out.getvalue() return out.getvalue()
def _multipart_form_data(fields: Dict[str, str], files: list[tuple[str, str, bytes, str]]) -> tuple[str, bytes]:
boundary = f"----meles-{uuidlib.uuid4().hex}"
boundary_bytes = boundary.encode("ascii")
crlf = b"\r\n"
parts: list[bytes] = []
for name, value in fields.items():
parts.extend([
b"--" + boundary_bytes,
f'Content-Disposition: form-data; name="{name}"'.encode("utf-8"),
b"",
str(value).encode("utf-8"),
])
for field_name, filename, content, content_type in files:
parts.extend([
b"--" + boundary_bytes,
f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"'.encode("utf-8"),
f"Content-Type: {content_type}".encode("utf-8"),
b"",
content,
])
parts.append(b"--" + boundary_bytes + b"--")
body = crlf.join(parts) + crlf
return f"multipart/form-data; boundary={boundary}", body
def push_to_deermapper_api(cfg: AppConfig, image_uuid: str, imei: str, metadata: dict, jpg_bytes: bytes) -> tuple[bool, str]:
fields = {
"apiKey": cfg.deermapper_api_key,
"imei": imei,
"metadata": json.dumps(metadata, ensure_ascii=False),
}
content_type, body = _multipart_form_data(
fields,
[(cfg.deermapper_api_image_field, f"{image_uuid}.jpg", jpg_bytes, "image/jpeg")],
)
req = urlrequest.Request(
cfg.deermapper_api_base_url,
data=body,
method="POST",
headers={
"Content-Type": content_type,
"Accept": "application/json",
},
)
try:
with urlrequest.urlopen(req, timeout=cfg.deermapper_api_timeout_seconds) as resp:
payload = resp.read().decode("utf-8", errors="replace")
try:
data = json.loads(payload) if payload else {}
except Exception:
return False, f"API response not JSON: {payload[:300]}"
status_val = str(data.get("status", "")).strip()
msg = str(data.get("msg", "")).strip()
if status_val == "1":
return True, msg or "ok"
return False, msg or f"status={status_val or 'unknown'}"
except urlerror.HTTPError as e:
body_txt = e.read().decode("utf-8", errors="replace")
return False, f"HTTP {e.code}: {body_txt[:300]}"
except Exception as e:
return False, str(e)
# ----------------------- # -----------------------
# DB ops # DB ops
# ----------------------- # -----------------------
@@ -272,10 +377,22 @@ def db_init(cur):
entrance_cat_key text, entrance_cat_key text,
processed_jpg_key text, processed_jpg_key text,
thumbnail_key text, thumbnail_key text,
needs_ocr_backfill boolean NOT NULL DEFAULT false,
needs_exif_backfill boolean NOT NULL DEFAULT false,
created_ts timestamptz NOT NULL DEFAULT now(), created_ts timestamptz NOT NULL DEFAULT now(),
updated_ts timestamptz NOT NULL DEFAULT now() updated_ts timestamptz NOT NULL DEFAULT now()
); );
""") """)
cur.execute("""
ALTER TABLE remote_cam.import_job
ADD COLUMN IF NOT EXISTS needs_ocr_backfill boolean NOT NULL DEFAULT false;
""")
cur.execute("""
ALTER TABLE remote_cam.import_job
ADD COLUMN IF NOT EXISTS needs_exif_backfill boolean NOT NULL DEFAULT false;
""")
cur.execute("CREATE INDEX IF NOT EXISTS import_job_needs_ocr_idx ON remote_cam.import_job(needs_ocr_backfill);")
cur.execute("CREATE INDEX IF NOT EXISTS import_job_needs_exif_idx ON remote_cam.import_job(needs_exif_backfill);")
cur.execute("CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);") cur.execute("CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);")
cur.execute("CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);") cur.execute("CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);")
@@ -298,6 +415,14 @@ def db_init(cur):
END IF; END IF;
END$$; END$$;
""") """)
cur.execute("""
CREATE TABLE IF NOT EXISTS remote_cam.api_response (
push_ts timestamp without time zone,
message character(512),
image_uuid uuid,
success boolean
);
""")
def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]): def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]):
@@ -324,9 +449,20 @@ def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None, pr
""", (status, status, err, processed_key, thumb_key, uuid)) """, (status, status, err, processed_key, thumb_key, uuid))
def job_set_backfill(cur, uuid: str, *, needs_ocr_backfill: Optional[bool] = None, needs_exif_backfill: Optional[bool] = None):
if needs_ocr_backfill is None and needs_exif_backfill is None:
return
cur.execute("""
UPDATE remote_cam.import_job
SET needs_ocr_backfill = COALESCE(%s, needs_ocr_backfill),
needs_exif_backfill = COALESCE(%s, needs_exif_backfill)
WHERE image_uuid = %s;
""", (needs_ocr_backfill, needs_exif_backfill, uuid))
def job_get(cur, uuid: str): def job_get(cur, uuid: str):
cur.execute(""" cur.execute("""
SELECT status, processed_jpg_key, thumbnail_key, has_categories SELECT status, processed_jpg_key, thumbnail_key, has_categories, needs_ocr_backfill, needs_exif_backfill
FROM remote_cam.import_job FROM remote_cam.import_job
WHERE image_uuid = %s; WHERE image_uuid = %s;
""", (uuid,)) """, (uuid,))
@@ -338,6 +474,8 @@ def job_get(cur, uuid: str):
"processed_jpg_key": row[1], "processed_jpg_key": row[1],
"thumbnail_key": row[2], "thumbnail_key": row[2],
"has_categories": row[3], "has_categories": row[3],
"needs_ocr_backfill": row[4],
"needs_exif_backfill": row[5],
} }
@@ -345,10 +483,14 @@ STATUS_ORDER = [
"NEW", "NEW",
"META_OK", "META_OK",
"OCR_OK", "OCR_OK",
"OCR_SKIPPED",
"EXIF_OK", "EXIF_OK",
"EXIF_SKIPPED",
"MOVED", "MOVED",
"THUMB_OK", "THUMB_OK",
"CATEGORIES_OK", "CATEGORIES_OK",
"API_FAIL",
"API_OK",
"CLEANED", "CLEANED",
] ]
@@ -359,6 +501,16 @@ def status_rank(status: Optional[str]) -> int:
return -1 return -1
def is_terminal_success(status: Optional[str], cfg: AppConfig) -> bool:
if status is None:
return False
if status == "CLEANED":
return True
if cfg.enable_deermapper_api:
return status_rank(status) >= status_rank("API_OK")
return status_rank(status) >= status_rank("THUMB_OK")
def insert_resource(cur, uuid: str, typ: str): def insert_resource(cur, uuid: str, typ: str):
cur.execute(""" cur.execute("""
INSERT INTO remote_cam.resource(resource_uuid, type, import_ts) INSERT INTO remote_cam.resource(resource_uuid, type, import_ts)
@@ -484,6 +636,14 @@ def insert_categories(cur, uuid: str, cat_json: dict):
""", (uuid, c.get("category"), c.get("score"), version)) """, (uuid, c.get("category"), c.get("score"), version))
def insert_api_response(cur, uuid: str, success: bool, message: str):
msg = (message or "")[:512]
cur.execute("""
INSERT INTO remote_cam.api_response(push_ts, message, image_uuid, success)
VALUES (now(), %s, %s::uuid, %s);
""", (msg, uuid, success))
# ----------------------- # -----------------------
# S3 ops wrapper # S3 ops wrapper
# ----------------------- # -----------------------
@@ -495,6 +655,7 @@ class S3:
endpoint_url=cfg.s3_endpoint, endpoint_url=cfg.s3_endpoint,
aws_access_key_id=cfg.s3_access_key, aws_access_key_id=cfg.s3_access_key,
aws_secret_access_key=cfg.s3_secret_key, aws_secret_access_key=cfg.s3_secret_key,
config=BotoClientConfig(max_pool_connections=cfg.s3_max_pool_connections),
) )
def list_entrance_objects(self): def list_entrance_objects(self):
@@ -513,13 +674,21 @@ class S3:
def head(self, key: str): def head(self, key: str):
return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key) return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key)
def delete_keys(self, keys): def delete_keys(self, keys) -> int:
if not keys: if not keys:
return return 0
self.client.delete_objects( resp = self.client.delete_objects(
Bucket=self.cfg.s3_bucket, Bucket=self.cfg.s3_bucket,
Delete={"Objects": [{"Key": k} for k in keys], "Quiet": True}, Delete={"Objects": [{"Key": k} for k in keys], "Quiet": False},
) )
errors = resp.get("Errors") or []
if errors:
details = "; ".join(
f"{e.get('Key')}:{e.get('Code')}:{e.get('Message')}" for e in errors[:3]
)
raise RuntimeError(f"S3 delete failed for {len(errors)} object(s): {details}")
deleted = resp.get("Deleted") or []
return len(deleted)
# ----------------------- # -----------------------
@@ -590,27 +759,42 @@ def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
# 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone # 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone
jpg_bytes = s3.get_bytes(s.jpg_key) jpg_bytes = s3.get_bytes(s.jpg_key)
ocr_ts = None ocr_ts = get_ocr_ts(cur, s.uuid)
if status_rank(status_now) < status_rank("OCR_OK"): ocr_needs_run = cfg.enable_ocr and (
status_rank(status_now) < status_rank("OCR_OK") or status_now == "OCR_SKIPPED"
)
if ocr_needs_run:
try: try:
customer_tzinfo, _ = parse_customer_tz(meta) customer_tzinfo, _ = parse_customer_tz(meta)
ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, customer_tzinfo) ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, customer_tzinfo)
update_ocr_ts(cur, s.uuid, ocr_ts) update_ocr_ts(cur, s.uuid, ocr_ts)
insert_resource(cur, s.uuid, "image") insert_resource(cur, s.uuid, "image")
job_set_backfill(cur, s.uuid, needs_ocr_backfill=False)
job_set_status(cur, s.uuid, "OCR_OK") job_set_status(cur, s.uuid, "OCR_OK")
status_now = "OCR_OK" status_now = "OCR_OK"
except Exception as e: except Exception as e:
job_set_backfill(cur, s.uuid, needs_ocr_backfill=True)
job_set_status(cur, s.uuid, "OCR_FAIL", err=str(e)) job_set_status(cur, s.uuid, "OCR_FAIL", err=str(e))
status_now = "OCR_FAIL" status_now = "OCR_FAIL"
elif status_rank(status_now) >= status_rank("OCR_OK"): elif not cfg.enable_ocr:
ocr_ts = get_ocr_ts(cur, s.uuid) job_set_backfill(cur, s.uuid, needs_ocr_backfill=True)
if status_rank(status_now) < status_rank("OCR_SKIPPED"):
job_set_status(cur, s.uuid, "OCR_SKIPPED")
status_now = "OCR_SKIPPED"
# 3) EXIF write: # 3) EXIF write:
# DateTimeOriginal = ocr_ts (local, keep offset tags) # DateTimeOriginal = ocr_ts (local, keep offset tags)
# DateTimeDigitized = servertime (UTC) # DateTimeDigitized = servertime (UTC)
jpg_exif = jpg_bytes jpg_exif = jpg_bytes
if status_rank(status_now) < status_rank("MOVED"): if not cfg.enable_exif:
job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
if status_rank(status_now) < status_rank("EXIF_SKIPPED"):
job_set_status(cur, s.uuid, "EXIF_SKIPPED")
status_now = "EXIF_SKIPPED"
elif status_rank(status_now) < status_rank("MOVED"):
if ocr_ts is None: if ocr_ts is None:
job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
job_set_status(cur, s.uuid, "EXIF_SKIPPED", err="OCR missing") job_set_status(cur, s.uuid, "EXIF_SKIPPED", err="OCR missing")
status_now = "EXIF_SKIPPED" status_now = "EXIF_SKIPPED"
else: else:
@@ -622,9 +806,11 @@ def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
servertime, servertime,
timeout_seconds=cfg.exiftool_timeout_seconds, timeout_seconds=cfg.exiftool_timeout_seconds,
) )
job_set_backfill(cur, s.uuid, needs_exif_backfill=False)
job_set_status(cur, s.uuid, "EXIF_OK") job_set_status(cur, s.uuid, "EXIF_OK")
status_now = "EXIF_OK" status_now = "EXIF_OK"
except Exception as e: except Exception as e:
job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
job_set_status(cur, s.uuid, "EXIF_FAIL", err=str(e)) job_set_status(cur, s.uuid, "EXIF_FAIL", err=str(e))
status_now = "EXIF_FAIL" status_now = "EXIF_FAIL"
@@ -655,8 +841,32 @@ def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
job_set_status(cur, s.uuid, "CATEGORIES_OK") job_set_status(cur, s.uuid, "CATEGORIES_OK")
status_now = "CATEGORIES_OK" status_now = "CATEGORIES_OK"
# 7) Cleanup entrance only after verify # 7) Optional DeerMapper API push (image + metadata)
if status_rank(status_now) >= status_rank("THUMB_OK"): if cfg.enable_deermapper_api and status_rank(status_now) < status_rank("API_OK"):
api_key = (cfg.deermapper_api_key or "").strip()
api_url = (cfg.deermapper_api_base_url or "").strip()
if not api_key or not api_url:
err_msg = "DeerMapper API config missing"
job_set_status(cur, s.uuid, "API_FAIL", err=err_msg)
insert_api_response(cur, s.uuid, False, err_msg)
status_now = "API_FAIL"
else:
if status_rank(status_now) >= status_rank("MOVED") and jpg_exif is jpg_bytes:
jpg_exif = s3.get_bytes(processed_key)
imei = str(meta.get("imei") or "").strip()
ok, msg = push_to_deermapper_api(cfg, s.uuid, imei, meta, jpg_exif)
if ok:
insert_api_response(cur, s.uuid, True, msg)
job_set_status(cur, s.uuid, "API_OK")
status_now = "API_OK"
else:
insert_api_response(cur, s.uuid, False, msg)
job_set_status(cur, s.uuid, "API_FAIL", err=msg)
status_now = "API_FAIL"
# 8) Cleanup entrance only after verify (+ API when enabled)
api_ready_for_cleanup = (not cfg.enable_deermapper_api) or status_rank(status_now) >= status_rank("API_OK")
if status_rank(status_now) >= status_rank("THUMB_OK") and api_ready_for_cleanup:
try: try:
s3.head(processed_key) s3.head(processed_key)
s3.head(thumb_key) s3.head(thumb_key)
@@ -670,26 +880,97 @@ def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e)) job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e))
def is_completed_job_materialized(cur, s3: S3, cfg: AppConfig, uuid: str) -> bool:
job = job_get(cur, uuid)
if not job:
return False
if status_rank(job.get("status")) < status_rank("THUMB_OK"):
return False
processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{uuid}.jpg"
thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{uuid}.jpg"
try:
s3.head(processed_key)
s3.head(thumb_key)
except Exception:
return False
return True
def cleanup_entrance_set(s3: S3, s: EntranceSet) -> int:
to_delete = [s.jpg_key, s.meta_key]
if s.cat_key:
to_delete.append(s.cat_key)
return s3.delete_keys(to_delete)
def _process_ready_batch(cfg: AppConfig, batch: list[EntranceSet]):
s3c = S3(cfg)
with psycopg.connect(cfg.pg_dsn) as db:
for s in batch:
start = time.perf_counter()
result = "failure"
final_status = "UNKNOWN"
err_text = None
cleanup_deleted = None
try:
skip_duplicate = False
with db.transaction():
with db.cursor() as cur:
if is_completed_job_materialized(cur, s3c, cfg, s.uuid):
skip_duplicate = True
else:
process_one(cur, s3c, cfg, s)
final_status = (job_get(cur, s.uuid) or {}).get("status", "UNKNOWN")
result = "success" if is_terminal_success(final_status, cfg) else "failure"
if skip_duplicate:
cleanup_deleted = cleanup_entrance_set(s3c, s)
final_status = "DUPLICATE_CLEANED"
result = "success"
except Exception as e:
with db.transaction():
with db.cursor() as cur:
job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
job_set_status(cur, s.uuid, "ERROR", err=str(e))
final_status = "ERROR"
err_text = str(e)
elapsed_ms = int((time.perf_counter() - start) * 1000)
if err_text:
print(
f"[RESULT] uuid={s.uuid} result={result} status={final_status} "
f"process_ms={elapsed_ms} error={err_text}"
)
elif cleanup_deleted is not None:
print(
f"[RESULT] uuid={s.uuid} result={result} status={final_status} "
f"process_ms={elapsed_ms} cleanup_deleted={cleanup_deleted}"
)
else:
print(f"[RESULT] uuid={s.uuid} result={result} status={final_status} process_ms={elapsed_ms}")
if cfg.job_sleep_seconds > 0:
time.sleep(cfg.job_sleep_seconds)
def run_once(cfg: AppConfig) -> int: def run_once(cfg: AppConfig) -> int:
s3c = S3(cfg) s3c = S3(cfg)
ready = collect_ready_sets(s3c, cfg) ready = collect_ready_sets(s3c, cfg)
if not ready: if not ready:
return 0 return 0
with psycopg.connect(cfg.pg_dsn) as db: workers = max(1, min(cfg.parallel_workers, len(ready)))
for s in ready: if workers == 1:
try: _process_ready_batch(cfg, ready)
with db.transaction(): return len(ready)
with db.cursor() as cur:
process_one(cur, s3c, cfg, s) batches: list[list[EntranceSet]] = [[] for _ in range(workers)]
except Exception as e: for i, s in enumerate(ready):
with db.transaction(): batches[i % workers].append(s)
with db.cursor() as cur:
job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key) with ThreadPoolExecutor(max_workers=workers) as pool:
job_set_status(cur, s.uuid, "ERROR", err=str(e)) futures = [pool.submit(_process_ready_batch, cfg, batch) for batch in batches if batch]
print(f"[ERROR] {s.uuid}: {e}") for future in as_completed(futures):
if cfg.job_sleep_seconds > 0: future.result()
time.sleep(cfg.job_sleep_seconds)
return len(ready) return len(ready)

View File

@@ -1,22 +0,0 @@
FROM rocker/shiny:latest
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq-dev \
libcurl4-openssl-dev \
libssl-dev \
libxml2-dev \
curl \
unzip \
&& rm -rf /var/lib/apt/lists/*
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" \
&& unzip awscliv2.zip \
&& ./aws/install \
&& rm -rf awscliv2.zip aws
RUN R -e "install.packages(c('DBI','RPostgres','dplyr','DT','glue','shiny'), repos='https://cloud.r-project.org')"
WORKDIR /srv/shiny-server
COPY app.R /srv/shiny-server/app.R
EXPOSE 3838

View File

@@ -1,517 +0,0 @@
library(shiny)
library(DBI)
library(RPostgres)
library(dplyr)
library(DT)
library(glue)
pg_dsn <- Sys.getenv("PG_DSN", "")
s3_bucket <- Sys.getenv("S3_BUCKET", "")
s3_endpoint <- Sys.getenv("S3_ENDPOINT", Sys.getenv("AWS_S3_ENDPOINT", Sys.getenv("AWS_S3_ENDPOINT_URL", "")))
aws_access_key <- Sys.getenv("S3_ACCESS_KEY", Sys.getenv("AWS_ACCESS_KEY_ID", ""))
aws_secret_key <- Sys.getenv("S3_SECRET_KEY", Sys.getenv("AWS_SECRET_ACCESS_KEY", ""))
aws_cli <- Sys.getenv("AWS_CLI", "aws")
thumb_prefix <- Sys.getenv("THUMB_PREFIX", "icu/thumbnails/")
processed_prefix <- Sys.getenv("PROCESSED_PREFIX", "icu/processed/")
entrance_prefix <- Sys.getenv("ENTRANCE_PREFIX", "icu/entrance/")
processed_prefix <- Sys.getenv("PROCESSED_PREFIX", "icu/processed/")
thumb_prefix <- Sys.getenv("THUMB_PREFIX", "icu/thumbnails/")
inventory_table <- Sys.getenv("INVENTORY_TABLE", "")
deployment_table <- Sys.getenv("DEPLOYMENT_TABLE", "")
if (pg_dsn == "") {
stop("PG_DSN env var is required")
}
if (s3_bucket == "") {
stop("S3_BUCKET env var is required")
}
if (s3_endpoint == "") {
stop("S3_ENDPOINT/AWS_S3_ENDPOINT env var is required")
}
if (aws_access_key == "" || aws_secret_key == "") {
stop("S3_ACCESS_KEY/S3_SECRET_KEY or AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY required")
}
Sys.setenv(
AWS_ACCESS_KEY_ID = aws_access_key,
AWS_SECRET_ACCESS_KEY = aws_secret_key,
AWS_EC2_METADATA_DISABLED = "true"
)
parse_pg_dsn <- function(dsn) {
if (grepl("^postgresql://", dsn)) {
m <- regexec("^postgresql://([^:]+):([^@]+)@([^:/]+):?(\\d+)?/(.+)$", dsn)
r <- regmatches(dsn, m)[[1]]
if (length(r) == 0) stop("Invalid PG_DSN URL format")
return(list(
user = r[2],
password = r[3],
host = r[4],
port = ifelse(r[5] == "", 5432, as.integer(r[5])),
dbname = r[6]
))
}
if (grepl("host=", dsn)) {
parts <- strsplit(dsn, "\\s+")[[1]]
kv <- lapply(parts, function(p) strsplit(p, "=", fixed = TRUE)[[1]])
vals <- setNames(lapply(kv, `[`, 2), lapply(kv, `[`, 1))
return(list(
user = vals$user,
password = vals$password,
host = vals$host,
port = as.integer(vals$port),
dbname = vals$dbname
))
}
return(list(dbname = dsn))
}
pg <- parse_pg_dsn(pg_dsn)
pg_con <- do.call(dbConnect, c(list(RPostgres::Postgres()), pg))
onStop(function() dbDisconnect(pg_con))
table_exists <- function(con, table_name) {
res <- dbGetQuery(con, "SELECT to_regclass($1) AS t", params = list(table_name))
!is.na(res$t[1])
}
fetch_cameras <- function() {
if (inventory_table != "" && table_exists(pg_con, inventory_table)) {
q <- glue("SELECT DISTINCT cam_id FROM {inventory_table} ORDER BY 1")
return(dbGetQuery(pg_con, q)$cam_id)
}
character(0)
}
fetch_projects <- function() {
if (deployment_table != "" && table_exists(pg_con, deployment_table)) {
q <- glue("SELECT DISTINCT project FROM {deployment_table} ORDER BY 1")
return(dbGetQuery(pg_con, q)$project)
}
character(0)
}
build_query <- function(from_date, to_date, camera_id, project_name, limit_rows, offset_rows, use_date_filter) {
base <- "
SELECT
m.metadata_uuid,
m.import_ts,
m.ocr_ts,
m.\"CustomerTimezone\",
m.\"SignalStrength\",
m.\"Temperature\",
m.imei,
m.servertime,
ci.cam_id,
d.project,
c.category,
c.score
FROM remote_cam.metadata m
LEFT JOIN cam_inventory ci ON ci.imei = m.imei
LEFT JOIN LATERAL (
SELECT d.project
FROM cam_deployments d
WHERE d.camera_id = ci.cam_id
AND m.servertime >= d.deployment_start
AND (d.deployment_end IS NULL OR m.servertime < d.deployment_end)
ORDER BY d.deployment_start DESC NULLS LAST
LIMIT 1
) d ON TRUE
LEFT JOIN LATERAL (
SELECT category, score
FROM remote_cam.category c
WHERE c.image_uuid = m.metadata_uuid
ORDER BY score DESC NULLS LAST
LIMIT 1
) c ON TRUE
"
params <- list()
if (camera_id != "" && inventory_table != "" && table_exists(pg_con, inventory_table)) {
base <- paste0(base, " WHERE ci.cam_id = $", length(params) + 1)
params <- append(params, list(camera_id))
}
if (project_name != "" && deployment_table != "" && table_exists(pg_con, deployment_table)) {
prefix <- if (length(params) == 0) " WHERE " else " AND "
base <- paste0(base, prefix, " m.imei IN (SELECT ci2.imei FROM cam_inventory ci2 JOIN ",
deployment_table, " d ON d.camera_id = ci2.cam_id WHERE d.project = $",
length(params) + 1, ")")
params <- append(params, list(project_name))
}
if (camera_id == "" && project_name == "" && !use_date_filter) {
base <- paste0(base, " ORDER BY m.import_ts DESC LIMIT $", length(params) + 1, " OFFSET $", length(params) + 2)
params <- append(params, list(as.integer(limit_rows), as.integer(offset_rows)))
return(list(sql = base, params = params))
}
if (use_date_filter) {
prefix <- if (length(params) == 0) " WHERE " else " AND "
base <- paste0(base, prefix, " m.import_ts >= $", length(params) + 1, " AND m.import_ts < $", length(params) + 2)
params <- append(params, list(from_date, to_date))
}
base <- paste0(base, " ORDER BY m.import_ts DESC LIMIT $", length(params) + 1, " OFFSET $", length(params) + 2)
params <- append(params, list(as.integer(limit_rows), as.integer(offset_rows)))
list(sql = base, params = params)
}
normalize_endpoint <- function(url) {
if (url == "") return("")
u <- url
if (!grepl("^https?://", u)) {
u <- paste0("https://", u)
}
sub("/+$", "", u)
}
s3_endpoint <- normalize_endpoint(s3_endpoint)
if (aws_access_key == "" || aws_secret_key == "" || s3_endpoint == "") {
stop("S3_ACCESS_KEY/S3_SECRET_KEY or AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY and S3_ENDPOINT are required")
}
normalize_endpoint <- function(url) {
if (url == "") return("")
u <- url
if (!grepl("^https?://", u)) {
u <- paste0("https://", u)
}
sub("/+$", "", u)
}
s3_endpoint <- normalize_endpoint(s3_endpoint)
cache_dir <- Sys.getenv("THUMB_CACHE_DIR", "/tmp/thumb_cache")
dir.create(cache_dir, recursive = TRUE, showWarnings = FALSE)
addResourcePath("thumbs", cache_dir)
presigned_url <- function(key, expires = 600) {
if (is.null(key) || is.na(key) || key == "") return(NULL)
key <- sub(paste0("^", s3_bucket, "/"), "", key)
cmd <- sprintf(
"\"%s\" --endpoint-url %s s3 presign s3://%s/%s --expires-in %d",
aws_cli, s3_endpoint, s3_bucket, key, as.integer(expires)
)
tryCatch({
out <- system(cmd, intern = TRUE)
if (length(out) == 0) return(NULL)
out[1]
}, error = function(e) {
message(sprintf("[DEBUG] presign error: %s", e$message))
NULL
})
}
cache_thumbnail <- function(uuid) {
if (is.null(uuid) || is.na(uuid)) return(NULL)
local_path <- file.path(cache_dir, paste0(uuid, ".jpg"))
if (file.exists(local_path)) return(local_path)
key <- paste0(thumb_prefix, uuid, ".jpg")
cmd <- sprintf(
"\"%s\" --endpoint-url %s s3 cp s3://%s/%s \"%s\" --only-show-errors",
aws_cli, s3_endpoint, s3_bucket, key, local_path
)
tryCatch({
system(cmd, ignore.stdout = TRUE, ignore.stderr = TRUE)
if (file.exists(local_path)) return(local_path)
NULL
}, error = function(e) {
message(sprintf("[DEBUG] cache thumb error: %s", e$message))
NULL
})
}
ui <- fluidPage(
titlePanel("MelesICU - Image Browser"),
tags$script(HTML("
function melesThumbClick(el){
var uuid = el.getAttribute('data-uuid');
if(uuid){
Shiny.setInputValue('thumb_click', uuid, {priority: 'event'});
}
}
var melesScrollLock = false;
window.addEventListener('scroll', function() {
if (melesScrollLock) return;
var nearBottom = (window.innerHeight + window.scrollY) >= (document.body.offsetHeight - 200);
var canLoad = Shiny && Shiny.shinyapp && Shiny.shinyapp.$outputValues && Shiny.shinyapp.$outputValues.has_more_flag;
var isLoading = Shiny && Shiny.shinyapp && Shiny.shinyapp.$outputValues && Shiny.shinyapp.$outputValues.loading_flag;
if (nearBottom && canLoad && !isLoading) {
melesScrollLock = true;
Shiny.setInputValue('scroll_load', Date.now(), {priority: 'event'});
setTimeout(function(){ melesScrollLock = false; }, 1000);
}
});
")),
sidebarLayout(
sidebarPanel(
dateRangeInput("date_range", "Datum", start = Sys.Date() - 7, end = Sys.Date()),
checkboxInput("use_date_filter", "Datumsfilter aktiv", value = TRUE),
selectInput("camera_id", "Kamera (cam_id)", choices = c("ALL", fetch_cameras()), selected = "ALL"),
selectInput("project_name", "Projekt", choices = c("ALL", fetch_projects()), selected = "ALL"),
numericInput("limit_rows", "Eintraege pro Seite", value = 20, min = 10, max = 2000, step = 10),
numericInput("page", "Seite", value = 1, min = 1, step = 1),
fluidRow(
column(6, actionButton("prev_page", "Zurueck")),
column(6, actionButton("next_page", "Vor"))
),
actionButton("apply_filters", "Filter anwenden")
),
mainPanel(
tags$style(HTML("
.spinner {
margin: 20px 0;
width: 32px;
height: 32px;
border: 4px solid #ddd;
border-top-color: #333;
border-radius: 50%;
animation: spin 1s linear infinite;
}
@keyframes spin { to { transform: rotate(360deg); } }
")),
conditionalPanel("output.loading_flag", tags$div(class = "spinner")),
uiOutput("debug_info"),
uiOutput("thumb_grid")
)
)
)
server <- function(input, output, session) {
rv <- reactiveValues(data = NULL, selected = NULL, loading = FALSE, has_more = TRUE, debug = NULL)
cache_env <- new.env(parent = emptyenv())
cache_keys <- character(0)
url_cache_env <- new.env(parent = emptyenv())
url_cache_keys <- character(0)
load_data <- function() {
from_date <- as.POSIXct(input$date_range[1], tz = "UTC")
to_date <- as.POSIXct(input$date_range[2] + 1, tz = "UTC")
cam <- ifelse(input$camera_id == "ALL", "", input$camera_id)
proj <- ifelse(input$project_name == "ALL", "", input$project_name)
page <- max(1, as.integer(input$page))
limit <- as.integer(input$limit_rows)
offset <- (page - 1) * limit
key <- paste(from_date, to_date, cam, proj, input$use_date_filter, limit, offset, sep = "|")
if (exists(key, envir = cache_env, inherits = FALSE)) {
rv$data <- get(key, envir = cache_env, inherits = FALSE)
return(invisible(NULL))
}
q <- build_query(from_date, to_date, cam, proj, limit, offset, input$use_date_filter)
rv$loading <- TRUE
on.exit({ rv$loading <- FALSE }, add = TRUE)
data <- dbGetQuery(pg_con, q$sql, params = q$params)
assign(key, data, envir = cache_env)
cache_keys <<- c(cache_keys, key)
if (length(cache_keys) > 20) {
drop <- head(cache_keys, length(cache_keys) - 20)
for (k in drop) rm(list = k, envir = cache_env)
cache_keys <<- tail(cache_keys, 20)
}
rv$data <- data
rv$has_more <- nrow(data) >= limit
# Batch prefetch first 10 thumbnails in parallel
if (!is.null(data) && nrow(data) > 0) {
uuids <- head(data$metadata_uuid, 10)
tryCatch({
parallel::mclapply(uuids, cache_thumbnail, mc.cores = 4)
}, error = function(e) {
message(sprintf("[DEBUG] prefetch error: %s", e$message))
})
}
if (!is.null(data) && nrow(data) > 0) {
sample_uuid <- data$metadata_uuid[1]
thumb_key <- paste0(thumb_prefix, sample_uuid, ".jpg")
url <- presigned_url(thumb_key)
rv$debug <- list(
count = nrow(data),
sample_uuid = sample_uuid,
thumb_key = thumb_key,
url = url
)
message(sprintf("[DEBUG] count=%d sample_uuid=%s thumb_key=%s url=%s",
nrow(data), sample_uuid, thumb_key, ifelse(is.null(url), "NULL", url)))
} else {
rv$debug <- list(count = 0)
message("[DEBUG] count=0")
}
}
observeEvent(input$apply_filters, {
if (rv$loading) {
showNotification("Noch am Laden bitte warten.", type = "message")
return(NULL)
}
load_data()
}, ignoreInit = TRUE)
observeEvent(input$prev_page, {
if (rv$loading) return(NULL)
new_page <- max(1, as.integer(input$page) - 1)
updateNumericInput(session, "page", value = new_page)
load_data()
}, ignoreInit = TRUE)
observeEvent(input$next_page, {
if (rv$loading) return(NULL)
new_page <- as.integer(input$page) + 1
updateNumericInput(session, "page", value = new_page)
load_data()
}, ignoreInit = TRUE)
observeEvent(input$scroll_load, {
if (rv$loading || !rv$has_more) return(NULL)
new_page <- as.integer(input$page) + 1
updateNumericInput(session, "page", value = new_page)
load_data()
}, ignoreInit = TRUE)
observeEvent(TRUE, {
if (rv$loading) return(NULL)
load_data()
}, once = TRUE)
output$loading_flag <- reactive({ rv$loading })
outputOptions(output, "loading_flag", suspendWhenHidden = FALSE)
output$has_more_flag <- reactive({ rv$has_more })
outputOptions(output, "has_more_flag", suspendWhenHidden = FALSE)
format_cet <- function(ts) {
if (is.null(ts) || is.na(ts)) return("-")
format(as.POSIXct(ts, tz = "CET"), "%Y-%m-%d %H:%M:%S %Z")
}
format_ts <- function(ts) {
if (is.null(ts) || is.na(ts)) return("-")
format(as.POSIXct(ts, tz = "CET"), "%Y-%m-%d %H:%M:%S %Z")
}
format_diff <- function(server_ts, camera_ts) {
if (is.null(server_ts) || is.na(server_ts) || is.null(camera_ts) || is.na(camera_ts)) return("-")
secs <- as.numeric(difftime(server_ts, camera_ts, units = "secs"))
sign <- ifelse(secs >= 0, "+", "-")
secs <- abs(secs)
hh <- floor(secs / 3600)
mm <- floor((secs %% 3600) / 60)
ss <- floor(secs %% 60)
sprintf("%s%02d:%02d:%02d", sign, hh, mm, ss)
}
output$thumb_grid <- renderUI({
dat <- rv$data
if (is.null(dat) || nrow(dat) == 0) {
return(tags$div("Keine Ergebnisse."))
}
cards <- lapply(seq_len(nrow(dat)), function(i) {
row <- dat[i, ]
local_path <- cache_thumbnail(row$metadata_uuid)
if (is.null(local_path)) return(NULL)
url <- paste0("thumbs/", basename(local_path))
tags$div(
style = "display:inline-block; width: 220px; margin: 6px; vertical-align: top;",
tags$div(class = "ph", style = "width: 220px; height: 220px; background:#eee; display:block;"),
tags$img(
src = url,
style = "width: 220px; height: 220px; object-fit: cover; display:block; cursor: pointer;",
loading = "lazy",
onload = "this.parentNode.querySelector('.ph').style.display='none';",
`data-uuid` = row$metadata_uuid,
onclick = "melesThumbClick(this)"
),
tags$div(style = "font-size: 12px; color: #222;",
glue("CamID: {row$cam_id}")),
tags$div(style = "font-size: 12px; color: #444;",
glue("Serverzeit: {format_cet(row$servertime)}")),
tags$div(style = "font-size: 12px; color: #444;",
glue("Kamerazeit: {format_ts(row$ocr_ts)}")),
tags$div(style = "font-size: 12px; color: #444;",
glue("AI classification: {row$category} ({row$score})")),
{
delta <- format_diff(row$servertime, row$ocr_ts)
delta_secs <- if (delta == "-") NA else as.numeric(difftime(row$servertime, row$ocr_ts, units = "secs"))
is_warn <- !is.na(delta_secs) && abs(delta_secs) > 600
style <- if (is_warn) "font-size: 12px; color: #c00; font-weight: bold;" else "font-size: 12px; color: #444;"
tags$div(style = style, glue("Delta: {delta}"))
}
)
})
tags$div(cards)
})
output$debug_info <- renderUI({
if (is.null(rv$debug)) return(NULL)
cnt <- rv$debug$count
if (is.null(cnt)) return(NULL)
if (cnt == 0) {
return(tags$div(style = "font-size:12px; color:#a00; margin-bottom:8px;",
"Debug: 0 Treffer."))
}
tags$div(style = "font-size:12px; color:#555; margin-bottom:8px;",
glue("Debug: Treffer={cnt}, Sample UUID={rv$debug$sample_uuid}"),
tags$br(),
glue("Thumb-Key: {rv$debug$thumb_key}"),
tags$br(),
if (!is.null(rv$debug$url)) tags$a(href = rv$debug$url, "Sample Thumbnail (Link)", target = "_blank") else "Sample URL: NULL")
})
observeEvent(input$thumb_click, {
uuid <- input$thumb_click
dat <- rv$data
if (is.null(dat)) return(NULL)
row <- dat[dat$metadata_uuid == uuid, ]
if (nrow(row) != 1) return(NULL)
full_key <- paste0(processed_prefix, row$metadata_uuid, ".jpg")
presigned_cached <- function(key, expires = 600) {
now <- Sys.time()
if (exists(key, envir = url_cache_env, inherits = FALSE)) {
entry <- get(key, envir = url_cache_env, inherits = FALSE)
if (is.list(entry) && !is.null(entry$expires_at) && now < entry$expires_at) {
return(entry$url)
}
}
u <- presigned_url(key, expires = expires)
if (!is.null(u)) {
assign(key, list(url = u, expires_at = now + expires - 30), envir = url_cache_env)
url_cache_keys <<- c(url_cache_keys, key)
if (length(url_cache_keys) > 200) {
drop <- head(url_cache_keys, length(url_cache_keys) - 200)
for (k in drop) rm(list = k, envir = url_cache_env)
url_cache_keys <<- tail(url_cache_keys, 200)
}
}
u
}
url <- presigned_cached(full_key)
if (is.null(url)) {
showModal(modalDialog(
title = glue("Bild {uuid}"),
"Full image nicht gefunden.",
easyClose = TRUE,
footer = NULL
))
return(NULL)
}
showModal(modalDialog(
title = glue("Bild {uuid}"),
tags$img(src = url, style = "max-width: 100%; height: auto;"),
easyClose = TRUE,
footer = modalButton("Schließen"),
size = "l"
))
})
}
shinyApp(ui, server)