569 lines
19 KiB
Python
569 lines
19 KiB
Python
# main.py
|
|
#!/usr/bin/env python3
|
|
import os
|
|
import re
|
|
import json
|
|
import time
|
|
import tempfile
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, Dict, Tuple
|
|
|
|
import boto3
|
|
import psycopg
|
|
from dateutil import tz
|
|
from PIL import Image, ImageOps, ImageEnhance
|
|
import pytesseract
|
|
import yaml
|
|
|
|
|
|
# -----------------------
|
|
# Config
|
|
# -----------------------
|
|
# Entrance file names: "<36 chars of hex digits/dashes>.jpg" or ".json".
# Group 1 is the identifier, group 2 the extension. The character class does
# not enforce the canonical 8-4-4-4-12 UUID layout, only length and alphabet.
UUID_RE = re.compile(r"^([0-9a-fA-F-]{36})\.(jpg|json)$")
# Categories file name: "<identifier>_categories.json". Group 1 is the identifier.
CAT_RE = re.compile(r"^([0-9a-fA-F-]{36})_categories\.json$")

# Overlay fixed: "HH:MM:SS DD-MM-YYYY"
# Named groups: h, m, s (time) and d, mo, y (date).
OCR_DT_RE = re.compile(
    r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})\s+(?P<d>\d{2})-(?P<mo>\d{2})-(?P<y>\d{4})"
)
|
|
|
|
|
|
@dataclass
class AppConfig:
    """Runtime settings for the importer: S3 access, Postgres DSN and tuning knobs."""

    # S3 connection and bucket
    s3_endpoint: str
    s3_access_key: str
    s3_secret_key: str
    s3_bucket: str

    # Connection string handed to psycopg.connect()
    pg_dsn: str

    # Key prefixes inside the bucket
    entrance_prefix: str = "icu/entrance/"
    processed_prefix: str = "icu/processed/"
    thumb_prefix: str = "icu/thumbnails/"

    # Objects younger than min_age_seconds are not picked up yet;
    # poll_seconds is the sleep between passes that found nothing.
    min_age_seconds: int = 90
    poll_seconds: int = 30

    # OCR crop settings (tune if needed)
    ocr_crop_w_frac: float = 0.42  # right ~42% of width
    ocr_crop_h_frac: float = 0.22  # bottom ~22% of height

    # Bounding-box edge (pixels) used when generating thumbnails
    thumb_max: int = 512
|
|
|
|
|
|
def load_config(path: str) -> AppConfig:
    """Build an ``AppConfig`` from a YAML file, with environment overrides.

    The S3 and Postgres settings may each come from an environment variable
    (``S3_ENDPOINT``, ``S3_ACCESS_KEY``, ``S3_SECRET_KEY``, ``S3_BUCKET``,
    ``PG_DSN``) or from the ``s3`` / ``postgres`` sections of the YAML file.
    The environment wins when the variable is set. The YAML key is only
    required when the variable is absent, so secrets can be injected purely
    through the environment.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        The populated configuration object.

    Raises:
        KeyError: A required setting is present in neither the environment
            nor the YAML file.
        OSError: The file cannot be opened.
    """
    with open(path, "r", encoding="utf-8") as f:
        # safe_load returns None for an empty document
        raw = yaml.safe_load(f) or {}

    # A section that is missing or written as an empty key ("app:") yields
    # None; treat both as an empty mapping.
    s3 = raw.get("s3") or {}
    pg = raw.get("postgres") or {}
    app = raw.get("app") or {}

    def env_or(env_key: str, section: dict, section_name: str, key: str):
        # Look at the YAML value lazily so that an env-only secret works.
        value = os.environ.get(env_key)
        if value is not None:
            return value
        try:
            return section[key]
        except KeyError:
            raise KeyError(
                f"missing setting: set ${env_key} or '{section_name}.{key}' in {path}"
            ) from None

    return AppConfig(
        s3_endpoint=env_or("S3_ENDPOINT", s3, "s3", "endpoint"),
        s3_access_key=env_or("S3_ACCESS_KEY", s3, "s3", "access_key"),
        s3_secret_key=env_or("S3_SECRET_KEY", s3, "s3", "secret_key"),
        s3_bucket=env_or("S3_BUCKET", s3, "s3", "bucket"),
        pg_dsn=env_or("PG_DSN", pg, "postgres", "dsn"),
        entrance_prefix=app.get("entrance_prefix", "icu/entrance/"),
        processed_prefix=app.get("processed_prefix", "icu/processed/"),
        thumb_prefix=app.get("thumb_prefix", "icu/thumbnails/"),
        min_age_seconds=int(app.get("min_age_seconds", 90)),
        poll_seconds=int(app.get("poll_seconds", 30)),
        ocr_crop_w_frac=float(app.get("ocr_crop_w_frac", 0.42)),
        ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
        thumb_max=int(app.get("thumb_max", 512)),
    )
