Compare commits
8 Commits
8414e72870
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 2e9ee3e014 | |||
| 2294962d57 | |||
| ddfb281be3 | |||
| b35394c569 | |||
| 3873cb7b3f | |||
| 1e4a7b86b6 | |||
| 400f8a53f8 | |||
| d71e1699d6 |
5
.env
Normal file
5
.env
Normal file
@@ -0,0 +1,5 @@
|
||||
PG_DSN=postgresql://postgres:DFGk5s9H21lKao2K@136.243.41.58:7777/meles
|
||||
S3_BUCKET=trapper-meles
|
||||
S3_ACCESS_KEY=UG950FCGWFYRXSXXVUAH
|
||||
S3_SECRET_KEY=rXpspNhboZY6zNZi7djjq2QaXPA4uwsO9jXf4AXk
|
||||
S3_ENDPOINT=https://fsn1.your-objectstorage.com/
|
||||
29
config.yaml
29
config.yaml
@@ -1,12 +1,16 @@
|
||||
# config.yaml (Beispiel)
|
||||
s3:
|
||||
endpoint: "https://<dein-endpoint>"
|
||||
access_key: "<key>"
|
||||
secret_key: "<secret>"
|
||||
bucket: "<bucket>"
|
||||
endpoint: "https://fsn1.your-objectstorage.com/"
|
||||
access_key: "UG950FCGWFYRXSXXVUAH"
|
||||
secret_key: "rXpspNhboZY6zNZi7djjq2QaXPA4uwsO9jXf4AXk"
|
||||
bucket: "trapper-meles"
|
||||
|
||||
postgres:
|
||||
dsn: "postgresql://user:pass@host:5432/dbname"
|
||||
dsn: "postgresql://postgres:DFGk5s9H21lKao2K@136.243.41.58:7777/meles"
|
||||
|
||||
deermapper-api:
|
||||
base_url: "https://webapp.deermapper.net/api/icu"
|
||||
apiKey: "695bc217-3b40-48bb-bb12-17fc5b08b320"
|
||||
|
||||
app:
|
||||
entrance_prefix: "icu/entrance/"
|
||||
@@ -15,6 +19,17 @@ app:
|
||||
min_age_seconds: 90
|
||||
poll_seconds: 30
|
||||
# OCR crop tunables (defaults are usually fine)
|
||||
ocr_crop_w_frac: 0.42
|
||||
ocr_crop_h_frac: 0.22
|
||||
ocr_crop_w_frac: 0.63
|
||||
ocr_crop_h_frac: 0.05
|
||||
thumb_max: 512
|
||||
exiftool_timeout_seconds: 15
|
||||
job_sleep_seconds: 0.0
|
||||
parallel_workers: 6
|
||||
s3_max_pool_connections: 48
|
||||
# Optional processing switches:
|
||||
# - false => image still gets moved + thumbnail, and backfill flag is set in import_job
|
||||
enable_ocr: true
|
||||
enable_exif: true
|
||||
enable_deermapper_api: false
|
||||
deermapper_api_timeout_seconds: 20
|
||||
deermapper_api_image_field: "image"
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
services:
|
||||
melesICUmover:
|
||||
melesicumover:
|
||||
build: .
|
||||
container_name: melesICUmover
|
||||
container_name: melesicumover
|
||||
restart: unless-stopped
|
||||
environment:
|
||||
CONFIG_YAML: /app/config.yaml
|
||||
DEBUGPY: "1"
|
||||
DEBUG: "0"
|
||||
ports:
|
||||
- "5678:5678"
|
||||
volumes:
|
||||
|
||||
14
launch.json
14
launch.json
@@ -1,14 +0,0 @@
|
||||
{
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Attach to Docker (melesICUmover)",
|
||||
"type": "python",
|
||||
"request": "attach",
|
||||
"connect": {
|
||||
"host": "localhost",
|
||||
"port": 5678
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
569
main.py
569
main.py
@@ -6,12 +6,17 @@ import json
|
||||
import time
|
||||
import tempfile
|
||||
import subprocess
|
||||
import uuid as uuidlib
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional, Dict, Tuple
|
||||
from urllib import error as urlerror
|
||||
from urllib import request as urlrequest
|
||||
|
||||
import boto3
|
||||
import psycopg
|
||||
from botocore.config import Config as BotoClientConfig
|
||||
from dateutil import tz
|
||||
from PIL import Image, ImageOps, ImageEnhance
|
||||
import pytesseract
|
||||
@@ -38,6 +43,9 @@ CAT_RE = re.compile(r"^([0-9a-fA-F-]{36})_categories\.json$")
|
||||
OCR_DT_RE = re.compile(
|
||||
r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s+(?P<d>\d{2})-(?P<mo>\d{2})-(?P<y>\d{4})"
|
||||
)
|
||||
OCR_DT_RE_FLEX = re.compile(
|
||||
r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s*(?P<d>\d{2})[-.](?P<mo>\d{2})[-.](?P<y>\d{4})"
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -61,6 +69,17 @@ class AppConfig:
|
||||
ocr_crop_h_frac: float = 0.22 # bottom ~22% of height
|
||||
|
||||
thumb_max: int = 512
|
||||
exiftool_timeout_seconds: int = 15
|
||||
job_sleep_seconds: float = 0.0
|
||||
parallel_workers: int = 4
|
||||
s3_max_pool_connections: int = 32
|
||||
enable_ocr: bool = True
|
||||
enable_exif: bool = True
|
||||
enable_deermapper_api: bool = False
|
||||
deermapper_api_base_url: str = "https://webapp.deermapper.net/api/icu"
|
||||
deermapper_api_key: str = ""
|
||||
deermapper_api_timeout_seconds: int = 20
|
||||
deermapper_api_image_field: str = "image"
|
||||
|
||||
|
||||
def load_config(path: str) -> AppConfig:
|
||||
@@ -71,9 +90,26 @@ def load_config(path: str) -> AppConfig:
|
||||
def env_or(key: str, default=None):
|
||||
return os.environ.get(key, default)
|
||||
|
||||
def as_bool(value, default: bool = True) -> bool:
    """Coerce a loosely typed config value to a bool.

    ``None`` yields ``default``. Strings are true only when, after trimming
    and lower-casing, they equal "1", "true", "yes" or "on". Every other
    value (bools, numbers, containers, ...) uses plain Python truthiness.
    """
    if value is None:
        return default
    if isinstance(value, str):
        truthy_words = ("1", "true", "yes", "on")
        return value.strip().lower() in truthy_words
    # bool, int, float and anything else: ordinary truthiness
    return bool(value)
|
||||
|
||||
s3 = raw["s3"]
|
||||
pg = raw["postgres"]
|
||||
app = raw.get("app", {})
|
||||
deermapper = raw.get("deermapper-api", raw.get("deermapper_api", {}))
|
||||
default_workers = max(1, min(16, (os.cpu_count() or 2) * 2))
|
||||
parallel_workers = int(app.get("parallel_workers", default_workers))
|
||||
parallel_workers = max(1, parallel_workers)
|
||||
s3_pool = int(app.get("s3_max_pool_connections", max(16, parallel_workers * 4)))
|
||||
s3_pool = max(10, s3_pool)
|
||||
|
||||
return AppConfig(
|
||||
s3_endpoint=env_or("S3_ENDPOINT", s3["endpoint"]),
|
||||
@@ -89,6 +125,17 @@ def load_config(path: str) -> AppConfig:
|
||||
ocr_crop_w_frac=float(app.get("ocr_crop_w_frac", 0.42)),
|
||||
ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
|
||||
thumb_max=int(app.get("thumb_max", 512)),
|
||||
exiftool_timeout_seconds=int(app.get("exiftool_timeout_seconds", 15)),
|
||||
job_sleep_seconds=float(app.get("job_sleep_seconds", 0.0)),
|
||||
parallel_workers=parallel_workers,
|
||||
s3_max_pool_connections=s3_pool,
|
||||
enable_ocr=as_bool(app.get("enable_ocr", True), True),
|
||||
enable_exif=as_bool(app.get("enable_exif", True), True),
|
||||
enable_deermapper_api=as_bool(app.get("enable_deermapper_api", False), False),
|
||||
deermapper_api_base_url=env_or("DEERMAPPER_API_URL", deermapper.get("base_url", "https://webapp.deermapper.net/api/icu")),
|
||||
deermapper_api_key=env_or("DEERMAPPER_API_KEY", deermapper.get("apiKey", "")),
|
||||
deermapper_api_timeout_seconds=int(app.get("deermapper_api_timeout_seconds", 20)),
|
||||
deermapper_api_image_field=app.get("deermapper_api_image_field", "image"),
|
||||
)
|
||||
|
||||
|
||||
@@ -156,28 +203,58 @@ def ocr_extract_timestamp(jpg_bytes: bytes, crop_w_frac: float, crop_h_frac: flo
|
||||
cfg = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789:- '
|
||||
text = pytesseract.image_to_string(bw, config=cfg).strip()
|
||||
|
||||
m = OCR_DT_RE.search(text)
|
||||
if not m:
|
||||
# retry without thresholding
|
||||
text2 = pytesseract.image_to_string(gray, config=cfg).strip()
|
||||
m = OCR_DT_RE.search(text2)
|
||||
if not m:
|
||||
raise ValueError(f"OCR timestamp not found. OCR1='{text}' OCR2='{text2}'")
|
||||
|
||||
dt_local_naive = datetime(
|
||||
int(m.group("y")),
|
||||
int(m.group("mo")),
|
||||
int(m.group("d")),
|
||||
int(m.group("h")),
|
||||
int(m.group("m")),
|
||||
int(m.group("s")),
|
||||
)
|
||||
dt_local_naive = _parse_ocr_datetime(text, text2)
|
||||
|
||||
# Keep camera-local timezone (no UTC conversion requested)
|
||||
return dt_local_naive.replace(tzinfo=customer_tzinfo)
|
||||
|
||||
|
||||
def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime) -> bytes:
|
||||
def _parse_ocr_datetime(*texts: str) -> datetime:
|
||||
def build_dt(h, m, s, d, mo, y):
|
||||
return datetime(int(y), int(mo), int(d), int(h), int(m), int(s))
|
||||
|
||||
def valid_dt(h, m, s, d, mo, y) -> bool:
|
||||
try:
|
||||
build_dt(h, m, s, d, mo, y)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
# 1) direct regex matches
|
||||
for t in texts:
|
||||
for rx in (OCR_DT_RE, OCR_DT_RE_FLEX):
|
||||
m = rx.search(t)
|
||||
if m and valid_dt(m.group("h"), m.group("m"), m.group("s"), m.group("d"), m.group("mo"), m.group("y")):
|
||||
return build_dt(m.group("h"), m.group("m"), m.group("s"), m.group("d"), m.group("mo"), m.group("y"))
|
||||
|
||||
# 2) find time and date separately (handles missing space between time and date)
|
||||
time_rx = re.compile(r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})")
|
||||
date_rx = re.compile(r"(?P<d>\d{2})[-.](?P<mo>\d{2})[-.](?P<y>\d{4})")
|
||||
for t in texts:
|
||||
tm = time_rx.search(t)
|
||||
dm = date_rx.search(t)
|
||||
if tm and dm and valid_dt(tm.group("h"), tm.group("m"), tm.group("s"), dm.group("d"), dm.group("mo"), dm.group("y")):
|
||||
return build_dt(tm.group("h"), tm.group("m"), tm.group("s"), dm.group("d"), dm.group("mo"), dm.group("y"))
|
||||
|
||||
# 3) digits-only fallback: scan for HHMMSSDDMMYYYY
|
||||
for t in texts:
|
||||
digits = re.sub(r"\D", "", t)
|
||||
for i in range(0, max(0, len(digits) - 13)):
|
||||
chunk = digits[i:i + 14]
|
||||
if len(chunk) < 14:
|
||||
continue
|
||||
h, m, s = chunk[0:2], chunk[2:4], chunk[4:6]
|
||||
d, mo, y = chunk[6:8], chunk[8:10], chunk[10:14]
|
||||
if valid_dt(h, m, s, d, mo, y):
|
||||
return build_dt(h, m, s, d, mo, y)
|
||||
|
||||
raise ValueError(f"OCR timestamp not found. OCR1='{texts[0] if texts else ''}' OCR2='{texts[1] if len(texts) > 1 else ''}'")
|
||||
|
||||
|
||||
def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime, *, timeout_seconds: int) -> bytes:
|
||||
"""
|
||||
DateTimeOriginal = OCR time (camera local) + OffsetTimeOriginal
|
||||
DateTimeDigitized = servertime (UTC) + OffsetTimeDigitized=+00:00
|
||||
@@ -204,7 +281,7 @@ def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digi
|
||||
f"-OffsetTime={otd}",
|
||||
in_path,
|
||||
]
|
||||
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout_seconds)
|
||||
|
||||
with open(in_path, "rb") as f:
|
||||
return f.read()
|
||||
@@ -219,6 +296,71 @@ def make_thumbnail_bytes(jpg_bytes: bytes, thumb_max: int) -> bytes:
|
||||
return out.getvalue()
|
||||
|
||||
|
||||
def _multipart_form_data(fields: Dict[str, str], files: list[tuple[str, str, bytes, str]]) -> tuple[str, bytes]:
|
||||
boundary = f"----meles-{uuidlib.uuid4().hex}"
|
||||
boundary_bytes = boundary.encode("ascii")
|
||||
crlf = b"\r\n"
|
||||
|
||||
parts: list[bytes] = []
|
||||
for name, value in fields.items():
|
||||
parts.extend([
|
||||
b"--" + boundary_bytes,
|
||||
f'Content-Disposition: form-data; name="{name}"'.encode("utf-8"),
|
||||
b"",
|
||||
str(value).encode("utf-8"),
|
||||
])
|
||||
for field_name, filename, content, content_type in files:
|
||||
parts.extend([
|
||||
b"--" + boundary_bytes,
|
||||
f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"'.encode("utf-8"),
|
||||
f"Content-Type: {content_type}".encode("utf-8"),
|
||||
b"",
|
||||
content,
|
||||
])
|
||||
parts.append(b"--" + boundary_bytes + b"--")
|
||||
body = crlf.join(parts) + crlf
|
||||
return f"multipart/form-data; boundary={boundary}", body
|
||||
|
||||
|
||||
def push_to_deermapper_api(cfg: AppConfig, image_uuid: str, imei: str, metadata: dict, jpg_bytes: bytes) -> tuple[bool, str]:
    """POST one image plus its metadata to the configured DeerMapper endpoint.

    The request is multipart/form-data with the text fields ``apiKey``,
    ``imei`` and ``metadata`` (JSON-encoded) and the JPEG under the field name
    ``cfg.deermapper_api_image_field``.

    Returns:
        (ok, message). ``ok`` is True only when the reply is a JSON object
        whose "status" stringifies to "1"; ``message`` is the reply's "msg"
        or a short description of what went wrong. HTTP, network, timeout and
        decoding problems are reported through the return value, not raised.
    """
    fields = {
        "apiKey": cfg.deermapper_api_key,
        "imei": imei,
        "metadata": json.dumps(metadata, ensure_ascii=False),
    }
    content_type, body = _multipart_form_data(
        fields,
        [(cfg.deermapper_api_image_field, f"{image_uuid}.jpg", jpg_bytes, "image/jpeg")],
    )
    req = urlrequest.Request(
        cfg.deermapper_api_base_url,
        data=body,
        method="POST",
        headers={
            "Content-Type": content_type,
            "Accept": "application/json",
        },
    )

    try:
        with urlrequest.urlopen(req, timeout=cfg.deermapper_api_timeout_seconds) as resp:
            payload = resp.read().decode("utf-8", errors="replace")
        try:
            data = json.loads(payload) if payload else {}
        except ValueError:  # json.JSONDecodeError is a ValueError
            return False, f"API response not JSON: {payload[:300]}"
        if not isinstance(data, dict):
            # Valid JSON but an array/scalar: there is no status/msg to read.
            return False, f"API response not a JSON object: {payload[:300]}"
        status_val = str(data.get("status", "")).strip()
        msg = str(data.get("msg", "")).strip()
        if status_val == "1":
            return True, msg or "ok"
        return False, msg or f"status={status_val or 'unknown'}"
    except urlerror.HTTPError as e:
        body_txt = e.read().decode("utf-8", errors="replace")
        return False, f"HTTP {e.code}: {body_txt[:300]}"
    except Exception as e:
        # Boundary handler: callers record the failure text instead of crashing.
        return False, str(e)
|
||||
|
||||
|
||||
# -----------------------
|
||||
# DB ops
|
||||
# -----------------------
|
||||
@@ -235,10 +377,22 @@ def db_init(cur):
|
||||
entrance_cat_key text,
|
||||
processed_jpg_key text,
|
||||
thumbnail_key text,
|
||||
needs_ocr_backfill boolean NOT NULL DEFAULT false,
|
||||
needs_exif_backfill boolean NOT NULL DEFAULT false,
|
||||
created_ts timestamptz NOT NULL DEFAULT now(),
|
||||
updated_ts timestamptz NOT NULL DEFAULT now()
|
||||
);
|
||||
""")
|
||||
cur.execute("""
|
||||
ALTER TABLE remote_cam.import_job
|
||||
ADD COLUMN IF NOT EXISTS needs_ocr_backfill boolean NOT NULL DEFAULT false;
|
||||
""")
|
||||
cur.execute("""
|
||||
ALTER TABLE remote_cam.import_job
|
||||
ADD COLUMN IF NOT EXISTS needs_exif_backfill boolean NOT NULL DEFAULT false;
|
||||
""")
|
||||
cur.execute("CREATE INDEX IF NOT EXISTS import_job_needs_ocr_idx ON remote_cam.import_job(needs_ocr_backfill);")
|
||||
cur.execute("CREATE INDEX IF NOT EXISTS import_job_needs_exif_idx ON remote_cam.import_job(needs_exif_backfill);")
|
||||
cur.execute("CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);")
|
||||
cur.execute("CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);")
|
||||
|
||||
@@ -261,6 +415,14 @@ def db_init(cur):
|
||||
END IF;
|
||||
END$$;
|
||||
""")
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS remote_cam.api_response (
|
||||
push_ts timestamp without time zone,
|
||||
message character(512),
|
||||
image_uuid uuid,
|
||||
success boolean
|
||||
);
|
||||
""")
|
||||
|
||||
|
||||
def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]):
|
||||
@@ -280,11 +442,73 @@ def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None, pr
|
||||
UPDATE remote_cam.import_job
|
||||
SET status = %s,
|
||||
attempts = attempts + CASE WHEN %s = 'ERROR' THEN 1 ELSE 0 END,
|
||||
last_error = CASE WHEN %s IS NULL THEN last_error ELSE %s END,
|
||||
last_error = COALESCE(%s::text, last_error),
|
||||
processed_jpg_key = COALESCE(%s, processed_jpg_key),
|
||||
thumbnail_key = COALESCE(%s, thumbnail_key)
|
||||
WHERE image_uuid = %s;
|
||||
""", (status, status, err, err, err, processed_key, thumb_key, uuid))
|
||||
""", (status, status, err, processed_key, thumb_key, uuid))
|
||||
|
||||
|
||||
def job_set_backfill(cur, uuid: str, *, needs_ocr_backfill: Optional[bool] = None, needs_exif_backfill: Optional[bool] = None):
    """Update the OCR/EXIF backfill flags of one import job.

    A flag passed as ``None`` keeps its stored value (COALESCE in the SQL).
    When both flags are ``None`` no statement is executed at all.
    """
    flags = (needs_ocr_backfill, needs_exif_backfill)
    if all(flag is None for flag in flags):
        return

    sql = """
        UPDATE remote_cam.import_job
        SET needs_ocr_backfill = COALESCE(%s, needs_ocr_backfill),
            needs_exif_backfill = COALESCE(%s, needs_exif_backfill)
        WHERE image_uuid = %s;
    """
    cur.execute(sql, flags + (uuid,))
