starting out
This commit is contained in:
commit
c8f1ca4f4e
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
*.pyc
|
||||
*.pyo
|
||||
__pycache__/
|
||||
20
config.yaml
Normal file
20
config.yaml
Normal file
@ -0,0 +1,20 @@
|
||||
# config.yaml (example)
|
||||
s3:
|
||||
endpoint: "https://<dein-endpoint>"
|
||||
access_key: "<key>"
|
||||
secret_key: "<secret>"
|
||||
bucket: "<bucket>"
|
||||
|
||||
postgres:
|
||||
dsn: "postgresql://user:pass@host:5432/dbname"
|
||||
|
||||
app:
|
||||
entrance_prefix: "icu/entrance/"
|
||||
processed_prefix: "icu/processed/"
|
||||
thumb_prefix: "icu/thumbnails/"
|
||||
min_age_seconds: 90
|
||||
poll_seconds: 30
|
||||
# OCR crop tunables (defaults are usually fine)
|
||||
ocr_crop_w_frac: 0.42
|
||||
ocr_crop_h_frac: 0.22
|
||||
thumb_max: 512
|
||||
568
main.py
Normal file
568
main.py
Normal file
@ -0,0 +1,568 @@
|
||||
# main.py — entrance-to-processed ingest worker
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import time
|
||||
import tempfile
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional, Dict, Tuple
|
||||
|
||||
import boto3
|
||||
import psycopg
|
||||
from dateutil import tz
|
||||
from PIL import Image, ImageOps, ImageEnhance
|
||||
import pytesseract
|
||||
import yaml
|
||||
|
||||
|
||||
# -----------------------
|
||||
# Config
|
||||
# -----------------------
|
||||
UUID_RE = re.compile(r"^([0-9a-fA-F-]{36})\.(jpg|json)$")
|
||||
CAT_RE = re.compile(r"^([0-9a-fA-F-]{36})_categories\.json$")
|
||||
|
||||
# Overlay fixed: "HH:MM:SS DD-MM-YYYY"
|
||||
OCR_DT_RE = re.compile(
|
||||
r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s+(?P<d>\d{2})-(?P<mo>\d{2})-(?P<y>\d{4})"
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppConfig:
|
||||
s3_endpoint: str
|
||||
s3_access_key: str
|
||||
s3_secret_key: str
|
||||
s3_bucket: str
|
||||
|
||||
pg_dsn: str
|
||||
|
||||
entrance_prefix: str = "icu/entrance/"
|
||||
processed_prefix: str = "icu/processed/"
|
||||
thumb_prefix: str = "icu/thumbnails/"
|
||||
|
||||
min_age_seconds: int = 90
|
||||
poll_seconds: int = 30
|
||||
|
||||
# OCR crop settings (tune if needed)
|
||||
ocr_crop_w_frac: float = 0.42 # right ~42% of width
|
||||
ocr_crop_h_frac: float = 0.22 # bottom ~22% of height
|
||||
|
||||
thumb_max: int = 512
|
||||
|
||||
|
||||
def load_config(path: str) -> AppConfig:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
raw = yaml.safe_load(f)
|
||||
|
||||
# allow env override for secret injection if desired
|
||||
def env_or(key: str, default=None):
|
||||
return os.environ.get(key, default)
|
||||
|
||||
s3 = raw["s3"]
|
||||
pg = raw["postgres"]
|
||||
app = raw.get("app", {})
|
||||
|
||||
return AppConfig(
|
||||
s3_endpoint=env_or("S3_ENDPOINT", s3["endpoint"]),
|
||||
s3_access_key=env_or("S3_ACCESS_KEY", s3["access_key"]),
|
||||
s3_secret_key=env_or("S3_SECRET_KEY", s3["secret_key"]),
|
||||
s3_bucket=env_or("S3_BUCKET", s3["bucket"]),
|
||||
pg_dsn=env_or("PG_DSN", pg["dsn"]),
|
||||
entrance_prefix=app.get("entrance_prefix", "icu/entrance/"),
|
||||
processed_prefix=app.get("processed_prefix", "icu/processed/"),
|
||||
thumb_prefix=app.get("thumb_prefix", "icu/thumbnails/"),
|
||||
min_age_seconds=int(app.get("min_age_seconds", 90)),
|
||||
poll_seconds=int(app.get("poll_seconds", 30)),
|
||||
ocr_crop_w_frac=float(app.get("ocr_crop_w_frac", 0.42)),
|
||||
ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
|
||||
thumb_max=int(app.get("thumb_max", 512)),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------
|
||||
# Helpers
|
||||
# -----------------------
|
||||
def is_old_enough(last_modified, min_age_seconds: int) -> bool:
|
||||
now = datetime.now(timezone.utc)
|
||||
return (now - last_modified).total_seconds() >= min_age_seconds
|
||||
|
||||
|
||||
def to_exif_datetime(dt: datetime) -> str:
|
||||
return dt.strftime("%Y:%m:%d %H:%M:%S")
|
||||
|
||||
|
||||
def tz_offset_str(dt: datetime) -> str:
|
||||
off = dt.utcoffset()
|
||||
if off is None:
|
||||
off = timezone.utc.utcoffset(dt)
|
||||
total = int(off.total_seconds())
|
||||
sign = "+" if total >= 0 else "-"
|
||||
total = abs(total)
|
||||
hh = total // 3600
|
||||
mm = (total % 3600) // 60
|
||||
return f"{sign}{hh:02d}:{mm:02d}"
|
||||
|
||||
|
||||
def parse_servertime(meta: dict) -> datetime:
|
||||
s = meta.get("servertime")
|
||||
if not s:
|
||||
raise ValueError("metadata.servertime missing")
|
||||
|
||||
# Example: "2025-06-30T04:03:04+0000"
|
||||
if re.match(r".*[+-]\d{4}$", s):
|
||||
return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")
|
||||
# Fallback: ISO-ish
|
||||
return datetime.fromisoformat(s)
|
||||
|
||||
|
||||
def parse_customer_tz(meta: dict):
|
||||
# CustomerTimezone applies to OCR (camera overlay)
|
||||
name = meta.get("CustomerTimezone") or "UTC"
|
||||
z = tz.gettz(name)
|
||||
if z is None:
|
||||
raise ValueError(f"Unknown CustomerTimezone: {name}")
|
||||
return z, name
|
||||
|
||||
|
||||
def ocr_extract_timestamp(jpg_bytes: bytes, crop_w_frac: float, crop_h_frac: float, customer_tzinfo) -> datetime:
|
||||
from io import BytesIO
|
||||
im = Image.open(BytesIO(jpg_bytes)).convert("RGB")
|
||||
|
||||
w, h = im.size
|
||||
x0 = int(w * (1.0 - crop_w_frac))
|
||||
y0 = int(h * (1.0 - crop_h_frac))
|
||||
crop = im.crop((x0, y0, w, h))
|
||||
|
||||
gray = ImageOps.grayscale(crop)
|
||||
gray = ImageOps.autocontrast(gray)
|
||||
gray = ImageEnhance.Contrast(gray).enhance(2.5)
|
||||
gray = ImageEnhance.Sharpness(gray).enhance(2.0)
|
||||
|
||||
bw = gray.point(lambda p: 255 if p > 180 else 0)
|
||||
|
||||
cfg = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789:- '
|
||||
text = pytesseract.image_to_string(bw, config=cfg).strip()
|
||||
|
||||
m = OCR_DT_RE.search(text)
|
||||
if not m:
|
||||
# retry without thresholding
|
||||
text2 = pytesseract.image_to_string(gray, config=cfg).strip()
|
||||
m = OCR_DT_RE.search(text2)
|
||||
if not m:
|
||||
raise ValueError(f"OCR timestamp not found. OCR1='{text}' OCR2='{text2}'")
|
||||
|
||||
dt_local_naive = datetime(
|
||||
int(m.group("y")),
|
||||
int(m.group("mo")),
|
||||
int(m.group("d")),
|
||||
int(m.group("h")),
|
||||
int(m.group("m")),
|
||||
int(m.group("s")),
|
||||
)
|
||||
|
||||
# Keep camera-local timezone (no UTC conversion requested)
|
||||
return dt_local_naive.replace(tzinfo=customer_tzinfo)
|
||||
|
||||
|
||||
def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime) -> bytes:
|
||||
"""
|
||||
DateTimeOriginal = OCR time (camera local) + OffsetTimeOriginal
|
||||
DateTimeDigitized = servertime (UTC) + OffsetTimeDigitized=+00:00
|
||||
"""
|
||||
dto = to_exif_datetime(dt_original_local)
|
||||
oto = tz_offset_str(dt_original_local)
|
||||
|
||||
dtd = to_exif_datetime(dt_digitized_utc.astimezone(timezone.utc))
|
||||
otd = "+00:00"
|
||||
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
in_path = os.path.join(td, "in.jpg")
|
||||
with open(in_path, "wb") as f:
|
||||
f.write(jpg_in)
|
||||
|
||||
cmd = [
|
||||
"exiftool",
|
||||
"-overwrite_original",
|
||||
f"-DateTimeOriginal={dto}",
|
||||
f"-OffsetTimeOriginal={oto}",
|
||||
f"-DateTimeDigitized={dtd}",
|
||||
f"-OffsetTimeDigitized={otd}",
|
||||
f"-ModifyDate={dtd}",
|
||||
f"-OffsetTime={otd}",
|
||||
in_path,
|
||||
]
|
||||
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
with open(in_path, "rb") as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
def make_thumbnail_bytes(jpg_bytes: bytes, thumb_max: int) -> bytes:
|
||||
from io import BytesIO
|
||||
im = Image.open(BytesIO(jpg_bytes)).convert("RGB")
|
||||
im.thumbnail((thumb_max, thumb_max))
|
||||
out = BytesIO()
|
||||
im.save(out, format="JPEG", quality=85, optimize=True)
|
||||
return out.getvalue()
|
||||
|
||||
|
||||
# -----------------------
|
||||
# DB ops
|
||||
# -----------------------
|
||||
def db_init(cur):
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS remote_cam.import_job (
|
||||
image_uuid text PRIMARY KEY,
|
||||
status text NOT NULL DEFAULT 'NEW',
|
||||
attempts integer NOT NULL DEFAULT 0,
|
||||
has_categories boolean,
|
||||
last_error text,
|
||||
entrance_jpg_key text,
|
||||
entrance_meta_key text,
|
||||
entrance_cat_key text,
|
||||
processed_jpg_key text,
|
||||
thumbnail_key text,
|
||||
created_ts timestamptz NOT NULL DEFAULT now(),
|
||||
updated_ts timestamptz NOT NULL DEFAULT now()
|
||||
);
|
||||
""")
|
||||
cur.execute("CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);")
|
||||
cur.execute("CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);")
|
||||
|
||||
cur.execute("""
|
||||
CREATE OR REPLACE FUNCTION remote_cam.set_updated_ts()
|
||||
RETURNS trigger AS $$
|
||||
BEGIN
|
||||
NEW.updated_ts = now();
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
""")
|
||||
cur.execute("""
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'import_job_set_updated_ts') THEN
|
||||
CREATE TRIGGER import_job_set_updated_ts
|
||||
BEFORE UPDATE ON remote_cam.import_job
|
||||
FOR EACH ROW EXECUTE FUNCTION remote_cam.set_updated_ts();
|
||||
END IF;
|
||||
END$$;
|
||||
""")
|
||||
|
||||
|
||||
def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]):
|
||||
cur.execute("""
|
||||
INSERT INTO remote_cam.import_job (image_uuid, status, entrance_jpg_key, entrance_meta_key, entrance_cat_key, has_categories)
|
||||
VALUES (%s, 'NEW', %s, %s, %s, %s)
|
||||
ON CONFLICT (image_uuid) DO UPDATE
|
||||
SET entrance_jpg_key = EXCLUDED.entrance_jpg_key,
|
||||
entrance_meta_key = EXCLUDED.entrance_meta_key,
|
||||
entrance_cat_key = EXCLUDED.entrance_cat_key,
|
||||
has_categories = EXCLUDED.has_categories;
|
||||
""", (uuid, jpg_key, meta_key, cat_key, cat_key is not None))
|
||||
|
||||
|
||||
def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None, processed_key: Optional[str] = None, thumb_key: Optional[str] = None):
|
||||
cur.execute("""
|
||||
UPDATE remote_cam.import_job
|
||||
SET status = %s,
|
||||
attempts = attempts + CASE WHEN %s = 'ERROR' THEN 1 ELSE 0 END,
|
||||
last_error = CASE WHEN %s IS NULL THEN last_error ELSE %s END,
|
||||
processed_jpg_key = COALESCE(%s, processed_jpg_key),
|
||||
thumbnail_key = COALESCE(%s, thumbnail_key)
|
||||
WHERE image_uuid = %s;
|
||||
""", (status, status, err, err, err, processed_key, thumb_key, uuid))
|
||||
|
||||
|
||||
def insert_resource(cur, uuid: str, typ: str):
|
||||
cur.execute("""
|
||||
INSERT INTO remote_cam.resource(resource_uuid, type, import_ts)
|
||||
VALUES (%s, %s, now())
|
||||
ON CONFLICT DO NOTHING;
|
||||
""", (uuid, typ))
|
||||
|
||||
|
||||
def insert_metadata(cur, uuid: str, meta: dict):
|
||||
def get(k): return meta.get(k)
|
||||
|
||||
servertime = parse_servertime(meta)
|
||||
|
||||
icd_raw = get("ImageCreationDate")
|
||||
icd = None
|
||||
if icd_raw:
|
||||
try:
|
||||
# "06:03:01 30.06.2025" -> assume UTC if no tz info
|
||||
icd = datetime.strptime(icd_raw, "%H:%M:%S %d.%m.%Y").replace(tzinfo=timezone.utc)
|
||||
except Exception:
|
||||
icd = None
|
||||
|
||||
lat = get("Latitude")
|
||||
lon = get("Longitude")
|
||||
lat_f = float(lat) if lat not in (None, "") else None
|
||||
lon_f = float(lon) if lon not in (None, "") else None
|
||||
|
||||
# NOTE: DateTimeLastSettings looks like "05:45:35 30.06.2025" in sample; parse if needed later.
|
||||
cur.execute("""
|
||||
INSERT INTO remote_cam.metadata(
|
||||
metadata_uuid, import_ts, ImageCreationDate, SizeInByte, ImageFormat, ImageNight,
|
||||
DateTimeLastSettings, BatteryStatus, FirmwareVersion, SignalStrength, Temperature,
|
||||
CoordinateSwitch, Latitude, Longitude, WorkPeriod, WorkStart, WorkEnd, ThumbnailSize,
|
||||
PIRInterval, TimeScan, imei, iccid, INITKey, CameraName, CameraModel, CameraLanguage,
|
||||
CameraNetwork, MobileNetworkCode, MobileCountryCode, LocationAreaCode, CustomerTimezone,
|
||||
CustomerLanguage, servertime, ocr_ts
|
||||
) VALUES (
|
||||
%s, now(), %s, %s, %s, %s,
|
||||
%s, %s, %s, %s, %s,
|
||||
%s, %s, %s, %s, %s, %s, %s,
|
||||
%s, %s, %s, %s, %s, %s, %s, %s,
|
||||
%s, %s, %s, %s, %s,
|
||||
%s, %s, NULL
|
||||
)
|
||||
ON CONFLICT (metadata_uuid) DO UPDATE SET
|
||||
import_ts = EXCLUDED.import_ts,
|
||||
ImageCreationDate = EXCLUDED.ImageCreationDate,
|
||||
SizeInByte = EXCLUDED.SizeInByte,
|
||||
ImageFormat = EXCLUDED.ImageFormat,
|
||||
ImageNight = EXCLUDED.ImageNight,
|
||||
DateTimeLastSettings = EXCLUDED.DateTimeLastSettings,
|
||||
BatteryStatus = EXCLUDED.BatteryStatus,
|
||||
FirmwareVersion = EXCLUDED.FirmwareVersion,
|
||||
SignalStrength = EXCLUDED.SignalStrength,
|
||||
Temperature = EXCLUDED.Temperature,
|
||||
CoordinateSwitch = EXCLUDED.CoordinateSwitch,
|
||||
Latitude = EXCLUDED.Latitude,
|
||||
Longitude = EXCLUDED.Longitude,
|
||||
WorkPeriod = EXCLUDED.WorkPeriod,
|
||||
WorkStart = EXCLUDED.WorkStart,
|
||||
WorkEnd = EXCLUDED.WorkEnd,
|
||||
ThumbnailSize = EXCLUDED.ThumbnailSize,
|
||||
PIRInterval = EXCLUDED.PIRInterval,
|
||||
TimeScan = EXCLUDED.TimeScan,
|
||||
imei = EXCLUDED.imei,
|
||||
iccid = EXCLUDED.iccid,
|
||||
INITKey = EXCLUDED.INITKey,
|
||||
CameraName = EXCLUDED.CameraName,
|
||||
CameraModel = EXCLUDED.CameraModel,
|
||||
CameraLanguage = EXCLUDED.CameraLanguage,
|
||||
CameraNetwork = EXCLUDED.CameraNetwork,
|
||||
MobileNetworkCode = EXCLUDED.MobileNetworkCode,
|
||||
MobileCountryCode = EXCLUDED.MobileCountryCode,
|
||||
LocationAreaCode = EXCLUDED.LocationAreaCode,
|
||||
CustomerTimezone = EXCLUDED.CustomerTimezone,
|
||||
CustomerLanguage = EXCLUDED.CustomerLanguage,
|
||||
servertime = EXCLUDED.servertime;
|
||||
""", (
|
||||
uuid, icd, get("SizeInByte"), get("ImageFormat"), get("ImageNight"),
|
||||
get("DateTimeLastSettings"), get("BatteryStatus"), get("FirmwareVersion"),
|
||||
get("SignalStrength"), get("Temperature"),
|
||||
get("CoordinateSwitch"), lat_f, lon_f, get("WorkPeriod"),
|
||||
get("WorkStart"), get("WorkEnd"), get("ThumbnailSize"),
|
||||
get("PIRInterval"), get("TimeScan"), get("imei"), get("iccid"), get("INITKey"),
|
||||
get("CameraName"), get("CameraModel"), get("CameraLanguage"), get("CameraNetwork"),
|
||||
get("MobileNetworkCode"), get("MobileCountryCode"), get("LocationAreaCode"),
|
||||
get("CustomerTimezone"), get("CustomerLanguage"), servertime
|
||||
))
|
||||
|
||||
|
||||
def update_ocr_ts(cur, uuid: str, ocr_ts: datetime):
|
||||
cur.execute("""
|
||||
UPDATE remote_cam.metadata
|
||||
SET ocr_ts = %s
|
||||
WHERE metadata_uuid = %s;
|
||||
""", (ocr_ts, uuid))
|
||||
|
||||
|
||||
def insert_categories(cur, uuid: str, cat_json: dict):
|
||||
version = cat_json.get("version")
|
||||
cats = cat_json.get("categories") or []
|
||||
cur.execute("DELETE FROM remote_cam.category WHERE image_uuid = %s;", (uuid,))
|
||||
for c in cats:
|
||||
cur.execute("""
|
||||
INSERT INTO remote_cam.category(image_uuid, category, score, version)
|
||||
VALUES (%s, %s, %s, %s);
|
||||
""", (uuid, c.get("category"), c.get("score"), version))
|
||||
|
||||
|
||||
# -----------------------
|
||||
# S3 ops wrapper
|
||||
# -----------------------
|
||||
class S3:
|
||||
def __init__(self, cfg: AppConfig):
|
||||
self.cfg = cfg
|
||||
self.client = boto3.client(
|
||||
"s3",
|
||||
endpoint_url=cfg.s3_endpoint,
|
||||
aws_access_key_id=cfg.s3_access_key,
|
||||
aws_secret_access_key=cfg.s3_secret_key,
|
||||
)
|
||||
|
||||
def list_entrance_objects(self):
|
||||
paginator = self.client.get_paginator("list_objects_v2")
|
||||
for page in paginator.paginate(Bucket=self.cfg.s3_bucket, Prefix=self.cfg.entrance_prefix):
|
||||
for obj in page.get("Contents", []):
|
||||
yield obj
|
||||
|
||||
def get_bytes(self, key: str) -> bytes:
|
||||
resp = self.client.get_object(Bucket=self.cfg.s3_bucket, Key=key)
|
||||
return resp["Body"].read()
|
||||
|
||||
def put_bytes(self, key: str, data: bytes, content_type: str):
|
||||
self.client.put_object(Bucket=self.cfg.s3_bucket, Key=key, Body=data, ContentType=content_type)
|
||||
|
||||
def head(self, key: str):
|
||||
return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key)
|
||||
|
||||
def delete_keys(self, keys):
|
||||
if not keys:
|
||||
return
|
||||
self.client.delete_objects(
|
||||
Bucket=self.cfg.s3_bucket,
|
||||
Delete={"Objects": [{"Key": k} for k in keys], "Quiet": True},
|
||||
)
|
||||
|
||||
|
||||
# -----------------------
|
||||
# Main processing
|
||||
# -----------------------
|
||||
@dataclass
|
||||
class EntranceSet:
|
||||
uuid: str
|
||||
jpg_key: str
|
||||
meta_key: str
|
||||
cat_key: Optional[str]
|
||||
lm_jpg: datetime
|
||||
lm_meta: datetime
|
||||
lm_cat: Optional[datetime]
|
||||
|
||||
|
||||
def collect_ready_sets(s3: S3, cfg: AppConfig) -> list[EntranceSet]:
|
||||
groups: Dict[str, Dict[str, Tuple[str, datetime]]] = {}
|
||||
for obj in s3.list_entrance_objects():
|
||||
key = obj["Key"]
|
||||
name = key.split("/")[-1]
|
||||
lm = obj["LastModified"]
|
||||
|
||||
m1 = UUID_RE.match(name)
|
||||
if m1:
|
||||
uuid = m1.group(1)
|
||||
ext = m1.group(2)
|
||||
typ = "jpg" if ext == "jpg" else "meta"
|
||||
groups.setdefault(uuid, {})[typ] = (key, lm)
|
||||
continue
|
||||
|
||||
m2 = CAT_RE.match(name)
|
||||
if m2:
|
||||
uuid = m2.group(1)
|
||||
groups.setdefault(uuid, {})["cat"] = (key, lm)
|
||||
continue
|
||||
|
||||
ready: list[EntranceSet] = []
|
||||
for uuid, parts in groups.items():
|
||||
if "jpg" in parts and "meta" in parts:
|
||||
jpg_key, lm_j = parts["jpg"]
|
||||
meta_key, lm_m = parts["meta"]
|
||||
cat = parts.get("cat")
|
||||
cat_key = cat[0] if cat else None
|
||||
lm_c = cat[1] if cat else None
|
||||
|
||||
if not (is_old_enough(lm_j, cfg.min_age_seconds) and is_old_enough(lm_m, cfg.min_age_seconds)):
|
||||
continue
|
||||
if cat and not is_old_enough(lm_c, cfg.min_age_seconds):
|
||||
continue
|
||||
|
||||
ready.append(EntranceSet(uuid, jpg_key, meta_key, cat_key, lm_j, lm_m, lm_c))
|
||||
return ready
|
||||
|
||||
|
||||
def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
|
||||
job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
|
||||
|
||||
# 1) metadata.json -> DB
|
||||
meta = json.loads(s3.get_bytes(s.meta_key))
|
||||
insert_metadata(cur, s.uuid, meta)
|
||||
insert_resource(cur, s.uuid, "metadata")
|
||||
job_set_status(cur, s.uuid, "META_OK")
|
||||
|
||||
# 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone
|
||||
jpg_bytes = s3.get_bytes(s.jpg_key)
|
||||
customer_tzinfo, _ = parse_customer_tz(meta)
|
||||
ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, customer_tzinfo)
|
||||
update_ocr_ts(cur, s.uuid, ocr_ts)
|
||||
insert_resource(cur, s.uuid, "image")
|
||||
job_set_status(cur, s.uuid, "OCR_OK")
|
||||
|
||||
# 3) EXIF write:
|
||||
# DateTimeOriginal = ocr_ts (local, keep offset tags)
|
||||
# DateTimeDigitized = servertime (UTC)
|
||||
servertime = parse_servertime(meta)
|
||||
jpg_exif = exif_write_with_exiftool(jpg_bytes, ocr_ts, servertime)
|
||||
job_set_status(cur, s.uuid, "EXIF_OK")
|
||||
|
||||
# 4) Upload processed JPG (EXIF patched)
|
||||
processed_key = f"{cfg.processed_prefix}{s.uuid}.jpg"
|
||||
s3.put_bytes(processed_key, jpg_exif, "image/jpeg")
|
||||
s3.head(processed_key)
|
||||
job_set_status(cur, s.uuid, "MOVED", processed_key=processed_key)
|
||||
|
||||
# 5) Thumbnail
|
||||
thumb_bytes = make_thumbnail_bytes(jpg_exif, cfg.thumb_max)
|
||||
thumb_key = f"{cfg.thumb_prefix}{s.uuid}.jpg"
|
||||
s3.put_bytes(thumb_key, thumb_bytes, "image/jpeg")
|
||||
s3.head(thumb_key)
|
||||
job_set_status(cur, s.uuid, "THUMB_OK", thumb_key=thumb_key)
|
||||
|
||||
# 6) Optional categories
|
||||
if s.cat_key:
|
||||
cat_json = json.loads(s3.get_bytes(s.cat_key))
|
||||
insert_categories(cur, s.uuid, cat_json)
|
||||
insert_resource(cur, s.uuid, "classification")
|
||||
job_set_status(cur, s.uuid, "CATEGORIES_OK")
|
||||
|
||||
# 7) Cleanup entrance only after verify
|
||||
to_delete = [s.jpg_key, s.meta_key]
|
||||
if s.cat_key:
|
||||
to_delete.append(s.cat_key)
|
||||
s3.delete_keys(to_delete)
|
||||
job_set_status(cur, s.uuid, "CLEANED")
|
||||
|
||||
|
||||
def run_once(cfg: AppConfig) -> int:
|
||||
s3c = S3(cfg)
|
||||
ready = collect_ready_sets(s3c, cfg)
|
||||
if not ready:
|
||||
return 0
|
||||
|
||||
with psycopg.connect(cfg.pg_dsn) as db:
|
||||
with db.cursor() as cur:
|
||||
db_init(cur)
|
||||
db.commit()
|
||||
|
||||
for s in ready:
|
||||
try:
|
||||
with db.transaction():
|
||||
with db.cursor() as cur:
|
||||
process_one(cur, s3c, cfg, s)
|
||||
except Exception as e:
|
||||
with db.transaction():
|
||||
with db.cursor() as cur:
|
||||
job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
|
||||
job_set_status(cur, s.uuid, "ERROR", err=str(e))
|
||||
print(f"[ERROR] {s.uuid}: {e}")
|
||||
return len(ready)
|
||||
|
||||
|
||||
def main():
|
||||
cfg_path = os.environ.get("CONFIG_YAML", "/app/config.yaml")
|
||||
cfg = load_config(cfg_path)
|
||||
|
||||
while True:
|
||||
n = run_once(cfg)
|
||||
if n == 0:
|
||||
time.sleep(cfg.poll_seconds)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
5
pyproject.toml
Normal file
5
pyproject.toml
Normal file
@ -0,0 +1,5 @@
|
||||
[project]
|
||||
name = "melesICUmover"
|
||||
version = "0.1.0"
|
||||
requires-python = ">= 3.11.9"
|
||||
dependencies = [
  "boto3",
  "psycopg",
  "python-dateutil",
  "Pillow",
  "pytesseract",
  "PyYAML",
]
|
||||
Loading…
x
Reference in New Issue
Block a user