|
|
|
|
|
|
# -----------------------
|
|
# Helpers
|
|
# -----------------------
|
|
def is_old_enough(last_modified, min_age_seconds: int) -> bool:
    """Tell whether ``last_modified`` lies at least ``min_age_seconds`` in the past.

    ``last_modified`` must be timezone-aware, because it is subtracted from
    an aware "now" in UTC.
    """
    age = datetime.now(timezone.utc) - last_modified
    return age.total_seconds() >= min_age_seconds
|
|
|
|
|
|
def to_exif_datetime(dt: datetime) -> str:
    """Render ``dt`` in the EXIF layout ``YYYY:MM:DD HH:MM:SS`` (no offset)."""
    return f"{dt:%Y:%m:%d %H:%M:%S}"
|
|
|
|
|
|
def tz_offset_str(dt: datetime) -> str:
    """Format the UTC offset of ``dt`` as ``+HH:MM`` / ``-HH:MM``.

    A datetime without an offset (naive) is reported as ``+00:00``.
    """
    offset = dt.utcoffset()
    seconds = 0 if offset is None else int(offset.total_seconds())
    sign = "-" if seconds < 0 else "+"
    hours, remainder = divmod(abs(seconds), 3600)
    minutes = remainder // 60
    return f"{sign}{hours:02d}:{minutes:02d}"
|
|
|
|
|
|
def parse_servertime(meta: dict) -> datetime:
    """Parse ``meta["servertime"]`` into a timezone-aware datetime.

    Accepted inputs:
      * ``2025-06-30T04:03:04+0000`` (offset without colon),
      * anything ``datetime.fromisoformat`` understands,
      * ISO strings with a trailing ``Z`` (rewritten to ``+00:00`` so this
        also works on interpreters older than 3.11).

    A value that carries no offset at all is taken to be UTC. Without that,
    the later ``astimezone(timezone.utc)`` call in the EXIF step would read
    it as host-local time.
    NOTE(review): UTC is assumed for offset-less values - confirm with the
    producer of the metadata files.

    Raises:
        ValueError: ``servertime`` is missing/empty or cannot be parsed.
    """
    s = meta.get("servertime")
    if not s:
        raise ValueError("metadata.servertime missing")

    # Example: "2025-06-30T04:03:04+0000"
    if re.match(r".*[+-]\d{4}$", s):
        return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z")

    # Fallback: ISO-ish
    if s.endswith(("Z", "z")):
        s = s[:-1] + "+00:00"
    parsed = datetime.fromisoformat(s)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
|
|
|
|
|
|
def parse_customer_tz(meta: dict):
    """Resolve ``meta["CustomerTimezone"]`` to a tzinfo object.

    The customer timezone is the one the camera overlay (OCR text) is
    expressed in. A missing or empty value falls back to ``"UTC"``.

    Returns:
        ``(tzinfo, name)`` - the resolved zone and the name that was looked up.

    Raises:
        ValueError: ``tz.gettz`` does not know the zone name.
    """
    zone_name = meta.get("CustomerTimezone") or "UTC"
    zone = tz.gettz(zone_name)
    if zone is not None:
        return zone, zone_name
    raise ValueError(f"Unknown CustomerTimezone: {zone_name}")