|
||||
|
||||
|
||||
def job_get(cur, uuid: str):
    """Fetch the tracked state of one import job.

    Returns a dict with the keys ``status``, ``processed_jpg_key``,
    ``thumbnail_key``, ``has_categories``, ``needs_ocr_backfill`` and
    ``needs_exif_backfill``, or ``None`` when no row exists for ``uuid``.
    """
    # Must stay in the same order as the SELECT list below.
    columns = (
        "status",
        "processed_jpg_key",
        "thumbnail_key",
        "has_categories",
        "needs_ocr_backfill",
        "needs_exif_backfill",
    )
    cur.execute("""
        SELECT status, processed_jpg_key, thumbnail_key, has_categories, needs_ocr_backfill, needs_exif_backfill
        FROM remote_cam.import_job
        WHERE image_uuid = %s;
    """, (uuid,))
    record = cur.fetchone()
    if not record:
        return None
    return dict(zip(columns, record))
|
||||
|
||||
|
||||
# Pipeline statuses in the order a job is expected to advance through them.
# Statuses that are not listed here (this file also writes "OCR_FAIL",
# "EXIF_FAIL", "CLEANUP_SKIPPED" and "ERROR") have no rank; status_rank()
# reports -1 for them.
STATUS_ORDER = [
    "NEW",
    "META_OK",
    "OCR_OK",
    "OCR_SKIPPED",
    "EXIF_OK",
    "EXIF_SKIPPED",
    "MOVED",
    "THUMB_OK",
    "CATEGORIES_OK",
    "API_FAIL",
    "API_OK",
    "CLEANED",
]

