Add environment configuration and enhance Shiny app with AWS integration
This commit is contained in:
155
main.py
155
main.py
@@ -64,6 +64,8 @@ class AppConfig:
|
||||
ocr_crop_h_frac: float = 0.22 # bottom ~22% of height
|
||||
|
||||
thumb_max: int = 512
|
||||
exiftool_timeout_seconds: int = 15
|
||||
job_sleep_seconds: float = 1.0
|
||||
|
||||
|
||||
def load_config(path: str) -> AppConfig:
|
||||
@@ -92,6 +94,8 @@ def load_config(path: str) -> AppConfig:
|
||||
ocr_crop_w_frac=float(app.get("ocr_crop_w_frac", 0.42)),
|
||||
ocr_crop_h_frac=float(app.get("ocr_crop_h_frac", 0.22)),
|
||||
thumb_max=int(app.get("thumb_max", 512)),
|
||||
exiftool_timeout_seconds=int(app.get("exiftool_timeout_seconds", 15)),
|
||||
job_sleep_seconds=float(app.get("job_sleep_seconds", 1.0)),
|
||||
)
|
||||
|
||||
|
||||
@@ -210,7 +214,7 @@ def _parse_ocr_datetime(*texts: str) -> datetime:
|
||||
raise ValueError(f"OCR timestamp not found. OCR1='{texts[0] if texts else ''}' OCR2='{texts[1] if len(texts) > 1 else ''}'")
|
||||
|
||||
|
||||
def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime) -> bytes:
|
||||
def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digitized_utc: datetime, *, timeout_seconds: int) -> bytes:
|
||||
"""
|
||||
DateTimeOriginal = OCR time (camera local) + OffsetTimeOriginal
|
||||
DateTimeDigitized = servertime (UTC) + OffsetTimeDigitized=+00:00
|
||||
@@ -237,7 +241,7 @@ def exif_write_with_exiftool(jpg_in: bytes, dt_original_local: datetime, dt_digi
|
||||
f"-OffsetTime={otd}",
|
||||
in_path,
|
||||
]
|
||||
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout_seconds)
|
||||
|
||||
with open(in_path, "rb") as f:
|
||||
return f.read()
|
||||
@@ -320,6 +324,41 @@ def job_set_status(cur, uuid: str, status: str, *, err: Optional[str] = None, pr
|
||||
""", (status, status, err, processed_key, thumb_key, uuid))
|
||||
|
||||
|
||||
def job_get(cur, uuid: str) -> Optional[dict]:
    """
    Fetch the import-job row for one image.

    Runs a parameterized SELECT against remote_cam.import_job keyed on
    image_uuid and reads a single row from the cursor.

    Args:
        cur: DB cursor exposing execute() and fetchone().
        uuid: image_uuid of the job to look up.

    Returns:
        A dict with the keys "status", "processed_jpg_key", "thumbnail_key"
        and "has_categories" (taken positionally from the selected columns),
        or None when the cursor yields no row.
    """
    cur.execute("""
        SELECT status, processed_jpg_key, thumbnail_key, has_categories
        FROM remote_cam.import_job
        WHERE image_uuid = %s;
    """, (uuid,))
    row = cur.fetchone()
    if not row:
        return None
    # Column order here must stay in sync with the SELECT list above.
    return {
        "status": row[0],
        "processed_jpg_key": row[1],
        "thumbnail_key": row[2],
        "has_categories": row[3],
    }
|
||||
|
||||
|
||||
# Pipeline stages in the order a job passes through them. A status that is
# not listed here (e.g. a failure marker) has no position in the pipeline.
STATUS_ORDER = [
    "NEW",
    "META_OK",
    "OCR_OK",
    "EXIF_OK",
    "MOVED",
    "THUMB_OK",
    "CATEGORIES_OK",
    "CLEANED",
]


def status_rank(status: Optional[str]) -> int:
    """
    Return the position of *status* within STATUS_ORDER.

    Args:
        status: a job status string, or None.

    Returns:
        The zero-based index of the status in STATUS_ORDER, or -1 when the
        value is None or is not one of the listed stages.
    """
    # Single scan: let list.index() report the miss instead of testing
    # membership first and then searching again.
    try:
        return STATUS_ORDER.index(status)
    except ValueError:
        return -1