|
|
|
|
|
|
def ocr_extract_timestamp(jpg_bytes: bytes, crop_w_frac: float, crop_h_frac: float, customer_tzinfo) -> datetime:
    """Read the ``HH:MM:SS DD-MM-YYYY`` overlay from the bottom-right image corner.

    The corner (rightmost ``crop_w_frac`` of the width, lowest ``crop_h_frac``
    of the height) is converted to grayscale, contrast-boosted and sharpened.
    Tesseract first runs on a hard black/white threshold of that crop and,
    if no timestamp is found, again on the un-thresholded grayscale crop.

    Returns:
        The overlay time with ``customer_tzinfo`` attached. The wall-clock
        value is kept as read; nothing is converted to UTC.

    Raises:
        ValueError: Neither OCR pass produced text matching ``OCR_DT_RE``.
    """
    from io import BytesIO

    image = Image.open(BytesIO(jpg_bytes)).convert("RGB")
    width, height = image.size
    left = int(width * (1.0 - crop_w_frac))
    top = int(height * (1.0 - crop_h_frac))
    corner = image.crop((left, top, width, height))

    # Pre-processing chain: grayscale -> autocontrast -> contrast -> sharpness
    prepared = ImageOps.autocontrast(ImageOps.grayscale(corner))
    prepared = ImageEnhance.Contrast(prepared).enhance(2.5)
    prepared = ImageEnhance.Sharpness(prepared).enhance(2.0)

    # Hard threshold: pixels brighter than 180 become white, the rest black
    binarized = prepared.point(lambda px: 255 if px > 180 else 0)

    tess_cfg = r'--oem 3 --psm 6 -c tessedit_char_whitelist=0123456789:- '
    first_pass = pytesseract.image_to_string(binarized, config=tess_cfg).strip()

    found = OCR_DT_RE.search(first_pass)
    if not found:
        # Second chance on the grayscale crop, without thresholding
        second_pass = pytesseract.image_to_string(prepared, config=tess_cfg).strip()
        found = OCR_DT_RE.search(second_pass)
        if not found:
            raise ValueError(f"OCR timestamp not found. OCR1='{first_pass}' OCR2='{second_pass}'")

    year, month, day, hour, minute, second = (
        int(found.group(name)) for name in ("y", "mo", "d", "h", "m", "s")
    )
    local_naive = datetime(year, month, day, hour, minute, second)

    # Attach the camera-local zone; the clock reading itself is unchanged
    return local_naive.replace(tzinfo=customer_tzinfo)
|
|
|
|
|
|
def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime) -> bytes:
    """Return a copy of ``jpg_in`` with EXIF date tags set via the ``exiftool`` CLI.

    Tags written:
      * DateTimeOriginal / OffsetTimeOriginal - ``dt_original_local`` with
        its own offset (the OCR time in camera-local time).
      * DateTimeDigitized / OffsetTimeDigitized - ``dt_digitized_utc``
        converted to UTC, offset ``+00:00``.
      * ModifyDate / OffsetTime - same values as the digitized pair.

    The image is written to a temporary directory, patched in place and read
    back; the directory is removed afterwards.

    Raises:
        subprocess.CalledProcessError: exiftool exited with a non-zero status.
    """
    original_stamp = to_exif_datetime(dt_original_local)
    original_offset = tz_offset_str(dt_original_local)
    digitized_stamp = to_exif_datetime(dt_digitized_utc.astimezone(timezone.utc))
    utc_offset = "+00:00"

    tag_values = (
        ("DateTimeOriginal", original_stamp),
        ("OffsetTimeOriginal", original_offset),
        ("DateTimeDigitized", digitized_stamp),
        ("OffsetTimeDigitized", utc_offset),
        ("ModifyDate", digitized_stamp),
        ("OffsetTime", utc_offset),
    )

    with tempfile.TemporaryDirectory() as workdir:
        jpg_path = os.path.join(workdir, "in.jpg")
        with open(jpg_path, "wb") as fh:
            fh.write(jpg_in)

        command = ["exiftool", "-overwrite_original"]
        command.extend(f"-{tag}={value}" for tag, value in tag_values)
        command.append(jpg_path)
        # Output is captured (not printed); a non-zero exit raises
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        with open(jpg_path, "rb") as fh:
            return fh.read()
|
|
|
|
|
|
def make_thumbnail_bytes(jpg_bytes: bytes, thumb_max: int) -> bytes:
    """Produce a JPEG thumbnail that fits inside a ``thumb_max`` x ``thumb_max`` box.

    The source is decoded, converted to RGB, shrunk in place with
    ``Image.thumbnail`` and re-encoded as JPEG (quality 85, optimized).
    """
    from io import BytesIO

    picture = Image.open(BytesIO(jpg_bytes)).convert("RGB")
    bounding_box = (thumb_max, thumb_max)
    picture.thumbnail(bounding_box)

    encoded = BytesIO()
    picture.save(encoded, format="JPEG", quality=85, optimize=True)
    return encoded.getvalue()