# status -> position in STATUS_ORDER, built once at import so each rank lookup
# is a single dict access. STATUS_ORDER is treated as constant; rebuild this
# mapping if the list is ever changed at runtime.
_STATUS_RANK = {name: position for position, name in enumerate(STATUS_ORDER)}


def status_rank(status: Optional[str]) -> int:
    """Return the position of ``status`` in STATUS_ORDER, or -1 if unlisted.

    ``None`` and any status missing from STATUS_ORDER yield -1, which
    compares lower than every listed status.
    """
    return _STATUS_RANK.get(status, -1)
|
||||
|
||||
|
||||
def is_terminal_success(status: Optional[str], cfg: AppConfig) -> bool:
    """Tell whether ``status`` counts as a finished, successful job.

    ``None`` is never successful and "CLEANED" always is. Otherwise the
    status must rank at least "API_OK" when the DeerMapper push is enabled,
    or at least "THUMB_OK" when it is disabled.
    """
    if status is None:
        return False
    if status == "CLEANED":
        return True
    required = "API_OK" if cfg.enable_deermapper_api else "THUMB_OK"
    return status_rank(status) >= status_rank(required)
|
||||
|
||||
|
||||
def insert_resource(cur, uuid: str, typ: str):
|
||||
@@ -314,15 +538,21 @@ def insert_metadata(cur, uuid: str, meta: dict):
|
||||
lat_f = float(lat) if lat not in (None, "") else None
|
||||
lon_f = float(lon) if lon not in (None, "") else None
|
||||
|
||||
# NOTE: DateTimeLastSettings looks like "05:45:35 30.06.2025" in sample; parse if needed later.
|
||||
dls_raw = get("DateTimeLastSettings")
|
||||
dls = None
|
||||
if dls_raw:
|
||||
try:
|
||||
dls = datetime.strptime(dls_raw, "%H:%M:%S %d.%m.%Y").replace(tzinfo=timezone.utc)
|
||||
except Exception:
|
||||
dls = None
|
||||
cur.execute("""
|
||||
INSERT INTO remote_cam.metadata(
|
||||
metadata_uuid, import_ts, ImageCreationDate, SizeInByte, ImageFormat, ImageNight,
|
||||
DateTimeLastSettings, BatteryStatus, FirmwareVersion, SignalStrength, Temperature,
|
||||
CoordinateSwitch, Latitude, Longitude, WorkPeriod, WorkStart, WorkEnd, ThumbnailSize,
|
||||
PIRInterval, TimeScan, imei, iccid, INITKey, CameraName, CameraModel, CameraLanguage,
|
||||
CameraNetwork, MobileNetworkCode, MobileCountryCode, LocationAreaCode, CustomerTimezone,
|
||||
CustomerLanguage, servertime, ocr_ts
|
||||
metadata_uuid, import_ts, "ImageCreationDate", "SizeInByte", "ImageFormat", "ImageNight",
|
||||
"DateTimeLastSettings", "BatteryStatus", "FirmwareVersion", "SignalStrength", "Temperature",
|
||||
"CoordinateSwitch", "Latitude", "Longitude", "WorkPeriod", "WorkStart", "WorkEnd", "ThumbnailSize",
|
||||
"PIRInterval", "TimeScan", imei, iccid, "INITKey", "CameraName", "CameraModel", "CameraLanguage",
|
||||
"CameraNetwork", "MobileNetworkCode", "MobileCountryCode", "LocationAreaCode", "CustomerTimezone",
|
||||
"CustomerLanguage", servertime, ocr_ts
|
||||
) VALUES (
|
||||
%s, now(), %s, %s, %s, %s,
|
||||
%s, %s, %s, %s, %s,
|
||||
@@ -333,40 +563,40 @@ def insert_metadata(cur, uuid: str, meta: dict):
|
||||
)
|
||||
ON CONFLICT (metadata_uuid) DO UPDATE SET
|
||||
import_ts = EXCLUDED.import_ts,
|
||||
ImageCreationDate = EXCLUDED.ImageCreationDate,
|
||||
SizeInByte = EXCLUDED.SizeInByte,
|
||||
ImageFormat = EXCLUDED.ImageFormat,
|
||||
ImageNight = EXCLUDED.ImageNight,
|
||||
DateTimeLastSettings = EXCLUDED.DateTimeLastSettings,
|
||||
BatteryStatus = EXCLUDED.BatteryStatus,
|
||||
FirmwareVersion = EXCLUDED.FirmwareVersion,
|
||||
SignalStrength = EXCLUDED.SignalStrength,
|
||||
Temperature = EXCLUDED.Temperature,
|
||||
CoordinateSwitch = EXCLUDED.CoordinateSwitch,
|
||||
Latitude = EXCLUDED.Latitude,
|
||||
Longitude = EXCLUDED.Longitude,
|
||||
WorkPeriod = EXCLUDED.WorkPeriod,
|
||||
WorkStart = EXCLUDED.WorkStart,
|
||||
WorkEnd = EXCLUDED.WorkEnd,
|
||||
ThumbnailSize = EXCLUDED.ThumbnailSize,
|
||||
PIRInterval = EXCLUDED.PIRInterval,
|
||||
TimeScan = EXCLUDED.TimeScan,
|
||||
"ImageCreationDate" = EXCLUDED."ImageCreationDate",
|
||||
"SizeInByte" = EXCLUDED."SizeInByte",
|
||||
"ImageFormat" = EXCLUDED."ImageFormat",
|
||||
"ImageNight" = EXCLUDED."ImageNight",
|
||||
"DateTimeLastSettings" = EXCLUDED."DateTimeLastSettings",
|
||||
"BatteryStatus" = EXCLUDED."BatteryStatus",
|
||||
"FirmwareVersion" = EXCLUDED."FirmwareVersion",
|
||||
"SignalStrength" = EXCLUDED."SignalStrength",
|
||||
"Temperature" = EXCLUDED."Temperature",
|
||||
"CoordinateSwitch" = EXCLUDED."CoordinateSwitch",
|
||||
"Latitude" = EXCLUDED."Latitude",
|
||||
"Longitude" = EXCLUDED."Longitude",
|
||||
"WorkPeriod" = EXCLUDED."WorkPeriod",
|
||||
"WorkStart" = EXCLUDED."WorkStart",
|
||||
"WorkEnd" = EXCLUDED."WorkEnd",
|
||||
"ThumbnailSize" = EXCLUDED."ThumbnailSize",
|
||||
"PIRInterval" = EXCLUDED."PIRInterval",
|
||||
"TimeScan" = EXCLUDED."TimeScan",
|
||||
imei = EXCLUDED.imei,
|
||||
iccid = EXCLUDED.iccid,
|
||||
INITKey = EXCLUDED.INITKey,
|
||||
CameraName = EXCLUDED.CameraName,
|
||||
CameraModel = EXCLUDED.CameraModel,
|
||||
CameraLanguage = EXCLUDED.CameraLanguage,
|
||||
CameraNetwork = EXCLUDED.CameraNetwork,
|
||||
MobileNetworkCode = EXCLUDED.MobileNetworkCode,
|
||||
MobileCountryCode = EXCLUDED.MobileCountryCode,
|
||||
LocationAreaCode = EXCLUDED.LocationAreaCode,
|
||||
CustomerTimezone = EXCLUDED.CustomerTimezone,
|
||||
CustomerLanguage = EXCLUDED.CustomerLanguage,
|
||||
"INITKey" = EXCLUDED."INITKey",
|
||||
"CameraName" = EXCLUDED."CameraName",
|
||||
"CameraModel" = EXCLUDED."CameraModel",
|
||||
"CameraLanguage" = EXCLUDED."CameraLanguage",
|
||||
"CameraNetwork" = EXCLUDED."CameraNetwork",
|
||||
"MobileNetworkCode" = EXCLUDED."MobileNetworkCode",
|
||||
"MobileCountryCode" = EXCLUDED."MobileCountryCode",
|
||||
"LocationAreaCode" = EXCLUDED."LocationAreaCode",
|
||||
"CustomerTimezone" = EXCLUDED."CustomerTimezone",
|
||||
"CustomerLanguage" = EXCLUDED."CustomerLanguage",
|
||||
servertime = EXCLUDED.servertime;
|
||||
""", (
|
||||
uuid, icd, get("SizeInByte"), get("ImageFormat"), get("ImageNight"),
|
||||
get("DateTimeLastSettings"), get("BatteryStatus"), get("FirmwareVersion"),
|
||||
dls, get("BatteryStatus"), get("FirmwareVersion"),
|
||||
get("SignalStrength"), get("Temperature"),
|
||||
get("CoordinateSwitch"), lat_f, lon_f, get("WorkPeriod"),
|
||||
get("WorkStart"), get("WorkEnd"), get("ThumbnailSize"),
|
||||
@@ -385,6 +615,16 @@ def update_ocr_ts(cur, uuid: str, ocr_ts: datetime):
|
||||
""", (ocr_ts, uuid))
|
||||
|
||||
|
||||
def get_ocr_ts(cur, uuid: str) -> Optional[datetime]:
    """Read the stored ``ocr_ts`` of a metadata row.

    Returns ``None`` when no metadata row exists for ``uuid``; otherwise the
    value of the ``ocr_ts`` column (which may itself be NULL/None).
    """
    query = """
        SELECT ocr_ts
        FROM remote_cam.metadata
        WHERE metadata_uuid = %s;
    """
    cur.execute(query, (uuid,))
    record = cur.fetchone()
    if record:
        return record[0]
    return None
|
||||
|
||||
|
||||
def insert_categories(cur, uuid: str, cat_json: dict):
|
||||
version = cat_json.get("version")
|
||||
cats = cat_json.get("categories") or []
|
||||
@@ -396,6 +636,14 @@ def insert_categories(cur, uuid: str, cat_json: dict):
|
||||
""", (uuid, c.get("category"), c.get("score"), version))
|
||||
|
||||
|
||||
def insert_api_response(cur, uuid: str, success: bool, message: str):
    """Append one API push outcome to ``remote_cam.api_response``.

    The message is cut to at most 512 characters before insertion; a missing
    or empty message is stored as the empty string. ``push_ts`` is set by the
    database via ``now()``.
    """
    stored_message = message[:512] if message else ""
    row = (stored_message, uuid, success)
    cur.execute("""
        INSERT INTO remote_cam.api_response(push_ts, message, image_uuid, success)
        VALUES (now(), %s, %s::uuid, %s);
    """, row)
|
||||
|
||||
|
||||
# -----------------------
|
||||
# S3 ops wrapper
|
||||
# -----------------------
|
||||
@@ -407,6 +655,7 @@ class S3:
|
||||
endpoint_url=cfg.s3_endpoint,
|
||||
aws_access_key_id=cfg.s3_access_key,
|
||||
aws_secret_access_key=cfg.s3_secret_key,
|
||||
config=BotoClientConfig(max_pool_connections=cfg.s3_max_pool_connections),
|
||||
)
|
||||
|
||||
def list_entrance_objects(self):
|
||||
@@ -425,13 +674,21 @@ class S3:
|
||||
def head(self, key: str):
|
||||
return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key)
|
||||
|
||||
def delete_keys(self, keys):
|
||||
def delete_keys(self, keys) -> int:
|
||||
if not keys:
|
||||
return
|
||||
self.client.delete_objects(
|
||||
return 0
|
||||
resp = self.client.delete_objects(
|
||||
Bucket=self.cfg.s3_bucket,
|
||||
Delete={"Objects": [{"Key": k} for k in keys], "Quiet": True},
|
||||
Delete={"Objects": [{"Key": k} for k in keys], "Quiet": False},
|
||||
)
|
||||
errors = resp.get("Errors") or []
|
||||
if errors:
|
||||
details = "; ".join(
|
||||
f"{e.get('Key')}:{e.get('Code')}:{e.get('Message')}" for e in errors[:3]
|
||||
)
|
||||
raise RuntimeError(f"S3 delete failed for {len(errors)} object(s): {details}")
|
||||
deleted = resp.get("Deleted") or []
|
||||
return len(deleted)
|
||||
|
||||
|
||||
# -----------------------
|
||||
@@ -489,54 +746,209 @@ def collect_ready_sets(s3: S3, cfg: AppConfig) -> list[EntranceSet]:
|
||||
|
||||
def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
|
||||
job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
|
||||
job = job_get(cur, s.uuid) or {}
|
||||
status_now = job.get("status", "NEW")
|
||||
|
||||
# 1) metadata.json -> DB
|
||||
meta = json.loads(s3.get_bytes(s.meta_key))
|
||||
insert_metadata(cur, s.uuid, meta)
|
||||
insert_resource(cur, s.uuid, "metadata")
|
||||
if status_rank("META_OK") >= status_rank(status_now):
|
||||
job_set_status(cur, s.uuid, "META_OK")
|
||||
status_now = "META_OK"
|
||||
|
||||
# 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone
|
||||
jpg_bytes = s3.get_bytes(s.jpg_key)
|
||||
ocr_ts = get_ocr_ts(cur, s.uuid)
|
||||
ocr_needs_run = cfg.enable_ocr and (
|
||||
status_rank(status_now) < status_rank("OCR_OK") or status_now == "OCR_SKIPPED"
|
||||
)
|
||||
|
||||
if ocr_needs_run:
|
||||
try:
|
||||
customer_tzinfo, _ = parse_customer_tz(meta)
|
||||
ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, customer_tzinfo)
|
||||
update_ocr_ts(cur, s.uuid, ocr_ts)
|
||||
insert_resource(cur, s.uuid, "image")
|
||||
job_set_backfill(cur, s.uuid, needs_ocr_backfill=False)
|
||||
job_set_status(cur, s.uuid, "OCR_OK")
|
||||
status_now = "OCR_OK"
|
||||
except Exception as e:
|
||||
job_set_backfill(cur, s.uuid, needs_ocr_backfill=True)
|
||||
job_set_status(cur, s.uuid, "OCR_FAIL", err=str(e))
|
||||
status_now = "OCR_FAIL"
|
||||
elif not cfg.enable_ocr:
|
||||
job_set_backfill(cur, s.uuid, needs_ocr_backfill=True)
|
||||
if status_rank(status_now) < status_rank("OCR_SKIPPED"):
|
||||
job_set_status(cur, s.uuid, "OCR_SKIPPED")
|
||||
status_now = "OCR_SKIPPED"
|
||||
|
||||
# 3) EXIF write:
|
||||
# DateTimeOriginal = ocr_ts (local, keep offset tags)
|
||||
# DateTimeDigitized = servertime (UTC)
|
||||
jpg_exif = jpg_bytes
|
||||
if not cfg.enable_exif:
|
||||
job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
|
||||
if status_rank(status_now) < status_rank("EXIF_SKIPPED"):
|
||||
job_set_status(cur, s.uuid, "EXIF_SKIPPED")
|
||||
status_now = "EXIF_SKIPPED"
|
||||
elif status_rank(status_now) < status_rank("MOVED"):
|
||||
if ocr_ts is None:
|
||||
job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
|
||||
job_set_status(cur, s.uuid, "EXIF_SKIPPED", err="OCR missing")
|
||||
status_now = "EXIF_SKIPPED"
|
||||
else:
|
||||
try:
|
||||
servertime = parse_servertime(meta)
|
||||
jpg_exif = exif_write_with_exiftool(jpg_bytes, ocr_ts, servertime)
|
||||
jpg_exif = exif_write_with_exiftool(
|
||||
jpg_bytes,
|
||||
ocr_ts,
|
||||
servertime,
|
||||
timeout_seconds=cfg.exiftool_timeout_seconds,
|
||||
)
|
||||
job_set_backfill(cur, s.uuid, needs_exif_backfill=False)
|
||||
job_set_status(cur, s.uuid, "EXIF_OK")
|
||||
status_now = "EXIF_OK"
|
||||
except Exception as e:
|
||||
job_set_backfill(cur, s.uuid, needs_exif_backfill=True)
|
||||
job_set_status(cur, s.uuid, "EXIF_FAIL", err=str(e))
|
||||
status_now = "EXIF_FAIL"
|
||||
|
||||
# 4) Upload processed JPG (EXIF patched)
|
||||
processed_key = f"{cfg.processed_prefix}{s.uuid}.jpg"
|
||||
processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{s.uuid}.jpg"
|
||||
if status_rank(status_now) < status_rank("MOVED"):
|
||||
s3.put_bytes(processed_key, jpg_exif, "image/jpeg")
|
||||
s3.head(processed_key)
|
||||
job_set_status(cur, s.uuid, "MOVED", processed_key=processed_key)
|
||||
status_now = "MOVED"
|
||||
|
||||
# 5) Thumbnail
|
||||
thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{s.uuid}.jpg"
|
||||
if status_rank(status_now) < status_rank("THUMB_OK"):
|
||||
if status_rank(status_now) >= status_rank("MOVED") and jpg_exif is jpg_bytes:
|
||||
jpg_exif = s3.get_bytes(processed_key)
|
||||
thumb_bytes = make_thumbnail_bytes(jpg_exif, cfg.thumb_max)
|
||||
thumb_key = f"{cfg.thumb_prefix}{s.uuid}.jpg"
|
||||
s3.put_bytes(thumb_key, thumb_bytes, "image/jpeg")
|
||||
s3.head(thumb_key)
|
||||
job_set_status(cur, s.uuid, "THUMB_OK", thumb_key=thumb_key)
|
||||
status_now = "THUMB_OK"
|
||||
|
||||
# 6) Optional categories
|
||||
if s.cat_key:
|
||||
if s.cat_key and status_rank(status_now) < status_rank("CATEGORIES_OK"):
|
||||
cat_json = json.loads(s3.get_bytes(s.cat_key))
|
||||
insert_categories(cur, s.uuid, cat_json)
|
||||
insert_resource(cur, s.uuid, "classification")
|
||||
job_set_status(cur, s.uuid, "CATEGORIES_OK")
|
||||
status_now = "CATEGORIES_OK"
|
||||
|
||||
# 7) Cleanup entrance only after verify
|
||||
# 7) Optional DeerMapper API push (image + metadata)
|
||||
if cfg.enable_deermapper_api and status_rank(status_now) < status_rank("API_OK"):
|
||||
api_key = (cfg.deermapper_api_key or "").strip()
|
||||
api_url = (cfg.deermapper_api_base_url or "").strip()
|
||||
if not api_key or not api_url:
|
||||
err_msg = "DeerMapper API config missing"
|
||||
job_set_status(cur, s.uuid, "API_FAIL", err=err_msg)
|
||||
insert_api_response(cur, s.uuid, False, err_msg)
|
||||
status_now = "API_FAIL"
|
||||
else:
|
||||
if status_rank(status_now) >= status_rank("MOVED") and jpg_exif is jpg_bytes:
|
||||
jpg_exif = s3.get_bytes(processed_key)
|
||||
imei = str(meta.get("imei") or "").strip()
|
||||
ok, msg = push_to_deermapper_api(cfg, s.uuid, imei, meta, jpg_exif)
|
||||
if ok:
|
||||
insert_api_response(cur, s.uuid, True, msg)
|
||||
job_set_status(cur, s.uuid, "API_OK")
|
||||
status_now = "API_OK"
|
||||
else:
|
||||
insert_api_response(cur, s.uuid, False, msg)
|
||||
job_set_status(cur, s.uuid, "API_FAIL", err=msg)
|
||||
status_now = "API_FAIL"
|
||||
|
||||
# 8) Cleanup entrance only after verify (+ API when enabled)
|
||||
api_ready_for_cleanup = (not cfg.enable_deermapper_api) or status_rank(status_now) >= status_rank("API_OK")
|
||||
if status_rank(status_now) >= status_rank("THUMB_OK") and api_ready_for_cleanup:
|
||||
try:
|
||||
s3.head(processed_key)
|
||||
s3.head(thumb_key)
|
||||
to_delete = [s.jpg_key, s.meta_key]
|
||||
if s.cat_key:
|
||||
to_delete.append(s.cat_key)
|
||||
s3.delete_keys(to_delete)
|
||||
job_set_status(cur, s.uuid, "CLEANED")
|
||||
status_now = "CLEANED"
|
||||
except Exception as e:
|
||||
job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e))
|
||||
|
||||
|
||||
def is_completed_job_materialized(cur, s3: S3, cfg: AppConfig, uuid: str) -> bool:
    """Tell whether ``uuid`` was already fully processed and its outputs exist.

    True only when all of the following hold:
      * an import_job row exists,
      * its status is a terminal success for the current configuration
        (see is_terminal_success), and
      * both the processed JPG and the thumbnail answer a HEAD request.

    The caller deletes the entrance files when this returns True, so the
    status test must be is_terminal_success rather than "at least THUMB_OK":
    with the DeerMapper push enabled, a job at THUMB_OK, CATEGORIES_OK or
    API_FAIL still needs its API push, and reporting it as complete would
    remove the source files before that push can be retried.
    """
    job = job_get(cur, uuid)
    if not job:
        return False
    if not is_terminal_success(job.get("status"), cfg):
        return False

    processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{uuid}.jpg"
    thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{uuid}.jpg"
    try:
        s3.head(processed_key)
        s3.head(thumb_key)
    except Exception:
        # Best effort: any HEAD failure means "not materialized", which makes
        # the caller reprocess the set instead of deleting it.
        return False
    return True