|
||||
|
||||
|
||||
def insert_resource(cur, uuid: str, typ: str):
|
||||
cur.execute("""
|
||||
INSERT INTO remote_cam.resource(resource_uuid, type, import_ts)
|
||||
@@ -424,6 +463,16 @@ def update_ocr_ts(cur, uuid: str, ocr_ts: datetime):
|
||||
""", (ocr_ts, uuid))
|
||||
|
||||
|
||||
def get_ocr_ts(cur, uuid: str) -> Optional[datetime]:
    """
    Read back the stored OCR timestamp for one image.

    Runs a parameterized SELECT of ocr_ts from remote_cam.metadata keyed on
    metadata_uuid.

    Args:
        cur: DB cursor exposing execute() and fetchone().
        uuid: metadata_uuid of the row to read.

    Returns:
        The first column of the fetched row (which may itself be None if the
        column is NULL), or None when no row is returned.
    """
    cur.execute("""
        SELECT ocr_ts
        FROM remote_cam.metadata
        WHERE metadata_uuid = %s;
    """, (uuid,))
    row = cur.fetchone()
    return row[0] if row else None
|
||||
|
||||
|
||||
def insert_categories(cur, uuid: str, cat_json: dict):
|
||||
version = cat_json.get("version")
|
||||
cats = cat_json.get("categories") or []
|
||||
@@ -528,54 +577,97 @@ def collect_ready_sets(s3: S3, cfg: AppConfig) -> list[EntranceSet]:
|
||||
|
||||
def process_one(cur, s3: S3, cfg: AppConfig, s: EntranceSet):
    """
    Run one entrance set through the import pipeline, resuming from the
    status already recorded for its job.

    Each stage is skipped when the job's recorded status already ranks at or
    beyond it (see status_rank / STATUS_ORDER). Statuses that are not part of
    STATUS_ORDER (e.g. "OCR_FAIL", "EXIF_SKIPPED", "EXIF_FAIL") rank -1, so a
    job left in such a state re-runs the stages that follow.

    Stages:
        1. metadata.json -> DB                       -> "META_OK"
        2. OCR of the JPG timestamp                  -> "OCR_OK" / "OCR_FAIL"
        3. EXIF write via exiftool                   -> "EXIF_OK" /
           "EXIF_SKIPPED" / "EXIF_FAIL"
        4. upload of the processed JPG               -> "MOVED"
        5. thumbnail creation and upload             -> "THUMB_OK"
        6. optional categories JSON -> DB            -> "CATEGORIES_OK"
        7. deletion of the entrance objects          -> "CLEANED" /
           "CLEANUP_SKIPPED"

    OCR, EXIF and cleanup failures are recorded on the job and do not abort
    the run; any other exception propagates to the caller.

    Args:
        cur: DB cursor used for all job/metadata statements.
        s3: storage client providing get_bytes/put_bytes/head/delete_keys.
        cfg: application config (OCR crop fractions, prefixes, thumbnail
            size, exiftool timeout).
        s: the entrance set (uuid plus jpg/meta/optional category keys).
    """
    job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
    job = job_get(cur, s.uuid) or {}
    status_now = job.get("status", "NEW")

    # 1) metadata.json -> DB
    # NOTE(review): the metadata/resource inserts run on every call, also when
    # resuming - presumably they are idempotent; confirm against their SQL.
    meta = json.loads(s3.get_bytes(s.meta_key))
    insert_metadata(cur, s.uuid, meta)
    insert_resource(cur, s.uuid, "metadata")
    if status_rank("META_OK") >= status_rank(status_now):
        job_set_status(cur, s.uuid, "META_OK")
        status_now = "META_OK"

    # 2) JPG -> OCR bottom-right -> ocr_ts in CustomerTimezone
    jpg_bytes = s3.get_bytes(s.jpg_key)
    ocr_ts = None
    if status_rank(status_now) < status_rank("OCR_OK"):
        try:
            customer_tzinfo, _ = parse_customer_tz(meta)
            ocr_ts = ocr_extract_timestamp(jpg_bytes, cfg.ocr_crop_w_frac, cfg.ocr_crop_h_frac, customer_tzinfo)
            update_ocr_ts(cur, s.uuid, ocr_ts)
            insert_resource(cur, s.uuid, "image")
            job_set_status(cur, s.uuid, "OCR_OK")
            status_now = "OCR_OK"
        except Exception as e:
            # OCR is best-effort: record the failure and continue without a
            # timestamp (the EXIF stage is then skipped).
            job_set_status(cur, s.uuid, "OCR_FAIL", err=str(e))
            status_now = "OCR_FAIL"
    else:
        # OCR already done in an earlier run: reuse the stored timestamp.
        ocr_ts = get_ocr_ts(cur, s.uuid)

    # 3) EXIF write:
    # DateTimeOriginal = ocr_ts (local, keep offset tags)
    # DateTimeDigitized = servertime (UTC)
    jpg_exif = jpg_bytes  # fallback: unpatched image when EXIF is skipped/fails
    if status_rank(status_now) < status_rank("MOVED"):
        if ocr_ts is None:
            job_set_status(cur, s.uuid, "EXIF_SKIPPED", err="OCR missing")
            status_now = "EXIF_SKIPPED"
        else:
            try:
                servertime = parse_servertime(meta)
                jpg_exif = exif_write_with_exiftool(
                    jpg_bytes,
                    ocr_ts,
                    servertime,
                    timeout_seconds=cfg.exiftool_timeout_seconds,
                )
                job_set_status(cur, s.uuid, "EXIF_OK")
                status_now = "EXIF_OK"
            except Exception as e:
                job_set_status(cur, s.uuid, "EXIF_FAIL", err=str(e))
                status_now = "EXIF_FAIL"

    # True only when an earlier run already uploaded the processed JPG; in
    # that case jpg_exif still holds the raw entrance image at this point.
    already_moved = status_rank(status_now) >= status_rank("MOVED")

    # 4) Upload processed JPG (EXIF patched)
    processed_key = job.get("processed_jpg_key") or f"{cfg.processed_prefix}{s.uuid}.jpg"
    if not already_moved:
        s3.put_bytes(processed_key, jpg_exif, "image/jpeg")
        s3.head(processed_key)  # verify the upload before recording it
        job_set_status(cur, s.uuid, "MOVED", processed_key=processed_key)
        status_now = "MOVED"

    # 5) Thumbnail
    thumb_key = job.get("thumbnail_key") or f"{cfg.thumb_prefix}{s.uuid}.jpg"
    if status_rank(status_now) < status_rank("THUMB_OK"):
        if already_moved:
            # Resumed after the move: build the thumbnail from the stored
            # processed image rather than from the raw entrance JPG.
            jpg_exif = s3.get_bytes(processed_key)
        thumb_bytes = make_thumbnail_bytes(jpg_exif, cfg.thumb_max)
        s3.put_bytes(thumb_key, thumb_bytes, "image/jpeg")
        s3.head(thumb_key)
        job_set_status(cur, s.uuid, "THUMB_OK", thumb_key=thumb_key)
        status_now = "THUMB_OK"

    # 6) Optional categories
    if s.cat_key and status_rank(status_now) < status_rank("CATEGORIES_OK"):
        cat_json = json.loads(s3.get_bytes(s.cat_key))
        insert_categories(cur, s.uuid, cat_json)
        insert_resource(cur, s.uuid, "classification")
        job_set_status(cur, s.uuid, "CATEGORIES_OK")
        status_now = "CATEGORIES_OK"

    # 7) Cleanup entrance only after verify
    if status_rank(status_now) >= status_rank("THUMB_OK"):
        try:
            # Both outputs must exist before the entrance objects are removed.
            s3.head(processed_key)
            s3.head(thumb_key)
            to_delete = [s.jpg_key, s.meta_key]
            if s.cat_key:
                to_delete.append(s.cat_key)
            s3.delete_keys(to_delete)
            job_set_status(cur, s.uuid, "CLEANED")
            status_now = "CLEANED"
        except Exception as e:
            job_set_status(cur, s.uuid, "CLEANUP_SKIPPED", err=str(e))