|
|
|
|
|
|
# -----------------------
|
|
# DB ops
|
|
# -----------------------
|
|
def db_init(cur):
    """Create the ``remote_cam.import_job`` bookkeeping objects if they are absent.

    Executes, in order: the table, two indexes (status, updated_ts), the
    ``set_updated_ts`` trigger function and - guarded by a lookup in
    ``pg_trigger`` - the BEFORE UPDATE trigger that refreshes ``updated_ts``.
    The ``remote_cam`` schema itself is not created here.
    """
    create_table = """
        CREATE TABLE IF NOT EXISTS remote_cam.import_job (
            image_uuid text PRIMARY KEY,
            status text NOT NULL DEFAULT 'NEW',
            attempts integer NOT NULL DEFAULT 0,
            has_categories boolean,
            last_error text,
            entrance_jpg_key text,
            entrance_meta_key text,
            entrance_cat_key text,
            processed_jpg_key text,
            thumbnail_key text,
            created_ts timestamptz NOT NULL DEFAULT now(),
            updated_ts timestamptz NOT NULL DEFAULT now()
        );
    """
    create_trigger_fn = """
        CREATE OR REPLACE FUNCTION remote_cam.set_updated_ts()
        RETURNS trigger AS $$
        BEGIN
            NEW.updated_ts = now();
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;
    """
    # The trigger is created inside a DO block so it can be skipped when a
    # trigger of that name already exists.
    create_trigger = """
        DO $$
        BEGIN
            IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'import_job_set_updated_ts') THEN
                CREATE TRIGGER import_job_set_updated_ts
                BEFORE UPDATE ON remote_cam.import_job
                FOR EACH ROW EXECUTE FUNCTION remote_cam.set_updated_ts();
            END IF;
        END$$;
    """
    statements = (
        create_table,
        "CREATE INDEX IF NOT EXISTS import_job_status_idx ON remote_cam.import_job(status);",
        "CREATE INDEX IF NOT EXISTS import_job_updated_idx ON remote_cam.import_job(updated_ts);",
        create_trigger_fn,
        create_trigger,
    )
    for statement in statements:
        cur.execute(statement)
|
|
|
|
|
|
def job_upsert_new(cur, uuid: str, jpg_key: str, meta_key: str, cat_key: Optional[str]):
    """Register an entrance set in ``remote_cam.import_job``.

    A new row is inserted with status ``'NEW'``. If a row for ``uuid``
    already exists, only the entrance keys and ``has_categories`` are
    refreshed; its status, attempts and error text are left alone.
    ``has_categories`` is true exactly when ``cat_key`` is not None.
    """
    has_categories = cat_key is not None
    upsert_sql = """
        INSERT INTO remote_cam.import_job (image_uuid, status, entrance_jpg_key, entrance_meta_key, entrance_cat_key, has_categories)
        VALUES (%s, 'NEW', %s, %s, %s, %s)
        ON CONFLICT (image_uuid) DO UPDATE
        SET entrance_jpg_key = EXCLUDED.entrance_jpg_key,
            entrance_meta_key = EXCLUDED.entrance_meta_key,
            entrance_cat_key = EXCLUDED.entrance_cat_key,
            has_categories = EXCLUDED.has_categories;
    """
    row = (uuid, jpg_key, meta_key, cat_key, has_categories)
    cur.execute(upsert_sql, row)
|
|
|
|
|
|
def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None, processed_key: Optional[str] = None, thumb_key: Optional[str] = None):
    """Advance the status of one ``remote_cam.import_job`` row.

    Args:
        cur: Open database cursor.
        uuid: ``image_uuid`` of the job row to update.
        status: New status text. ``"ERROR"`` also increments ``attempts``.
        err: Error text to store; ``None`` keeps the stored ``last_error``.
        processed_key: Key of the processed JPG; ``None`` keeps the stored one.
        thumb_key: Key of the thumbnail; ``None`` keeps the stored one.

    The number of bound parameters matches the number of placeholders
    (the previous version passed ``err`` three times for two placeholders,
    which makes the driver reject every call). The "keep when NULL" rule for
    ``last_error`` is expressed with COALESCE, so the parameter takes its
    type from the column instead of appearing in a bare ``%s IS NULL`` test.
    """
    # Computed client-side so the bound value is a plain integer
    attempt_increment = 1 if status == "ERROR" else 0
    cur.execute("""
        UPDATE remote_cam.import_job
        SET status = %s,
            attempts = attempts + %s,
            last_error = COALESCE(%s, last_error),
            processed_jpg_key = COALESCE(%s, processed_jpg_key),
            thumbnail_key = COALESCE(%s, thumbnail_key)
        WHERE image_uuid = %s;
    """, (status, attempt_increment, err, processed_key, thumb_key, uuid))