|
||||
|
||||
|
||||
def cleanup_entrance_set(s3: S3, s: EntranceSet) -> int:
    """Delete the entrance objects of one set and return ``delete_keys``' result.

    The JPG and metadata keys are always included; the categories key is
    added only when the set has one.
    """
    optional_keys = [s.cat_key] if s.cat_key else []
    return s3.delete_keys([s.jpg_key, s.meta_key] + optional_keys)
|
||||
|
||||
|
||||
def _process_ready_batch(cfg: AppConfig, batch: list[EntranceSet]):
    """Process a list of entrance sets sequentially on one DB connection.

    Creates its own S3 wrapper and psycopg connection, then handles each set
    in its own transaction:

      * a set whose job is already complete and materialized is treated as a
        duplicate upload and its entrance files are deleted;
      * otherwise process_one runs and the resulting job status is read back;
      * on any exception the job is recorded as ERROR in a fresh transaction.

    One "[RESULT] ..." line is printed per set, and ``cfg.job_sleep_seconds``
    (when > 0) is slept between sets.
    """
    s3c = S3(cfg)
    with psycopg.connect(cfg.pg_dsn) as db:
        for s in batch:
            start = time.perf_counter()
            result = "failure"
            final_status = "UNKNOWN"
            err_text = None
            cleanup_deleted = None
            try:
                skip_duplicate = False
                with db.transaction():
                    with db.cursor() as cur:
                        if is_completed_job_materialized(cur, s3c, cfg, s.uuid):
                            skip_duplicate = True
                        else:
                            process_one(cur, s3c, cfg, s)
                            final_status = (job_get(cur, s.uuid) or {}).get("status", "UNKNOWN")
                            result = "success" if is_terminal_success(final_status, cfg) else "failure"
                if skip_duplicate:
                    cleanup_deleted = cleanup_entrance_set(s3c, s)
                    final_status = "DUPLICATE_CLEANED"
                    result = "success"
            except Exception as e:
                with db.transaction():
                    with db.cursor() as cur:
                        job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
                        job_set_status(cur, s.uuid, "ERROR", err=str(e))
                final_status = "ERROR"
                # The failure may surface after `result` was already set to
                # "success" inside the transaction; never report that.
                result = "failure"
                # Keep the log line marked as an error even when the
                # exception carries no message.
                err_text = str(e) or type(e).__name__

            elapsed_ms = int((time.perf_counter() - start) * 1000)
            line = (
                f"[RESULT] uuid={s.uuid} result={result} status={final_status} "
                f"process_ms={elapsed_ms}"
            )
            if err_text is not None:
                line += f" error={err_text}"
            elif cleanup_deleted is not None:
                line += f" cleanup_deleted={cleanup_deleted}"
            print(line)

            if cfg.job_sleep_seconds > 0:
                time.sleep(cfg.job_sleep_seconds)
