Compare commits
8 Commits
8414e72870
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 2e9ee3e014 | |||
| 2294962d57 | |||
| ddfb281be3 | |||
| b35394c569 | |||
| 3873cb7b3f | |||
| 1e4a7b86b6 | |||
| 400f8a53f8 | |||
| d71e1699d6 |
5
.env
Normal file
5
.env
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
PG_DSN=postgresql://postgres:DFGk5s9H21lKao2K@136.243.41.58:7777/meles
|
||||||
|
S3_BUCKET=trapper-meles
|
||||||
|
S3_ACCESS_KEY=UG950FCGWFYRXSXXVUAH
|
||||||
|
S3_SECRET_KEY=rXpspNhboZY6zNZi7djjq2QaXPA4uwsO9jXf4AXk
|
||||||
|
S3_ENDPOINT=https://fsn1.your-objectstorage.com/
|
||||||
29
config.yaml
29
config.yaml
@@ -1,12 +1,16 @@
|
|||||||
# config.yaml (Beispiel)
|
# config.yaml (Beispiel)
|
||||||
s3:
|
s3:
|
||||||
endpoint: "https://<dein-endpoint>"
|
endpoint: "https://fsn1.your-objectstorage.com/"
|
||||||
access_key: "<key>"
|
access_key: "UG950FCGWFYRXSXXVUAH"
|
||||||
secret_key: "<secret>"
|
secret_key: "rXpspNhboZY6zNZi7djjq2QaXPA4uwsO9jXf4AXk"
|
||||||
bucket: "<bucket>"
|
bucket: "trapper-meles"
|
||||||
|
|
||||||
postgres:
|
postgres:
|
||||||
dsn: "postgresql://user:pass@host:5432/dbname"
|
dsn: "postgresql://postgres:DFGk5s9H21lKao2K@136.243.41.58:7777/meles"
|
||||||
|
|
||||||
|
deermapper-api:
|
||||||
|
base_url: "https://webapp.deermapper.net/api/icu"
|
||||||
|
apiKey: "695bc217-3b40-48bb-bb12-17fc5b08b320"
|
||||||
|
|
||||||
app:
|
app:
|
||||||
entrance_prefix: "icu/entrance/"
|
entrance_prefix: "icu/entrance/"
|
||||||
@@ -15,6 +19,17 @@ app:
|
|||||||
min_age_seconds: 90
|
min_age_seconds: 90
|
||||||
poll_seconds: 30
|
poll_seconds: 30
|
||||||
# OCR crop tunables (defaults are usually fine)
|
# OCR crop tunables (defaults are usually fine)
|
||||||
ocr_crop_w_frac: 0.42
|
ocr_crop_w_frac: 0.63
|
||||||
ocr_crop_h_frac: 0.22
|
ocr_crop_h_frac: 0.05
|
||||||
thumb_max: 512
|
thumb_max: 512
|
||||||
|
exiftool_timeout_seconds: 15
|
||||||
|
job_sleep_seconds: 0.0
|
||||||
|
parallel_workers: 6
|
||||||
|
s3_max_pool_connections: 48
|
||||||
|
# Optional processing switches:
|
||||||
|
# - false => image still gets moved + thumbnail, and backfill flag is set in import_job
|
||||||
|
enable_ocr: true
|
||||||
|
enable_exif: true
|
||||||
|
enable_deermapper_api: false
|
||||||
|
deermapper_api_timeout_seconds: 20
|
||||||
|
deermapper_api_image_field: "image"
|
||||||
|
|||||||
@@ -1,11 +1,11 @@
|
|||||||
services:
|
services:
|
||||||
melesICUmover:
|
melesicumover:
|
||||||
build: .
|
build: .
|
||||||
container_name: melesICUmover
|
container_name: melesicumover
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
environment:
|
environment:
|
||||||
CONFIG_YAML: /app/config.yaml
|
CONFIG_YAML: /app/config.yaml
|
||||||
DEBUGPY: "1"
|
DEBUG: "0"
|
||||||
ports:
|
ports:
|
||||||
- "5678:5678"
|
- "5678:5678"
|
||||||
volumes:
|
volumes:
|
||||||
|
|||||||
14
launch.json
14
launch.json
@@ -1,14 +0,0 @@
|
|||||||
{
|
|
||||||
"version": "0.2.0",
|
|
||||||
"configurations": [
|
|
||||||
{
|
|
||||||
"name": "Attach to Docker (melesICUmover)",
|
|
||||||
"type": "python",
|
|
||||||
"request": "attach",
|
|
||||||
"connect": {
|
|
||||||
"host": "localhost",
|
|
||||||
"port": 5678
|
|
||||||
}
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
569
main.py
569
main.py
@@ -6,12 +6,17 @@ import json
|
|||||||
import time
|
import time
|
||||||
import tempfile
|
import tempfile
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import uuid as uuidlib
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Optional, Dict, Tuple
|
from typing import Optional, Dict, Tuple
|
||||||
|
from urllib import error as urlerror
|
||||||
|
from urllib import request as urlrequest
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
import psycopg
|
import psycopg
|
||||||
|
from botocore.config import Config as BotoClientConfig
|
||||||
from dateutil import tz
|
from dateutil import tz
|
||||||
from PIL import Image, ImageOps, ImageEnhance
|
from PIL import Image, ImageOps, ImageEnhance
|
||||||
import pytesseract
|
import pytesseract
|
||||||
@@ -38,6 +43,9 @@ CAT_RE = re.compile(r"^([0-9a-fA-F-]{36})_categories\.json$")
|
|||||||
OCR_DT_RE = re.compile(
|
OCR_DT_RE = re.compile(
|
||||||
r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s+(?P<d>\d{2})-(?P<mo>\d{2})-(?P<y>\d{4})"
|
r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s+(?P<d>\d{2})-(?P<mo>\d{2})-(?P<y>\d{4})"
|
||||||
)
|
)
|
||||||
|
# Lenient variant of the OCR timestamp pattern: the whitespace between the
# time and the date is optional, and the date may use "-" or "." separators.
OCR_DT_RE_FLEX = re.compile(
    r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})"  # HH:MM:SS
    r"\s*"  # zero or more whitespace characters
    r"(?P<d>\d{2})[-.](?P<mo>\d{2})[-.](?P<y>\d{4})"  # DD-MM-YYYY or DD.MM.YYYY
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -61,6 +69,17 @@ class AppConfig:
|
|||||||
ocr_crop_h_frac: float = 0.22 # bottom ~22% of height
|
ocr_crop_h_frac: float = 0.22 # bottom ~22% of height
|
||||||
|
|
||||||
thumb_max: int = 512
|
thumb_max: int = 512
|
||||||
|
exiftool_timeout_seconds: int = 15
|
||||||
|
job_sleep_seconds: float = 0.0
|
||||||
|
parallel_workers: int = 4
|
||||||
|
s3_max_pool_connections: int = 32
|
||||||
|
enable_ocr: bool = True
|
||||||
|
enable_exif: bool = True
|
||||||
|
enable_deermapper_api: bool = False
|
||||||
|
deermapper_api_base_url: str = "https://webapp.deermapper.net/api/icu"
|
||||||
|
deermapper_api_key: str = ""
|
||||||
|
deermapper_api_timeout_seconds: int = 20
|
||||||
|
deermapper_api_image_field: str = "image"
|
||||||
|
|
||||||
|
|
||||||
def load_config(path: str) -> AppConfig:
|
def load_config(path: str) -> AppConfig:
|
||||||
@@ -71,9 +90,26 @@ def load_config(path: str) -> AppConfig:
|
|||||||
def env_or(key: str, default=None):
|
def env_or(key: str, default=None):
|
||||||
return os.environ.get(key, default)
|
return os.environ.get(key, default)
|
||||||
|
|
||||||
|
def as_bool(value, default: bool = True) -> bool:
    """Coerce a loosely-typed config value to a bool.

    - ``None`` yields ``default``.
    - Strings are true only when, after stripping and lower-casing, they are
      one of "1", "true", "yes", "on"; any other string is false.
    - Everything else (bools, numbers, containers, ...) uses Python truthiness.
    """
    if value is None:
        return default
    if isinstance(value, str):
        normalized = value.strip().lower()
        return normalized in ("1", "true", "yes", "on")
    # bool, int, float and any other type all reduce to plain truthiness.
    return bool(value)
|
||||||
|
|
||||||
s3 = raw["s3"]
|
s3 = raw["s3"]
|
||||||
pg = raw["postgres"]
|
pg = raw["postgres"]
|
||||||
app = raw.get("app", {})
|
app = raw.get("app", {})
|
||||||
|
deermapper = raw.get("deermapper-api", raw.get("deermapper_api", {}))
|
||||||
|
default_workers = max(1, min(16, (os.cpu_count() or 2) * 2))
|
||||||
|
parallel_workers = int(app.get("parallel_workers", default_workers))
|
||||||
|
parallel_workers = max(1, parallel_workers)
|
||||||
|
s3_pool = int(app.get("s3_max_pool_connections", max(16, parallel_workers * 4)))
|
||||||
|
s3_pool = max(10, s3_pool)
|
||||||
|
|
||||||
return AppConfig(
|
return AppConfig(
|
||||||
s3_endpoint=env_or("S3_ENDPOINT", s3["endpoint"]),
|
s3_endpoint=env_or("S3_ENDPOINT", s3["endpoint"]),
|
||||||
@@ -89,6 +125,17 @@ def load_config(path: str) -> AppConfig:
|
|||||||
ocr_crop_w_frac=float(app.get("ocr_crop_w_frac", 0.42)),
|
ocr_crop_w_frac=float(app.get("ocr_crop_w_frac", 0.42)),
|
||||||
ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
|
ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
|
||||||
thumb_max=int(app.get("thumb_max", 512)),
|
thumb_max=int(app.get("thumb_max", 512)),
|
||||||
|
exiftool_timeout_seconds=int(app.get("exiftool_timeout_seconds", 15)),
|
||||||
|
job_sleep_seconds=float(app.get("job_sleep_seconds", 0.0)),
|
||||||
|
parallel_workers=parallel_workers,
|
||||||
|
s3_max_pool_connections=s3_pool,
|
||||||
|
enable_ocr=as_bool(app.get("enable_ocr", True), True),
|
||||||
|
enable_exif=as_bool(app.get("enable_exif", True), True),
|
||||||
|
enable_deermapper_api=as_bool(app.get("enable_deermapper_api", False), False),
|
||||||
|
deermapper_api_base_url=env_or("DEERMAPPER_API_URL", deermapper.get("base_url", "https://webapp.deermapper.net/api/icu")),
|
||||||
|
deermapper_api_key=env_or("DEERMAPPER_API_KEY", deermapper.get("apiKey", "")),
|
||||||
|
deermapper_api_timeout_seconds=int(app.get("deermapper_api_timeout_seconds", 20)),
|
||||||
|
deermapper_api_image_field=app.get("deermapper_api_image_field", "image"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -156,28 +203,58 @@ def ocr_extract_timestamp(jpg_bytes: bytes, crop_w_frac: float, crop_h_frac: flo
|
|||||||
cfg = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789:- '
|
cfg = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789:- '
|
||||||
text = pytesseract.image_to_string(bw, config=cfg).strip()
|
text = pytesseract.image_to_string(bw, config=cfg).strip()
|
||||||
|
|
||||||
m = OCR_DT_RE.search(text)
|
|
||||||
if not m:
|
|
||||||
# retry without thresholding
|
# retry without thresholding
|
||||||
text2 = pytesseract.image_to_string(gray, config=cfg).strip()
|
text2 = pytesseract.image_to_string(gray, config=cfg).strip()
|
||||||
m = OCR_DT_RE.search(text2)
|
|
||||||
if not m:
|
|
||||||
raise ValueError(f"OCR timestamp not found. OCR1='{text}' OCR2='{text2}'")
|
|
||||||
|
|
||||||
dt_local_naive = datetime(
|
dt_local_naive = _parse_ocr_datetime(text, text2)
|
||||||
int(m.group("y")),
|
|
||||||
int(m.group("mo")),
|
|
||||||
int(m.group("d")),
|
|
||||||
int(m.group("h")),
|
|
||||||
int(m.group("m")),
|
|
||||||
int(m.group("s")),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Keep camera-local timezone (no UTC conversion requested)
|
# Keep camera-local timezone (no UTC conversion requested)
|
||||||
return dt_local_naive.replace(tzinfo=customer_tzinfo)
|
return dt_local_naive.replace(tzinfo=customer_tzinfo)
|
||||||
|
|
||||||
|
|
||||||
def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime) -> bytes:
|
def _parse_ocr_datetime(*texts: str) -> datetime:
    """Extract a naive datetime from one or more OCR text candidates.

    Three strategies are tried in order; each one is run over every text
    before the next strategy starts:

    1. the combined time+date patterns ``OCR_DT_RE`` and ``OCR_DT_RE_FLEX``;
    2. a time match and a date match located independently in the same text
       (handles a missing space between time and date);
    3. a sliding 14-digit window ``HHMMSSDDMMYYYY`` over the text with all
       non-digit characters removed.

    A candidate is accepted only if its fields form a valid
    ``datetime``; the first accepted candidate is returned.

    Raises:
        ValueError: if no strategy yields a valid timestamp. The message
            includes the first two OCR texts.
    """

    def try_build(h, m, s, d, mo, y) -> Optional[datetime]:
        # int() and datetime() both signal bad/out-of-range fields with
        # ValueError, so that is the only exception treated as "no match".
        try:
            return datetime(int(y), int(mo), int(d), int(h), int(m), int(s))
        except ValueError:
            return None

    # 1) direct regex matches
    for t in texts:
        for rx in (OCR_DT_RE, OCR_DT_RE_FLEX):
            match = rx.search(t)
            if match:
                dt = try_build(*match.group("h", "m", "s", "d", "mo", "y"))
                if dt is not None:
                    return dt

    # 2) find time and date separately (handles missing space between time and date)
    time_rx = re.compile(r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})")
    date_rx = re.compile(r"(?P<d>\d{2})[-.](?P<mo>\d{2})[-.](?P<y>\d{4})")
    for t in texts:
        tm = time_rx.search(t)
        dm = date_rx.search(t)
        if tm and dm:
            dt = try_build(*tm.group("h", "m", "s"), *dm.group("d", "mo", "y"))
            if dt is not None:
                return dt

    # 3) digits-only fallback: scan for HHMMSSDDMMYYYY
    for t in texts:
        digits = re.sub(r"\D", "", t)
        # range() is empty for fewer than 14 digits, so every window is full-width.
        for i in range(len(digits) - 13):
            chunk = digits[i:i + 14]
            dt = try_build(
                chunk[0:2], chunk[2:4], chunk[4:6],
                chunk[6:8], chunk[8:10], chunk[10:14],
            )
            if dt is not None:
                return dt

    raise ValueError(f"OCR timestamp not found. OCR1='{texts[0] if texts else ''}' OCR2='{texts[1] if len(texts) > 1 else ''}'")
|
||||||
|
|
||||||
|
|
||||||
|
def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime, *, timeout_seconds: int) -> bytes:
|
||||||
"""
|
"""
|
||||||
DateTimeOriginal = OCR time (camera local) + OffsetTimeOriginal
|
DateTimeOriginal = OCR time (camera local) + OffsetTimeOriginal
|
||||||
DateTimeDigitized = servertime (UTC) + OffsetTimeDigitized=+00:00
|
DateTimeDigitized = servertime (UTC) + OffsetTimeDigitized=+00:00
|
||||||
@@ -204,7 +281,7 @@ def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digi
|
|||||||
f"-OffsetTime={otd}",
|
f"-OffsetTime={otd}",
|
||||||
in_path,
|
in_path,
|
||||||
]
|
]
|
||||||
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout_seconds)
|
||||||
|
|
||||||
with open(in_path, "rb") as f:
|
with open(in_path, "rb") as f:
|
||||||
return f.read()
|
return f.read()
|
||||||
@@ -219,6 +296,71 @@ def make_thumbnail_bytes(jpg_bytes: bytes, thumb_max: int) -> bytes:
|
|||||||
return out.getvalue()
|
return out.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
def _multipart_form_data(fields: Dict[str, str], files: list[tuple[str, str, bytes, str]]) -> tuple[str, bytes]:
|
||||||
|
boundary = f"----meles-{uuidlib.uuid4().hex}"
|
||||||
|
boundary_bytes = boundary.encode("ascii")
|
||||||
|
crlf = b"\r\n"
|
||||||
|
|
||||||
|
parts: list[bytes] = []
|
||||||
|
for name, value in fields.items():
|
||||||
|
parts.extend([
|
||||||
|
b"--" + boundary_bytes,
|
||||||
|
f'Content-Disposition: form-data; name="{name}"'.encode("utf-8"),
|
||||||
|
b"",
|
||||||
|
str(value).encode("utf-8"),
|
||||||
|
])
|
||||||
|
for field_name, filename, content, content_type in files:
|
||||||
|
parts.extend([
|
||||||
|
b"--" + boundary_bytes,
|
||||||
|
f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"'.encode("utf-8"),
|
||||||
|
f"Content-Type: {content_type}".encode("utf-8"),
|
||||||
|
b"",
|
||||||
|
content,
|
||||||
|
])
|
||||||
|
parts.append(b"--" + boundary_bytes + b"--")
|
||||||
|
body = crlf.join(parts) + crlf
|
||||||
|
return f"multipart/form-data; boundary={boundary}", body
|
||||||
|
|
||||||
|
|
||||||
|
def push_to_deermapper_api(cfg: AppConfig, image_uuid: str, imei: str, metadata: dict, jpg_bytes: bytes) -> tuple[bool, str]:
    """POST one image plus its metadata to the configured DeerMapper endpoint.

    The request is a multipart form with the text fields ``apiKey``, ``imei``
    and ``metadata`` (JSON-encoded) and one JPEG file part named after
    ``cfg.deermapper_api_image_field``.

    Returns:
        ``(True, msg or "ok")`` when the JSON response has ``status == "1"``;
        otherwise ``(False, reason)``, where the reason is the response
        ``msg``, the reported status, an HTTP error summary, or the text of
        whatever exception occurred while sending or reading the response.
    """
    form_fields = {
        "apiKey": cfg.deermapper_api_key,
        "imei": imei,
        "metadata": json.dumps(metadata, ensure_ascii=False),
    }
    image_part = (cfg.deermapper_api_image_field, f"{image_uuid}.jpg", jpg_bytes, "image/jpeg")
    content_type, body = _multipart_form_data(form_fields, [image_part])

    request_headers = {
        "Content-Type": content_type,
        "Accept": "application/json",
    }
    req = urlrequest.Request(
        cfg.deermapper_api_base_url,
        data=body,
        method="POST",
        headers=request_headers,
    )

    try:
        with urlrequest.urlopen(req, timeout=cfg.deermapper_api_timeout_seconds) as resp:
            payload = resp.read().decode("utf-8", errors="replace")

        # An empty body is treated as an empty JSON object.
        try:
            data = json.loads(payload) if payload else {}
        except Exception:
            return False, f"API response not JSON: {payload[:300]}"

        status_val = str(data.get("status", "")).strip()
        msg = str(data.get("msg", "")).strip()
        if status_val != "1":
            return False, msg or f"status={status_val or 'unknown'}"
        return True, msg or "ok"
    except urlerror.HTTPError as e:
        # HTTP error responses still carry a body; include its first 300 chars.
        body_txt = e.read().decode("utf-8", errors="replace")
        return False, f"HTTP {e.code}: {body_txt[:300]}"
    except Exception as e:
        # Best-effort push: any other failure is reported, not raised.
        return False, str(e)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------
|
# -----------------------
|
||||||
# DB ops
|
# DB ops
|
||||||
# -----------------------
|
# -----------------------
|
||||||
@@ -235,10 +377,22 @@ def db_init(cur):
|
|||||||
entrance_cat_key text,
|
entrance_cat_key text,
|
||||||
processed_jpg_key text,
|
processed_jpg_key text,
|
||||||
thumbnail_key text,
|
thumbnail_key text,
|
||||||
|
needs_ocr_backfill boolean NOT NULL DEFAULT false,
|
||||||
|
needs_exif_backfill boolean NOT NULL DEFAULT false,
|
||||||
created_ts timestamptz NOT NULL DEFAULT now(),
|
created_ts timestamptz NOT NULL DEFAULT now(),
|
||||||
updated_ts timestamptz NOT NULL DEFAULT now()
|
updated_ts timestamptz NOT NULL DEFAULT now()
|
||||||
);
|
);
|
||||||
""")
|
""")
|
||||||
|
cur.execute("""
|
||||||
|
ALTER TABLE remote_cam.import_job
|
||||||
|
ADD COLUMN IF NOT EXISTS needs_ocr_backfill boolean NOT NULL DEFAULT false;
|
||||||
|
""")
|
||||||
|
cur.execute("""
|
||||||
|
ALTER TABLE remote_cam.import_job
|
||||||
|
ADD COLUMN IF NOT EXISTS needs_exif_backfill boolean NOT NULL DEFAULT false;
|
||||||
|
""")
|
||||||
|
cur.execute("CREATE INDEX IF NOT EXISTS import_job_needs_ocr_idx ON remote_cam.import_job(needs_ocr_backfill);")
|
||||||
|
cur.execute("CREATE INDEX IF NOT EXISTS import_job_needs_exif_idx ON remote_cam.import_job(needs_exif_backfill);")
|
||||||
cur.execute("CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);")
|
cur.execute("CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);")
|
||||||
cur.execute("CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);")
|
cur.execute("CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);")
|
||||||
|
|
||||||
@@ -261,6 +415,14 @@ def db_init(cur):
|
|||||||
END IF;
|
END IF;
|
||||||
END$$;
|
END$$;
|
||||||
""")
|
""")
|
||||||
|
cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS remote_cam.api_response (
|
||||||
|
push_ts timestamp without time zone,
|
||||||
|
message character(512),
|
||||||
|
image_uuid uuid,
|
||||||
|
success boolean
|
||||||
|
);
|
||||||
|
""")
|
||||||
|
|
||||||
|
|
||||||
def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]):
|
def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]):
|
||||||
@@ -280,11 +442,73 @@ def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None, pr
|
|||||||
UPDATE remote_cam.import_job
|
UPDATE remote_cam.import_job
|
||||||
SET status = %s,
|
SET status = %s,
|
||||||
attempts = attempts + CASE WHEN %s = 'ERROR' THEN 1 ELSE 0 END,
|
attempts = attempts + CASE WHEN %s = 'ERROR' THEN 1 ELSE 0 END,
|
||||||
last_error = CASE WHEN %s IS NULL THEN last_error ELSE %s END,
|
last_error = COALESCE(%s::text, last_error),
|
||||||
processed_jpg_key = COALESCE(%s, processed_jpg_key),
|
processed_jpg_key = COALESCE(%s, processed_jpg_key),
|
||||||
thumbnail_key = COALESCE(%s, thumbnail_key)
|
thumbnail_key = COALESCE(%s, thumbnail_key)
|
||||||
WHERE image_uuid = %s;
|
WHERE image_uuid = %s;
|
||||||
""", (status, status, err, err, err, processed_key, thumb_key, uuid))
|
""", (status, status, err, processed_key, thumb_key, uuid))
|
||||||
|
|
||||||
|
|
||||||
|
def job_set_backfill(cur, uuid: str, *, needs_ocr_backfill: Optional[bool] = None, needs_exif_backfill: Optional[bool] = None):
    """Update the OCR/EXIF backfill flags of one import job.

    A flag passed as ``None`` keeps its stored value (via SQL ``COALESCE``).
    When both flags are ``None`` no statement is executed at all.
    """
    nothing_to_change = needs_ocr_backfill is None and needs_exif_backfill is None
    if nothing_to_change:
        return

    params = (needs_ocr_backfill, needs_exif_backfill, uuid)
    cur.execute("""
        UPDATE remote_cam.import_job
        SET needs_ocr_backfill = COALESCE(%s, needs_ocr_backfill),
            needs_exif_backfill = COALESCE(%s, needs_exif_backfill)
        WHERE image_uuid = %s;
    """, params)
|
||||||
|
|
||||||
|
|
||||||
|
def job_get(cur, uuid: str):
    """Fetch the state of one import job.

    Returns a dict with the keys ``status``, ``processed_jpg_key``,
    ``thumbnail_key``, ``has_categories``, ``needs_ocr_backfill`` and
    ``needs_exif_backfill``, or ``None`` when no row matches ``uuid``.
    """
    cur.execute("""
        SELECT status, processed_jpg_key, thumbnail_key, has_categories, needs_ocr_backfill, needs_exif_backfill
        FROM remote_cam.import_job
        WHERE image_uuid = %s;
    """, (uuid,))
    row = cur.fetchone()
    if not row:
        return None

    # Key order mirrors the SELECT column order above.
    columns = (
        "status",
        "processed_jpg_key",
        "thumbnail_key",
        "has_categories",
        "needs_ocr_backfill",
        "needs_exif_backfill",
    )
    return {name: row[position] for position, name in enumerate(columns)}
|
||||||
|
|
||||||
|
|
||||||
|
# Job statuses in ranking order; a status's rank is its index in this list.
STATUS_ORDER = [
    "NEW", "META_OK",
    "OCR_OK", "OCR_SKIPPED",
    "EXIF_OK", "EXIF_SKIPPED",
    "MOVED", "THUMB_OK", "CATEGORIES_OK",
    "API_FAIL", "API_OK",
    "CLEANED",
]


def status_rank(status: Optional[str]) -> int:
    """Return the position of ``status`` in ``STATUS_ORDER``, or -1 if it is not listed."""
    try:
        return STATUS_ORDER.index(status)
    except ValueError:
        # Unknown statuses (including None) rank below every known one.
        return -1
|
||||||
|
|
||||||
|
|
||||||
|
def is_terminal_success(status: Optional[str], cfg: AppConfig) -> bool:
    """Tell whether ``status`` counts as a successfully finished job.

    ``None`` is never terminal and ``"CLEANED"`` always is. Otherwise the
    status must rank at least as high as ``"API_OK"`` when the DeerMapper API
    push is enabled in ``cfg``, or ``"THUMB_OK"`` when it is disabled.
    """
    if status is None:
        return False
    if status == "CLEANED":
        return True
    required = "API_OK" if cfg.enable_deermapper_api else "THUMB_OK"
    return status_rank(status) >= status_rank(required)
|
||||||
|
|
||||||
|
|
||||||
def insert_resource(cur, uuid: str, typ: str):
|
def insert_resource(cur, uuid: str, typ: str):
|
||||||
@@ -314,15 +538,21 @@ def insert_metadata(cur, uuid: str, meta: dict):
|
|||||||
lat_f = float(lat) if lat not in (None, "") else None
|
lat_f = float(lat) if lat not in (None, "") else None
|
||||||
lon_f = float(lon) if lon not in (None, "") else None
|
lon_f = float(lon) if lon not in (None, "") else None
|
||||||
|
|
||||||
# NOTE: DateTimeLastSettings looks like "05:45:35 30.06.2025" in sample; parse if needed later.
|
dls_raw = get("DateTimeLastSettings")
|
||||||
|
dls = None
|
||||||
|
if dls_raw:
|
||||||
|
try:
|
||||||
|
dls = datetime.strptime(dls_raw, "%H:%M:%S %d.%m.%Y").replace(tzinfo=timezone.utc)
|
||||||
|
except Exception:
|
||||||
|
dls = None
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
INSERT INTO remote_cam.metadata(
|
INSERT INTO remote_cam.metadata(
|
||||||
metadata_uuid, import_ts, ImageCreationDate, SizeInByte, ImageFormat, ImageNight,
|
metadata_uuid, import_ts, "ImageCreationDate", "SizeInByte", "ImageFormat", "ImageNight",
|
||||||
DateTimeLastSettings, BatteryStatus, FirmwareVersion, SignalStrength, Temperature,
|
"DateTimeLastSettings", "BatteryStatus", "FirmwareVersion", "SignalStrength", "Temperature",
|
||||||
CoordinateSwitch, Latitude, Longitude, WorkPeriod, WorkStart, WorkEnd, ThumbnailSize,
|
"CoordinateSwitch", "Latitude", "Longitude", "WorkPeriod", "WorkStart", "WorkEnd", "ThumbnailSize",
|
||||||
PIRInterval, TimeScan, imei, iccid, INITKey, CameraName, CameraModel, CameraLanguage,
|
"PIRInterval", "TimeScan", imei, iccid, "INITKey", "CameraName", "CameraModel", "CameraLanguage",
|
||||||
CameraNetwork, MobileNetworkCode, MobileCountryCode, LocationAreaCode, CustomerTimezone,
|
"CameraNetwork", "MobileNetworkCode", "MobileCountryCode", "LocationAreaCode", "CustomerTimezone",
|
||||||
CustomerLanguage, servertime, ocr_ts
|
"CustomerLanguage", servertime, ocr_ts
|
||||||
) VALUES (
|
) VALUES (
|
||||||
%s, now(), %s, %s, %s, %s,
|
%s, now(), %s, %s, %s, %s,
|
||||||
%s, %s, %s, %s, %s,
|
%s, %s, %s, %s, %s,
|
||||||
@@ -333,40 +563,40 @@ def insert_metadata(cur, uuid: str, meta: dict):
|
|||||||
)
|
)
|
||||||
ON CONFLICT (metadata_uuid) DO UPDATE SET
|
ON CONFLICT (metadata_uuid) DO UPDATE SET
|
||||||
import_ts = EXCLUDED.import_ts,
|
import_ts = EXCLUDED.import_ts,
|
||||||
ImageCreationDate = EXCLUDED.ImageCreationDate,
|
"ImageCreationDate" = EXCLUDED."ImageCreationDate",
|
||||||
SizeInByte = EXCLUDED.SizeInByte,
|
"SizeInByte" = EXCLUDED."SizeInByte",
|
||||||
ImageFormat = EXCLUDED.ImageFormat,
|
"ImageFormat" = EXCLUDED."ImageFormat",
|
||||||
ImageNight = EXCLUDED.ImageNight,
|
"ImageNight" = EXCLUDED."ImageNight",
|
||||||
DateTimeLastSettings = EXCLUDED.DateTimeLastSettings,
|
"DateTimeLastSettings" = EXCLUDED."DateTimeLastSettings",
|
||||||
BatteryStatus = EXCLUDED.BatteryStatus,
|
"BatteryStatus" = EXCLUDED."BatteryStatus",
|
||||||
FirmwareVersion = EXCLUDED.FirmwareVersion,
|
"FirmwareVersion" = EXCLUDED."FirmwareVersion",
|
||||||
SignalStrength = EXCLUDED.SignalStrength,
|
"SignalStrength" = EXCLUDED."SignalStrength",
|
||||||
Temperature = EXCLUDED.Temperature,
|
"Temperature" = EXCLUDED."Temperature",
|
||||||
CoordinateSwitch = EXCLUDED.CoordinateSwitch,
|
"CoordinateSwitch" = EXCLUDED."CoordinateSwitch",
|
||||||
Latitude = EXCLUDED.Latitude,
|
"Latitude" = EXCLUDED."Latitude",
|
||||||
Longitude = EXCLUDED.Longitude,
|
"Longitude" = EXCLUDED."Longitude",
|
||||||
WorkPeriod = EXCLUDED.WorkPeriod,
|
"WorkPeriod" = EXCLUDED."WorkPeriod",
|
||||||
WorkStart = EXCLUDED.WorkStart,
|
"WorkStart" = EXCLUDED."WorkStart",
|
||||||
WorkEnd = EXCLUDED.WorkEnd,
|
"WorkEnd" = EXCLUDED."WorkEnd",
|
||||||
ThumbnailSize = EXCLUDED.ThumbnailSize,
|
"ThumbnailSize" = EXCLUDED."ThumbnailSize",
|
||||||
PIRInterval = EXCLUDED.PIRInterval,
|
"PIRInterval" = EXCLUDED."PIRInterval",
|
||||||
TimeScan = EXCLUDED.TimeScan,
|
"TimeScan" = EXCLUDED."TimeScan",
|
||||||
imei = EXCLUDED.imei,
|
imei = EXCLUDED.imei,
|
||||||
iccid = EXCLUDED.iccid,
|
iccid = EXCLUDED.iccid,
|
||||||
INITKey = EXCLUDED.INITKey,
|
"INITKey" = EXCLUDED."INITKey",
|
||||||
CameraName = EXCLUDED.CameraName,
|
"CameraName" = EXCLUDED."CameraName",
|
||||||
CameraModel = EXCLUDED.CameraModel,
|
"CameraModel" = EXCLUDED."CameraModel",
|
||||||
CameraLanguage = EXCLUDED.CameraLanguage,
|
"CameraLanguage" = EXCLUDED."CameraLanguage",
|
||||||
CameraNetwork = EXCLUDED.CameraNetwork,
|
"CameraNetwork" = EXCLUDED."CameraNetwork",
|
||||||
MobileNetworkCode = EXCLUDED.MobileNetworkCode,
|
"MobileNetworkCode" = EXCLUDED."MobileNetworkCode",
|
||||||
MobileCountryCode = EXCLUDED.MobileCountryCode,
|
"MobileCountryCode" = EXCLUDED."MobileCountryCode",
|
||||||
LocationAreaCode = EXCLUDED.LocationAreaCode,
|
"LocationAreaCode" = EXCLUDED."LocationAreaCode",
|
||||||
CustomerTimezone = EXCLUDED.CustomerTimezone,
|
"CustomerTimezone" = EXCLUDED."CustomerTimezone",
|
||||||
CustomerLanguage = EXCLUDED.CustomerLanguage,
|
"CustomerLanguage" = EXCLUDED."CustomerLanguage",
|
||||||
servertime = EXCLUDED.servertime;
|
servertime = EXCLUDED.servertime;
|
||||||
""", (
|
""", (
|
||||||
uuid, icd, get("SizeInByte"), get("ImageFormat"), get("ImageNight"),
|
uuid, icd, get("SizeInByte"), get("ImageFormat"), get("ImageNight"),
|
||||||
get("DateTimeLastSettings"), get("BatteryStatus"), get("FirmwareVersion"),
|
dls, get("BatteryStatus"), get("FirmwareVersion"),
|
||||||
get("SignalStrength"), get("Temperature"),
|
get("SignalStrength"), get("Temperature"),
|
||||||
get("CoordinateSwitch"), lat_f, lon_f, get("WorkPeriod"),
|
get("CoordinateSwitch"), lat_f, lon_f, get("WorkPeriod"),
|
||||||
get("WorkStart"), get("WorkEnd"), get("ThumbnailSize"),
|
get("WorkStart"), get("WorkEnd"), get("ThumbnailSize"),
|
||||||
@@ -385,6 +615,16 @@ def update_ocr_ts(cur, uuid: str, ocr_ts: datetime):
|
|||||||
""", (ocr_ts, uuid))
|
""", (ocr_ts, uuid))
|
||||||
|
|
||||||
|
|
||||||
|
def get_ocr_ts(cur, uuid: str) -> Optional[datetime]:
    """Return the stored ``ocr_ts`` of the metadata row for ``uuid``.

    Returns ``None`` when no row matches; a matching row's ``ocr_ts`` value
    is returned as-is.
    """
    cur.execute("""
        SELECT ocr_ts
        FROM remote_cam.metadata
        WHERE metadata_uuid = %s;
    """, (uuid,))
    row = cur.fetchone()
    if row:
        return row[0]
    return None
|
||||||
|
|
||||||
|
|
||||||
def insert_categories(cur, uuid: str, cat_json: dict):
|
def insert_categories(cur, uuid: str, cat_json: dict):
|
||||||
version = cat_json.get("version")
|
version = cat_json.get("version")
|
||||||
cats = cat_json.get("categories") or []
|
cats = cat_json.get("categories") or []
|
||||||
@@ -396,6 +636,14 @@ def insert_categories(cur, uuid: str, cat_json: dict):
|
|||||||
""", (uuid, c.get("category"), c.get("score"), version))
|
""", (uuid, c.get("category"), c.get("score"), version))
|
||||||
|
|
||||||
|
|
||||||
|
def insert_api_response(cur, uuid: str, success: bool, message: str):
    """Record the outcome of one API push in ``remote_cam.api_response``.

    ``message`` is cut to at most 512 characters before insertion; a falsy
    message is stored as the empty string. ``push_ts`` is set by the
    database via ``now()``.
    """
    text = message if message else ""
    params = (text[:512], uuid, success)
    cur.execute("""
        INSERT INTO remote_cam.api_response(push_ts, message, image_uuid, success)
        VALUES (now(), %s, %s::uuid, %s);
    """, params)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------
|
# -----------------------
|
||||||
# S3 ops wrapper
|
# S3 ops wrapper
|
||||||
# -----------------------
|
# -----------------------
|
||||||
@@ -407,6 +655,7 @@ class S3:
|
|||||||
endpoint_url=cfg.s3_endpoint,
|
endpoint_url=cfg.s3_endpoint,
|
||||||
aws_access_key_id=cfg.s3_access_key,
|
aws_access_key_id=cfg.s3_access_key,
|
||||||
aws_secret_access_key=cfg.s3_secret_key,
|
aws_secret_access_key=cfg.s3_secret_key,
|
||||||
|
config=BotoClientConfig(max_pool_connections=cfg.s3_max_pool_connections),
|
||||||
)
|
)
|
||||||
|
|
||||||
def list_entrance_objects(self):
|
def list_entrance_objects(self):
|
||||||
@@ -425,13 +674,21 @@ class S3:
|
|||||||
def head(self, key: str):
|
def head(self, key: str):
|
||||||
return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key)
|
return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key)
|
||||||
|
|
||||||
def delete_keys(self, keys):
|
def delete_keys(self, keys) -> int:
|
||||||
if not keys:
|
if not keys:
|
||||||
return
|
return 0
|
||||||
self.client.delete_objects(
|
resp = self.client.delete_objects(
|
||||||
Bucket=self.cfg.s3_bucket,
|
Bucket=self.cfg.s3_bucket,
|
||||||
Delete={"Objects": [{"Key": k} for k in keys], "Quiet": True},
|
Delete={"Objects": [{"Key": k} for k in keys], "Quiet": False},
|
||||||
)
|
)
|
||||||
|
errors = resp.get("Errors") or []
|
||||||
|
if errors:
|
||||||
|
details = "; ".join(
|
||||||
|
f"{e.get('Key')}:{e.get('Code')}:{e.get('Message')}" for e in errors[:3]
|
||||||
|
)
|
||||||
|
raise RuntimeError(f"S3 delete failed for {len(errors)} object(s): {details}")
|
||||||
|
deleted = resp.get("Deleted") or []
|
||||||
|
return len(deleted)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------
|
# -----------------------
|
||||||
@@ -489,54 +746,209 @@ def collect_ready_sets(s3: S3, cfg: AppConfig) -> list[EntranceSet]:
|
|||||||
|
|
||||||
def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
    """Run the ingest pipeline for one entrance set, resuming from its stored status.

    The job row is upserted and re-read first; each later stage compares the
    current status against a target via ``status_rank`` and is skipped when
    the job is already at or past it. Stages, in order: metadata import, OCR
    timestamp, EXIF write, processed-JPG upload, thumbnail, optional
    categories, optional DeerMapper API push, entrance cleanup.

    Args:
        cur: Open DB cursor; all job/status writes go through it.
        s3: S3 wrapper used for every object read/write/delete.
        cfg: Application config (feature switches, prefixes, OCR/EXIF tunables).
        s: The entrance set (uuid plus jpg/meta/optional category keys).

    OCR, EXIF and cleanup failures are caught and recorded as job statuses;
    errors from any other stage propagate to the caller.
    """
    job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
    job = job_get(cur, s.uuid) or {}
    status_now = job.get("status", "NEW")

    # 1) metadata.json -> DB
    meta = json.loads(s3.get_bytes(s.meta_key))
    insert_metadata(cur, s.uuid, meta)
    insert_resource(cur, s.uuid, "metadata")
    # Only advance to META_OK; never pull a further-progressed job back.
    if status_rank("META_OK") >= status_rank(status_now):
        job_set_status(cur, s.uuid, "META_OK")
        status_now = "META_OK"

    # 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone
    jpg_bytes = s3.get_bytes(s.jpg_key)
    # Start from any previously stored OCR timestamp so a resumed job can
    # still write EXIF without re-running OCR.
    ocr_ts = get_ocr_ts(cur, s.uuid)
    ocr_needs_run = cfg.enable_ocr and (
        status_rank(status_now) < status_rank("OCR_OK") or status_now == "OCR_SKIPPED"
    )

    if ocr_needs_run:
        try:
            customer_tzinfo, _ = parse_customer_tz(meta)
            ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, customer_tzinfo)
            update_ocr_ts(cur, s.uuid, ocr_ts)
            insert_resource(cur, s.uuid, "image")
            job_set_backfill(cur, s.uuid, needs_ocr_backfill=False)
            job_set_status(cur, s.uuid, "OCR_OK")
            status_now = "OCR_OK"
        except Exception as e:
            # OCR failure is non-fatal: flag for backfill and keep going.
            job_set_backfill(cur, s.uuid, needs_ocr_backfill=True)
            job_set_status(cur, s.uuid, "OCR_FAIL", err=str(e))
            status_now = "OCR_FAIL"
    elif not cfg.enable_ocr:
        job_set_backfill(cur, s.uuid, needs_ocr_backfill=True)
        if status_rank(status_now) < status_rank("OCR_SKIPPED"):
            job_set_status(cur, s.uuid, "OCR_SKIPPED")
            status_now = "OCR_SKIPPED"

    # 3) EXIF write:
    #    DateTimeOriginal = ocr_ts (local, keep offset tags)
    #    DateTimeDigitized = servertime (UTC)
    jpg_exif = jpg_bytes
    if not cfg.enable_exif:
        job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
        if status_rank(status_now) < status_rank("EXIF_SKIPPED"):
            job_set_status(cur, s.uuid, "EXIF_SKIPPED")
            status_now = "EXIF_SKIPPED"
    elif status_rank(status_now) < status_rank("MOVED"):
        if ocr_ts is None:
            job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
            job_set_status(cur, s.uuid, "EXIF_SKIPPED", err="OCR missing")
            status_now = "EXIF_SKIPPED"
        else:
            try:
                servertime = parse_servertime(meta)
                jpg_exif = exif_write_with_exiftool(
                    jpg_bytes,
                    ocr_ts,
                    servertime,
                    timeout_seconds=cfg.exiftool_timeout_seconds,
                )
                job_set_backfill(cur, s.uuid, needs_exif_backfill=False)
                job_set_status(cur, s.uuid, "EXIF_OK")
                status_now = "EXIF_OK"
            except Exception as e:
                # EXIF failure is non-fatal: the unpatched image is used below.
                job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
                job_set_status(cur, s.uuid, "EXIF_FAIL", err=str(e))
                status_now = "EXIF_FAIL"

    # 4) Upload processed JPG (EXIF patched)
    processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{s.uuid}.jpg"
    # True once ``jpg_exif`` is known to equal the object stored at
    # ``processed_key`` (uploaded or downloaded in this call). Avoids
    # re-downloading bytes that were just uploaded when EXIF was skipped/failed.
    processed_in_memory = False
    if status_rank(status_now) < status_rank("MOVED"):
        s3.put_bytes(processed_key, jpg_exif, "image/jpeg")
        s3.head(processed_key)  # verify the upload before recording it
        job_set_status(cur, s.uuid, "MOVED", processed_key=processed_key)
        status_now = "MOVED"
        processed_in_memory = True

    # 5) Thumbnail
    thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{s.uuid}.jpg"
    if status_rank(status_now) < status_rank("THUMB_OK"):
        # Resumed job: the processed image was uploaded by an earlier run, so
        # fetch it instead of thumbnailing the raw entrance JPG.
        if (
            not processed_in_memory
            and status_rank(status_now) >= status_rank("MOVED")
            and jpg_exif is jpg_bytes
        ):
            jpg_exif = s3.get_bytes(processed_key)
            processed_in_memory = True
        thumb_bytes = make_thumbnail_bytes(jpg_exif, cfg.thumb_max)
        s3.put_bytes(thumb_key, thumb_bytes, "image/jpeg")
        s3.head(thumb_key)
        job_set_status(cur, s.uuid, "THUMB_OK", thumb_key=thumb_key)
        status_now = "THUMB_OK"

    # 6) Optional categories
    if s.cat_key and status_rank(status_now) < status_rank("CATEGORIES_OK"):
        cat_json = json.loads(s3.get_bytes(s.cat_key))
        insert_categories(cur, s.uuid, cat_json)
        insert_resource(cur, s.uuid, "classification")
        job_set_status(cur, s.uuid, "CATEGORIES_OK")
        status_now = "CATEGORIES_OK"

    # 7) Optional DeerMapper API push (image + metadata)
    if cfg.enable_deermapper_api and status_rank(status_now) < status_rank("API_OK"):
        api_key = (cfg.deermapper_api_key or "").strip()
        api_url = (cfg.deermapper_api_base_url or "").strip()
        if not api_key or not api_url:
            err_msg = "DeerMapper API config missing"
            job_set_status(cur, s.uuid, "API_FAIL", err=err_msg)
            insert_api_response(cur, s.uuid, False, err_msg)
            status_now = "API_FAIL"
        else:
            # Same resume rule as the thumbnail step: push the stored
            # processed image, not the raw entrance JPG.
            if (
                not processed_in_memory
                and status_rank(status_now) >= status_rank("MOVED")
                and jpg_exif is jpg_bytes
            ):
                jpg_exif = s3.get_bytes(processed_key)
                processed_in_memory = True
            imei = str(meta.get("imei") or "").strip()
            ok, msg = push_to_deermapper_api(cfg, s.uuid, imei, meta, jpg_exif)
            if ok:
                insert_api_response(cur, s.uuid, True, msg)
                job_set_status(cur, s.uuid, "API_OK")
                status_now = "API_OK"
            else:
                insert_api_response(cur, s.uuid, False, msg)
                job_set_status(cur, s.uuid, "API_FAIL", err=msg)
                status_now = "API_FAIL"

    # 8) Cleanup entrance only after verify (+ API when enabled)
    api_ready_for_cleanup = (not cfg.enable_deermapper_api) or status_rank(status_now) >= status_rank("API_OK")
    if status_rank(status_now) >= status_rank("THUMB_OK") and api_ready_for_cleanup:
        try:
            # Re-verify both outputs exist before the inputs are deleted.
            s3.head(processed_key)
            s3.head(thumb_key)
            to_delete = [s.jpg_key, s.meta_key]
            if s.cat_key:
                to_delete.append(s.cat_key)
            s3.delete_keys(to_delete)
            job_set_status(cur, s.uuid, "CLEANED")
            status_now = "CLEANED"
        except Exception as e:
            # Leave the entrance files in place; record why cleanup was skipped.
            job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
def is_completed_job_materialized(cur, s3: S3, cfg: AppConfig, uuid: str) -> bool:
    """Return True when the job for ``uuid`` is finished and its outputs exist in S3.

    A job counts as finished when its status ranks at or above ``THUMB_OK``
    and, if the DeerMapper API push is enabled, also at or above ``API_OK``.
    Both the processed JPG and the thumbnail must then answer a HEAD request.

    Args:
        cur: Open DB cursor used to load the job row.
        s3: S3 wrapper used for the existence checks.
        cfg: Application config (prefixes and the DeerMapper switch).
        uuid: Identifier of the job/entrance set.

    Returns:
        False when there is no job row, the job is not far enough along, or
        either HEAD request raises; True otherwise.
    """
    job = job_get(cur, uuid)
    if not job:
        return False

    rank_now = status_rank(job.get("status"))
    if rank_now < status_rank("THUMB_OK"):
        return False
    # Mirror the cleanup gate in process_one: with the API push enabled, a job
    # that has not reached API_OK must go through process_one again so the
    # push is retried, instead of having its entrance files deleted as a
    # "duplicate".
    # NOTE(review): assumes status_rank orders later states at or above
    # API_OK — confirm against the rank table.
    if cfg.enable_deermapper_api and rank_now < status_rank("API_OK"):
        return False

    processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{uuid}.jpg"
    thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{uuid}.jpg"
    try:
        s3.head(processed_key)
        s3.head(thumb_key)
    except Exception:
        # Any HEAD failure is treated as "not materialized" so the caller
        # reprocesses rather than deleting inputs.
        return False
    return True
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_entrance_set(s3: S3, s: EntranceSet) -> int:
    """Delete the entrance objects of ``s`` and return the delete count.

    The JPG and metadata keys are always included; the category key is
    added only when it is set. The return value is whatever
    ``s3.delete_keys`` returns.
    """
    optional_keys = [s.cat_key] if s.cat_key else []
    return s3.delete_keys([s.jpg_key, s.meta_key, *optional_keys])
|
||||||
|
|
||||||
|
|
||||||
|
def _process_ready_batch(cfg: AppConfig, batch: list[EntranceSet]):
    """Process each entrance set in ``batch`` in order on one DB connection.

    Creates its own ``S3`` wrapper and database connection. Each set runs in
    its own transaction: a job that is already completed and materialized is
    treated as a duplicate upload and only has its entrance files removed;
    otherwise ``process_one`` runs. Any exception is recorded on the job as
    status ``ERROR`` in a separate transaction. One ``[RESULT]`` line is
    printed per set.

    Args:
        cfg: Application config (DSN, S3 settings, ``job_sleep_seconds``).
        batch: Entrance sets to process.
    """
    s3c = S3(cfg)
    with psycopg.connect(cfg.pg_dsn) as db:
        for s in batch:
            start = time.perf_counter()
            result = "failure"
            final_status = "UNKNOWN"
            err_text = None
            cleanup_deleted = None
            try:
                skip_duplicate = False
                with db.transaction():
                    with db.cursor() as cur:
                        if is_completed_job_materialized(cur, s3c, cfg, s.uuid):
                            skip_duplicate = True
                        else:
                            process_one(cur, s3c, cfg, s)
                            final_status = (job_get(cur, s.uuid) or {}).get("status", "UNKNOWN")
                            result = "success" if is_terminal_success(final_status, cfg) else "failure"
                # Delete duplicate entrance files only after the read
                # transaction above has ended.
                if skip_duplicate:
                    cleanup_deleted = cleanup_entrance_set(s3c, s)
                    final_status = "DUPLICATE_CLEANED"
                    result = "success"
            except Exception as e:
                # Some exceptions stringify to ""; fall back to repr so the
                # failure is never recorded or logged without a reason.
                err_text = str(e) or repr(e)
                with db.transaction():
                    with db.cursor() as cur:
                        job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
                        job_set_status(cur, s.uuid, "ERROR", err=err_text)
                final_status = "ERROR"
                # The exception may fire after process_one already reported
                # success (e.g. while leaving the transaction block).
                result = "failure"

            elapsed_ms = int((time.perf_counter() - start) * 1000)
            line = (
                f"[RESULT] uuid={s.uuid} result={result} status={final_status} "
                f"process_ms={elapsed_ms}"
            )
            if err_text is not None:
                line += f" error={err_text}"
            elif cleanup_deleted is not None:
                line += f" cleanup_deleted={cleanup_deleted}"
            print(line)

            if cfg.job_sleep_seconds > 0:
                time.sleep(cfg.job_sleep_seconds)
|
||||||
|
|
||||||
|
|
||||||
def run_once(cfg: AppConfig) -> int:
|
def run_once(cfg: AppConfig) -> int:
|
||||||
@@ -545,22 +957,20 @@ def run_once(cfg: AppConfig) -> int:
|
|||||||
if not ready:
|
if not ready:
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
with psycopg.connect(cfg.pg_dsn) as db:
|
workers = max(1, min(cfg.parallel_workers, len(ready)))
|
||||||
with db.cursor() as cur:
|
if workers == 1:
|
||||||
db_init(cur)
|
_process_ready_batch(cfg, ready)
|
||||||
db.commit()
|
return len(ready)
|
||||||
|
|
||||||
|
batches: list[list[EntranceSet]] = [[] for _ in range(workers)]
|
||||||
|
for i, s in enumerate(ready):
|
||||||
|
batches[i % workers].append(s)
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=workers) as pool:
|
||||||
|
futures = [pool.submit(_process_ready_batch, cfg, batch) for batch in batches if batch]
|
||||||
|
for future in as_completed(futures):
|
||||||
|
future.result()
|
||||||
|
|
||||||
for s in ready:
|
|
||||||
try:
|
|
||||||
with db.transaction():
|
|
||||||
with db.cursor() as cur:
|
|
||||||
process_one(cur, s3c, cfg, s)
|
|
||||||
except Exception as e:
|
|
||||||
with db.transaction():
|
|
||||||
with db.cursor() as cur:
|
|
||||||
job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
|
|
||||||
job_set_status(cur, s.uuid, "ERROR", err=str(e))
|
|
||||||
print(f"[ERROR] {s.uuid}: {e}")
|
|
||||||
return len(ready)
|
return len(ready)
|
||||||
|
|
||||||
|
|
||||||
@@ -568,6 +978,11 @@ def main():
|
|||||||
cfg_path = os.environ.get("CONFIG_YAML", "/app/config.yaml")
|
cfg_path = os.environ.get("CONFIG_YAML", "/app/config.yaml")
|
||||||
cfg = load_config(cfg_path)
|
cfg = load_config(cfg_path)
|
||||||
|
|
||||||
|
with psycopg.connect(cfg.pg_dsn) as db:
|
||||||
|
with db.cursor() as cur:
|
||||||
|
db_init(cur)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
n = run_once(cfg)
|
n = run_once(cfg)
|
||||||
if n == 0:
|
if n == 0:
|
||||||
|
|||||||
Reference in New Issue
Block a user