|
|
|
|
|
|
def insert_resource(cur, uuid: str, typ: str):
    """Record a ``(uuid, typ)`` row in ``remote_cam.resource`` stamped with ``now()``.

    A conflicting existing row is left untouched (``ON CONFLICT DO NOTHING``).
    """
    insert_sql = """
        INSERT INTO remote_cam.resource(resource_uuid, type, import_ts)
        VALUES (%s, %s, now())
        ON CONFLICT DO NOTHING;
    """
    cur.execute(insert_sql, (uuid, typ))
|
|
|
|
|
|
def insert_metadata(cur, uuid: str, meta: dict):
    """Upsert the camera metadata document into ``remote_cam.metadata``.

    ``servertime`` is mandatory and parsed via ``parse_servertime``.
    ``ImageCreationDate`` (``"HH:MM:SS DD.MM.YYYY"``) is parsed as UTC and
    stored as NULL when it cannot be parsed. ``Latitude``/``Longitude`` are
    converted to float, with ``None`` and ``""`` stored as NULL. Every other
    field is passed through as found in ``meta`` (absent keys become NULL).

    ``ocr_ts`` is inserted as NULL and is not touched when an existing row
    is updated.

    Raises:
        ValueError: ``servertime`` is missing or unparsable, or a coordinate
            is not a valid float.
    """
    server_ts = parse_servertime(meta)

    creation_raw = meta.get("ImageCreationDate")
    creation_ts = None
    if creation_raw:
        try:
            # "06:03:01 30.06.2025" -> assume UTC if no tz info
            creation_ts = datetime.strptime(creation_raw, "%H:%M:%S %d.%m.%Y").replace(tzinfo=timezone.utc)
        except Exception:
            creation_ts = None

    def as_float(value):
        # None and the empty string both mean "no coordinate"
        return None if value in (None, "") else float(value)

    latitude = as_float(meta.get("Latitude"))
    longitude = as_float(meta.get("Longitude"))

    # Pass-through fields, listed in the column order of the INSERT below.
    before_coordinates = (
        "SizeInByte", "ImageFormat", "ImageNight",
        "DateTimeLastSettings", "BatteryStatus", "FirmwareVersion",
        "SignalStrength", "Temperature", "CoordinateSwitch",
    )
    after_coordinates = (
        "WorkPeriod", "WorkStart", "WorkEnd", "ThumbnailSize",
        "PIRInterval", "TimeScan", "imei", "iccid", "INITKey",
        "CameraName", "CameraModel", "CameraLanguage", "CameraNetwork",
        "MobileNetworkCode", "MobileCountryCode", "LocationAreaCode",
        "CustomerTimezone", "CustomerLanguage",
    )
    row = (
        uuid,
        creation_ts,
        *[meta.get(key) for key in before_coordinates],
        latitude,
        longitude,
        *[meta.get(key) for key in after_coordinates],
        server_ts,
    )

    # NOTE: DateTimeLastSettings looks like "05:45:35 30.06.2025" in sample; parse if needed later.
    upsert_sql = """
        INSERT INTO remote_cam.metadata(
            metadata_uuid, import_ts, ImageCreationDate, SizeInByte, ImageFormat, ImageNight,
            DateTimeLastSettings, BatteryStatus, FirmwareVersion, SignalStrength, Temperature,
            CoordinateSwitch, Latitude, Longitude, WorkPeriod, WorkStart, WorkEnd, ThumbnailSize,
            PIRInterval, TimeScan, imei, iccid, INITKey, CameraName, CameraModel, CameraLanguage,
            CameraNetwork, MobileNetworkCode, MobileCountryCode, LocationAreaCode, CustomerTimezone,
            CustomerLanguage, servertime, ocr_ts
        ) VALUES (
            %s, now(), %s, %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s,
            %s, %s, NULL
        )
        ON CONFLICT (metadata_uuid) DO UPDATE SET
            import_ts = EXCLUDED.import_ts,
            ImageCreationDate = EXCLUDED.ImageCreationDate,
            SizeInByte = EXCLUDED.SizeInByte,
            ImageFormat = EXCLUDED.ImageFormat,
            ImageNight = EXCLUDED.ImageNight,
            DateTimeLastSettings = EXCLUDED.DateTimeLastSettings,
            BatteryStatus = EXCLUDED.BatteryStatus,
            FirmwareVersion = EXCLUDED.FirmwareVersion,
            SignalStrength = EXCLUDED.SignalStrength,
            Temperature = EXCLUDED.Temperature,
            CoordinateSwitch = EXCLUDED.CoordinateSwitch,
            Latitude = EXCLUDED.Latitude,
            Longitude = EXCLUDED.Longitude,
            WorkPeriod = EXCLUDED.WorkPeriod,
            WorkStart = EXCLUDED.WorkStart,
            WorkEnd = EXCLUDED.WorkEnd,
            ThumbnailSize = EXCLUDED.ThumbnailSize,
            PIRInterval = EXCLUDED.PIRInterval,
            TimeScan = EXCLUDED.TimeScan,
            imei = EXCLUDED.imei,
            iccid = EXCLUDED.iccid,
            INITKey = EXCLUDED.INITKey,
            CameraName = EXCLUDED.CameraName,
            CameraModel = EXCLUDED.CameraModel,
            CameraLanguage = EXCLUDED.CameraLanguage,
            CameraNetwork = EXCLUDED.CameraNetwork,
            MobileNetworkCode = EXCLUDED.MobileNetworkCode,
            MobileCountryCode = EXCLUDED.MobileCountryCode,
            LocationAreaCode = EXCLUDED.LocationAreaCode,
            CustomerTimezone = EXCLUDED.CustomerTimezone,
            CustomerLanguage = EXCLUDED.CustomerLanguage,
            servertime = EXCLUDED.servertime;
    """
    cur.execute(upsert_sql, row)