|
||||
|
||||
|
||||
def run_once(cfg: AppConfig) -> int:
|
||||
@@ -585,10 +677,6 @@ def run_once(cfg: AppConfig) -> int:
|
||||
return 0
|
||||
|
||||
with psycopg.connect(cfg.pg_dsn) as db:
|
||||
with db.cursor() as cur:
|
||||
db_init(cur)
|
||||
db.commit()
|
||||
|
||||
for s in ready:
|
||||
try:
|
||||
with db.transaction():
|
||||
@@ -600,6 +688,8 @@ def run_once(cfg: AppConfig) -> int:
|
||||
job_upsert_new(cur, s.uuid, s.jpg_key, s.meta_key, s.cat_key)
|
||||
job_set_status(cur, s.uuid, "ERROR", err=str(e))
|
||||
print(f"[ERROR] {s.uuid}: {e}")
|
||||
if cfg.job_sleep_seconds > 0:
|
||||
time.sleep(cfg.job_sleep_seconds)
|
||||
return len(ready)
|
||||
|
||||
|
||||
@@ -607,6 +697,11 @@ def main():
|
||||
cfg_path = os.environ.get("CONFIG_YAML", "/app/config.yaml")
|
||||
cfg = load_config(cfg_path)
|
||||
|
||||
with psycopg.connect(cfg.pg_dsn) as db:
|
||||
with db.cursor() as cur:
|
||||
db_init(cur)
|
||||
db.commit()
|
||||
|
||||
while True:
|
||||
n = run_once(cfg)
|
||||
if n == 0:
|
||||
|
||||
Reference in New Issue
Block a user