|
||||
|
||||
|
||||
def run_once(cfg: AppConfig) -> int:
|
||||
@@ -545,22 +957,20 @@ def run_once(cfg: AppConfig) -> int:
|
||||
if not ready:
|
||||
return 0
|
||||
|
||||
with psycopg.connect(cfg.pg_dsn) as db:
|
||||
with db.cursor() as cur:
|
||||
db_init(cur)
|
||||
db.commit()
|
||||
workers = max(1, min(cfg.parallel_workers, len(ready)))
|
||||
if workers == 1:
|
||||
_process_ready_batch(cfg, ready)
|
||||
return len(ready)
|
||||
|
||||
batches: list[list[EntranceSet]] = [[] for _ in range(workers)]
|
||||
for i, s in enumerate(ready):
|
||||
batches[i % workers].append(s)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=workers) as pool:
|
||||
futures = [pool.submit(_process_ready_batch, cfg, batch) for batch in batches if batch]
|
||||
for future in as_completed(futures):
|
||||
future.result()
|
||||
|
||||
for s in ready:
|
||||
try:
|
||||
with db.transaction():
|
||||
with db.cursor() as cur:
|
||||
process_one(cur, s3c, cfg, s)
|
||||
except Exception as e:
|
||||
with db.transaction():
|
||||
with db.cursor() as cur:
|
||||
job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
|
||||
job_set_status(cur, s.uuid, "ERROR", err=str(e))
|
||||
print(f"[ERROR] {s.uuid}: {e}")
|
||||
return len(ready)
|
||||
|
||||
|
||||
@@ -568,6 +978,11 @@ def main():
|
||||
cfg_path = os.environ.get("CONFIG_YAML", "/app/config.yaml")
|
||||
cfg = load_config(cfg_path)
|
||||
|
||||
with psycopg.connect(cfg.pg_dsn) as db:
|
||||
with db.cursor() as cur:
|
||||
db_init(cur)
|
||||
db.commit()
|
||||
|
||||
while True:
|
||||
n = run_once(cfg)
|
||||
if n == 0:
|
||||
|
||||
Reference in New Issue
Block a user