|
|
|
|
|
|
def update_ocr_ts(cur, uuid: str, ocr_ts: datetime):
    """Store the OCR-derived timestamp on the metadata row identified by ``uuid``."""
    update_sql = """
        UPDATE remote_cam.metadata
        SET ocr_ts = %s
        WHERE metadata_uuid = %s;
    """
    cur.execute(update_sql, (ocr_ts, uuid))
|
|
|
|
|
|
def insert_categories(cur, uuid: str, cat_json: dict):
    """Replace the stored classification rows of image ``uuid``.

    All existing ``remote_cam.category`` rows for the image are deleted,
    then one row is inserted per entry of ``cat_json["categories"]``
    (a missing or empty list inserts nothing). Each row carries the entry's
    ``category`` and ``score`` plus the document-level ``version``.
    """
    model_version = cat_json.get("version")
    entries = cat_json.get("categories") or []

    cur.execute("DELETE FROM remote_cam.category WHERE image_uuid = %s;", (uuid,))

    insert_sql = """
        INSERT INTO remote_cam.category(image_uuid, category, score, version)
        VALUES (%s, %s, %s, %s);
    """
    for entry in entries:
        row = (uuid, entry.get("category"), entry.get("score"), model_version)
        cur.execute(insert_sql, row)
|
|
|
|
|
|
# -----------------------
|
|
# S3 ops wrapper
|
|
# -----------------------
|
|
class S3:
    """Small convenience wrapper around a boto3 S3 client for one bucket.

    Every method operates on ``cfg.s3_bucket``.
    """

    # DeleteObjects accepts at most 1000 keys per request.
    _DELETE_BATCH_SIZE = 1000

    def __init__(self, cfg: "AppConfig"):
        """Create the boto3 client from the endpoint and credentials in ``cfg``."""
        self.cfg = cfg
        self.client = boto3.client(
            "s3",
            endpoint_url=cfg.s3_endpoint,
            aws_access_key_id=cfg.s3_access_key,
            aws_secret_access_key=cfg.s3_secret_key,
        )

    def list_entrance_objects(self):
        """Yield every object entry listed under ``cfg.entrance_prefix`` (paginated)."""
        paginator = self.client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=self.cfg.s3_bucket, Prefix=self.cfg.entrance_prefix):
            # A page without matches has no "Contents" key
            for obj in page.get("Contents", []):
                yield obj

    def get_bytes(self, key: str) -> bytes:
        """Download the object at ``key`` and return its full body."""
        resp = self.client.get_object(Bucket=self.cfg.s3_bucket, Key=key)
        return resp["Body"].read()

    def put_bytes(self, key: str, data: bytes, content_type: str):
        """Upload ``data`` to ``key`` with the given Content-Type."""
        self.client.put_object(Bucket=self.cfg.s3_bucket, Key=key, Body=data, ContentType=content_type)

    def head(self, key: str):
        """Return the HEAD response for ``key``; the client raises if the request fails."""
        return self.client.head_object(Bucket=self.cfg.s3_bucket, Key=key)

    def delete_keys(self, keys):
        """Delete the given keys, failing loudly if any key could not be removed.

        ``keys`` may be any iterable (or None / empty, which is a no-op).
        Requests are sent in batches of at most 1000 keys. In quiet mode the
        service reports only failures, in the ``Errors`` list of the
        response; those are turned into an exception instead of being
        dropped, so callers never record a cleanup that did not happen.

        Raises:
            RuntimeError: At least one key was reported in ``Errors``.
        """
        pending = list(keys or ())
        for start in range(0, len(pending), self._DELETE_BATCH_SIZE):
            batch = pending[start:start + self._DELETE_BATCH_SIZE]
            resp = self.client.delete_objects(
                Bucket=self.cfg.s3_bucket,
                Delete={"Objects": [{"Key": k} for k in batch], "Quiet": True},
            )
            errors = (resp or {}).get("Errors")
            if errors:
                details = ", ".join(f"{e.get('Key')} ({e.get('Code')})" for e in errors)
                raise RuntimeError(f"S3 delete failed for: {details}")
|
|
|
|
|
|
# -----------------------
|
|
# Main processing
|
|
# -----------------------
|
|
@dataclass
class EntranceSet:
    """One group of entrance objects sharing an identifier: JPG + metadata (+ optional categories)."""

    # Identifier captured from the object file names
    uuid: str
    # Full S3 keys of the members; cat_key is None when there is no categories file
    jpg_key: str
    meta_key: str
    cat_key: Optional[str]
    # LastModified values reported by the listing for each member
    lm_jpg: datetime
    lm_meta: datetime
    lm_cat: Optional[datetime]
|
|
|
|
|
|
def collect_ready_sets(s3: S3, cfg: AppConfig) -> list[EntranceSet]:
    """List the entrance prefix and return the sets that are ready for import.

    Objects are grouped by the identifier in their file name (``UUID_RE``
    for ``.jpg``/``.json``, ``CAT_RE`` for ``_categories.json``); other
    names are ignored. A set is ready when it has both a JPG and a metadata
    file and every member present - including the optional categories file -
    is at least ``cfg.min_age_seconds`` old.
    """
    by_uuid: Dict[str, Dict[str, Tuple[str, datetime]]] = {}
    for obj in s3.list_entrance_objects():
        key = obj["Key"]
        filename = key.split("/")[-1]
        modified = obj["LastModified"]

        plain = UUID_RE.match(filename)
        if plain:
            slot = "jpg" if plain.group(2) == "jpg" else "meta"
            by_uuid.setdefault(plain.group(1), {})[slot] = (key, modified)
            continue

        categories = CAT_RE.match(filename)
        if categories:
            by_uuid.setdefault(categories.group(1), {})["cat"] = (key, modified)

    ready: list[EntranceSet] = []
    for uuid, members in by_uuid.items():
        # Incomplete sets (missing JPG or metadata) wait for a later pass
        if "jpg" not in members or "meta" not in members:
            continue

        jpg_key, lm_jpg = members["jpg"]
        meta_key, lm_meta = members["meta"]
        cat_member = members.get("cat")
        cat_key, lm_cat = cat_member if cat_member else (None, None)

        if not is_old_enough(lm_jpg, cfg.min_age_seconds):
            continue
        if not is_old_enough(lm_meta, cfg.min_age_seconds):
            continue
        if cat_member and not is_old_enough(lm_cat, cfg.min_age_seconds):
            continue

        ready.append(EntranceSet(uuid, jpg_key, meta_key, cat_key, lm_jpg, lm_meta, lm_cat))
    return ready
|
|
|
|
|
|
def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
    """Run the full import pipeline for one entrance set.

    Steps, each followed by a job status update:
      1. metadata JSON -> ``remote_cam.metadata``            (META_OK)
      2. OCR of the overlay timestamp -> ``ocr_ts``          (OCR_OK)
      3. EXIF date tags written into the JPG                  (EXIF_OK)
      4. patched JPG uploaded under the processed prefix      (MOVED)
      5. thumbnail generated and uploaded                     (THUMB_OK)
      6. optional categories JSON -> ``remote_cam.category`` (CATEGORIES_OK)
      7. entrance objects deleted                             (CLEANED)

    Any exception raised by a step propagates to the caller; nothing is
    caught here.
    """
    image_id = s.uuid
    job_upsert_new(cur, image_id, s.jpg_key, s.meta_key, s.cat_key)

    # Step 1: metadata document into the database
    meta = json.loads(s3.get_bytes(s.meta_key))
    insert_metadata(cur, image_id, meta)
    insert_resource(cur, image_id, "metadata")
    job_set_status(cur, image_id, "META_OK")

    # Step 2: read the overlay timestamp, interpreted in the customer timezone
    source_jpg = s3.get_bytes(s.jpg_key)
    camera_tz, _zone_name = parse_customer_tz(meta)
    overlay_ts = ocr_extract_timestamp(source_jpg, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, camera_tz)
    update_ocr_ts(cur, image_id, overlay_ts)
    insert_resource(cur, image_id, "image")
    job_set_status(cur, image_id, "OCR_OK")

    # Step 3: EXIF - original = overlay time (local), digitized = servertime (UTC)
    server_ts = parse_servertime(meta)
    patched_jpg = exif_write_with_exiftool(source_jpg, overlay_ts, server_ts)
    job_set_status(cur, image_id, "EXIF_OK")

    # Step 4: upload the patched image; the HEAD request confirms it exists
    processed_key = f"{cfg.processed_prefix}{image_id}.jpg"
    s3.put_bytes(processed_key, patched_jpg, "image/jpeg")
    s3.head(processed_key)
    job_set_status(cur, image_id, "MOVED", processed_key=processed_key)

    # Step 5: thumbnail derived from the patched image
    thumb_key = f"{cfg.thumb_prefix}{image_id}.jpg"
    s3.put_bytes(thumb_key, make_thumbnail_bytes(patched_jpg, cfg.thumb_max), "image/jpeg")
    s3.head(thumb_key)
    job_set_status(cur, image_id, "THUMB_OK", thumb_key=thumb_key)

    # Step 6: classification results, when a categories file came with the set
    if s.cat_key:
        categories_doc = json.loads(s3.get_bytes(s.cat_key))
        insert_categories(cur, image_id, categories_doc)
        insert_resource(cur, image_id, "classification")
        job_set_status(cur, image_id, "CATEGORIES_OK")

    # Step 7: remove the entrance objects only now that both uploads were verified
    entrance_keys = [s.jpg_key, s.meta_key] + ([s.cat_key] if s.cat_key else [])
    s3.delete_keys(entrance_keys)
    job_set_status(cur, image_id, "CLEANED")
|
|
|
|
|
|
def run_once(cfg: AppConfig) -> int:
    """Perform one polling pass over the entrance prefix.

    Each ready set is processed in its own database transaction. When a set
    fails, that transaction is rolled back and a second transaction records
    the job with status ``ERROR`` and the exception text; the error is also
    printed. Remaining sets are still processed.

    Returns:
        The number of sets processed successfully. The caller's
        ``if n == 0: sleep`` check relies on this: a pass that made no
        progress, including one where every set failed, must lead to a
        pause. Returning the number of ready sets instead would re-run a
        permanently failing set in a tight loop.
    """
    s3c = S3(cfg)
    ready = collect_ready_sets(s3c, cfg)
    if not ready:
        return 0

    succeeded = 0
    with psycopg.connect(cfg.pg_dsn) as db:
        # Make sure the bookkeeping table exists before touching any job
        with db.cursor() as cur:
            db_init(cur)
        db.commit()

        for s in ready:
            try:
                with db.transaction():
                    with db.cursor() as cur:
                        process_one(cur, s3c, cfg, s)
            except Exception as e:
                # The failed transaction was rolled back, so the job row may
                # not exist yet: upsert it before recording the error.
                with db.transaction():
                    with db.cursor() as cur:
                        job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
                        job_set_status(cur, s.uuid, "ERROR", err=str(e))
                print(f"[ERROR] {s.uuid}: {e}")
            else:
                succeeded += 1
    return succeeded
|
|
|
|
|
|
def main():
    """Service entry point: load the configuration and poll forever.

    The config path comes from ``$CONFIG_YAML`` (default ``/app/config.yaml``).
    After a pass for which ``run_once`` returns 0 the loop sleeps
    ``poll_seconds``; otherwise the next pass starts immediately.
    """
    config_file = os.environ.get("CONFIG_YAML", "/app/config.yaml")
    settings = load_config(config_file)

    while True:
        if run_once(settings) == 0:
            time.sleep(settings.poll_seconds)
|
|
|
|
|
|
# Start the polling loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|