"""Rectifier for full-meter images of a Badger Model 55 water meter.

A raw camera frame of the meter face is tilted (the camera doesn't sit
perfectly square to the meter) and the digit strip occupies a small fraction
of the image. This module takes a 1920×1080 BGR frame and produces eight
86×105 BGR slot crops, one per digit drum, axis-aligned and at canonical
scale.

The pipeline:

1. Deskew — find the digit-strip tilt angle and rotate so the strip is
   horizontal.
2. Detect — segment the dark digit-window borders against the bright meter
   face and find their bounding boxes.
3. Assign — figure out which detected window corresponds to which slot index
   (0..7), handling missing detections.
4. Fit — solve a partial-affine (rotation + uniform scale + translation) from
   detected window centers to their canonical positions.
5. Warp — apply the affine + a translation that maps the strip directly into
   a tight (TIGHT_H, TIGHT_W) crop; with the zero padding configured below
   that is (105, 688).
6. Slice — cut the tight crop into 8 × (105, 86) slot crops.

All geometric constants below were calibrated for the source meter and camera
used to build the published dataset. They are pixel coordinates, not anything
more interesting.
"""

from __future__ import annotations

import cv2
import numpy as np

# ── Canonical strip layout ────────────────────────────────────────────
CANONICAL_W, CANONICAL_H = 1920, 1080
N_SLOTS = 8                     # digit drums on the meter face
WIN_W, WIN_H = 80, 105          # nominal digit-window size, px
STEP = 86                       # horizontal spacing between slots
STRIP_X0 = 580                  # canonical x of slot-0 left edge
STRIP_Y0 = 480                  # canonical y of strip top
SLOT_W, SLOT_H = STEP, WIN_H    # per-slot crop dims (86, 105)
# NOTE(review): earlier comments/docstrings quoted a (175, 736) tight crop,
# which corresponds to TIGHT_PAD_Y = 35 and TIGHT_PAD_X = 24. The paddings are
# 0 here, so the crop is (105, 688) — confirm which one is intended.
TIGHT_PAD_X = 0
TIGHT_PAD_Y = 0
TIGHT_H = WIN_H + 2 * TIGHT_PAD_Y           # 105 with zero padding
TIGHT_W = N_SLOTS * STEP + 2 * TIGHT_PAD_X  # 688 with zero padding

# ── Digit-window blob filter (shared by stages 1 and 2) ───────────────
_BLOB_MIN_W, _BLOB_MAX_W = 30, 90
_BLOB_MIN_H, _BLOB_MAX_H = 50, 110
_BLOB_MIN_ASPECT, _BLOB_MAX_ASPECT = 1.3, 3.0   # height / width
_BLOB_MIN_AREA = 200
_DEFAULT_THRESHOLD = 60


def _window_components(roi_bgr: np.ndarray,
                       threshold: int = _DEFAULT_THRESHOLD,
                       use_otsu: bool = False
                       ) -> list[tuple[int, int, int, int, float, float]]:
    """Find window-shaped dark connected components inside a BGR ROI.

    The ROI is converted to grayscale and inverse-thresholded (fixed
    `threshold`, or Otsu's automatic level when `use_otsu` is true) so dark
    pixels become foreground. 8-connected components whose bounding box
    passes the size / aspect / area filter are kept.

    Returns `[(x, y, w, h, cx, cy), ...]` in ROI coordinates, in component
    label order; `(cx, cy)` is the component centroid.
    """
    gray = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2GRAY)
    if use_otsu:
        _, mask = cv2.threshold(gray, 0, 255,
                                cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    else:
        _, mask = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY_INV)
    n_labels, _, stats, centroids = cv2.connectedComponentsWithStats(
        mask, connectivity=8)

    found = []
    for label in range(1, n_labels):    # label 0 is the background
        x, y, w, h, area = (int(v) for v in stats[label])
        # The width check runs first, so h / w never divides by zero.
        if (_BLOB_MIN_W <= w <= _BLOB_MAX_W
                and _BLOB_MIN_H <= h <= _BLOB_MAX_H
                and _BLOB_MIN_ASPECT <= h / w <= _BLOB_MAX_ASPECT
                and area >= _BLOB_MIN_AREA):
            found.append((x, y, w, h,
                          float(centroids[label][0]),
                          float(centroids[label][1])))
    return found


# ── Stage 1: deskew ───────────────────────────────────────────────────
def detect_rotation_degrees(img_bgr: np.ndarray,
                            max_abs_deg: float = 20.0) -> float:
    """Estimate the strip's tilt from a line fitted through window centroids.

    Digit-window blobs are searched for in a fixed central band of the frame
    (36–47 % of the height, 25–75 % of the width) and a least-squares line
    `cy = slope * cx + b` is fitted through their centroids.

    Returns `degrees(arctan(slope))`. Because image y grows downward, a
    positive value means the strip drops toward the right of the frame.
    `deskew` passes this value straight to `cv2.getRotationMatrix2D`, which
    treats positive angles as counter-clockwise.

    Falls back to 0° if fewer than 4 windows are found, if the ROI is empty,
    or if the estimate exceeds `max_abs_deg` in magnitude.
    """
    H, W = img_bgr.shape[:2]
    y0, y1 = int(H * 0.36), int(H * 0.47)
    x0, x1 = int(W * 0.25), int(W * 0.75)
    roi = img_bgr[y0:y1, x0:x1]
    if roi.size == 0:
        return 0.0

    blobs = _window_components(roi)
    if len(blobs) < 4:
        return 0.0

    cx = np.array([b[4] for b in blobs])
    cy = np.array([b[5] for b in blobs])
    # Only the slope matters, so ROI-relative centroids are sufficient.
    A = np.vstack([cx, np.ones_like(cx)]).T
    slope, _ = np.linalg.lstsq(A, cy, rcond=None)[0]
    angle = float(np.degrees(np.arctan(slope)))
    return 0.0 if abs(angle) > max_abs_deg else angle


def deskew(img_bgr: np.ndarray) -> tuple[np.ndarray, float]:
    """Rotate `img_bgr` about its center so the digit strip is horizontal.

    The output keeps the input size; pixels rotated in from outside the frame
    are filled by replicating the border.

    Returns `(leveled_image, angle_applied_deg)`.
    """
    angle = detect_rotation_degrees(img_bgr)
    H, W = img_bgr.shape[:2]
    M = cv2.getRotationMatrix2D((W / 2.0, H / 2.0), angle, 1.0)
    rotated = cv2.warpAffine(img_bgr, M, (W, H), flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)
    return rotated, angle


# ── Stage 2: detect digit windows ─────────────────────────────────────
def detect_digit_windows(img_bgr: np.ndarray, threshold: int = 60,
                         use_otsu: bool = False
                         ) -> list[tuple[int, int, int, int]]:
    """Find dark digit-window rectangles against the bright meter face.

    Apply *after* `deskew`. The search is limited to a fixed band of the
    frame (34–49 % of the height, 20–80 % of the width).

    Returns `[(x0, y0, x1, y1), ...]` in image coordinates, left to right.

    The threshold default works for the dataset's source-camera exposure;
    pass `use_otsu=True` for re-warped or off-camera images where the default
    misses borders (`threshold` is ignored in that case).
    """
    H, W = img_bgr.shape[:2]
    y0_roi, y1_roi = int(H * 0.34), int(H * 0.49)
    x0_roi, x1_roi = int(W * 0.20), int(W * 0.80)
    roi = img_bgr[y0_roi:y1_roi, x0_roi:x1_roi]
    if roi.size == 0:
        return []

    # Shift ROI-relative boxes back into full-image coordinates.
    boxes = [(x0_roi + x, y0_roi + y, x0_roi + x + w, y0_roi + y + h)
             for x, y, w, h, _, _ in _window_components(roi, threshold,
                                                        use_otsu)]
    return sorted(boxes, key=lambda b: b[0])


# ── Stage 3: slot assignment ──────────────────────────────────────────
def assign_slots(boxes: list[tuple[int, int, int, int]], img_w: int
                 ) -> list[tuple[int, tuple[int, int, int, int]]]:
    """Map detected windows to slot indices 0..7.

    Handles missing slots — e.g. a mid-roll digit whose contrast against its
    window border collapses — by inferring slot index from inter-detection
    gaps relative to the smallest gap (which is the true slot-to-slot step).

    Returns `[(slot_index, box), ...]` ordered left to right, or `[]` when
    there are fewer than 4 boxes, fewer than 2 distinct windows remain after
    merging, or the detections span more than 8 slots.
    """
    if len(boxes) < 4:
        return []

    centers_x = np.array([(b[0] + b[2]) / 2.0 for b in boxes])
    order = np.argsort(centers_x)

    # Merge near-duplicate detections (motion-blur fragments): a detection
    # whose center is within 30 px of the current group's first member joins
    # that group, and only the first member represents it. Representatives
    # come out strictly increasing in x, so no re-sort is needed.
    rep_idx = [int(order[0])]
    for pos in range(1, len(order)):
        idx = int(order[pos])
        if centers_x[idx] - centers_x[rep_idx[-1]] >= 30:
            rep_idx.append(idx)
    rep_cx = centers_x[rep_idx]

    # Drop spatial outliers — keep the longest contiguous run where
    # consecutive gaps are within 2.5× the smallest gap (the true step).
    if len(rep_cx) >= 2:
        gaps = np.diff(rep_cx)
        step_est = float(np.min(gaps))
        in_cluster = gaps <= 2.5 * step_est
        best_lo, best_hi, best_len = 0, len(rep_cx) - 1, 0
        run_lo = 0
        for i, ok in enumerate(in_cluster):
            if not ok:
                # Gap i separates point i from i+1, so the run ends at i.
                run_len = i - run_lo + 1
                if run_len > best_len:
                    best_len, best_lo, best_hi = run_len, run_lo, i
                run_lo = i + 1
        run_len = len(rep_cx) - run_lo
        if run_len > best_len:
            best_len, best_lo, best_hi = run_len, run_lo, len(rep_cx) - 1
        # Only trim when the surviving run is still usable on its own.
        if best_len >= 4:
            rep_cx = rep_cx[best_lo:best_hi + 1]
            rep_idx = rep_idx[best_lo:best_hi + 1]

    if len(rep_cx) < 2:
        return []

    gaps = np.diff(rep_cx)
    step = float(np.min(gaps))
    leftmost = float(rep_cx[0])
    # Slot offset of every detection relative to the leftmost one.
    gap_in_steps = np.round(gaps / step).astype(int)
    rel_idx = np.concatenate([[0], np.cumsum(gap_in_steps)])
    rightmost_rel = int(rel_idx[-1])

    # Pick which slot the leftmost detection actually is
    # (0..7-rightmost_rel) by closeness to the expected canonical position
    # of slot 0. Ties go to the smallest candidate.
    expected_slot0 = img_w * 0.62 - (N_SLOTS - 1) * step
    best_k0 = min(
        range(N_SLOTS - rightmost_rel),
        key=lambda k0: abs(leftmost - k0 * step - expected_slot0),
        default=None,
    )
    if best_k0 is None:     # detections span more than N_SLOTS slots
        return []

    out = []
    for rel, b_idx in zip(rel_idx, rep_idx):
        slot_k = best_k0 + int(rel)
        if 0 <= slot_k <= N_SLOTS - 1:
            out.append((slot_k, boxes[b_idx]))
    return out


# ── Stage 4: affine fit ───────────────────────────────────────────────
def fit_affine_centers(
        slot_boxes: list[tuple[int, tuple[int, int, int, int]]]
        ) -> tuple[np.ndarray | None, float | None, int]:
    """Partial-affine (rotation + uniform scale + translation) from detected
    window centers to their canonical positions.

    Returns `(M_3x3, mean_residual_px, n_used)` — `M` is in homography shape
    so callers can use `cv2.warpPerspective` uniformly. `M` and the residual
    are `None` when fewer than 3 slots are supplied or the estimator fails.

    The residual is averaged over *all* supplied correspondences, and
    `n_used` is their count; the RANSAC inlier mask is not consulted.
    """
    n_pts = len(slot_boxes)
    if n_pts < 3:
        return None, None, n_pts

    src = np.array([[(b[0] + b[2]) / 2.0, (b[1] + b[3]) / 2.0]
                    for _, b in slot_boxes], dtype=np.float32)
    dst = np.array([[STRIP_X0 + k * STEP + WIN_W / 2.0,
                     STRIP_Y0 + WIN_H / 2.0]
                    for k, _ in slot_boxes], dtype=np.float32)
    M, _ = cv2.estimateAffinePartial2D(src, dst, method=cv2.RANSAC,
                                       ransacReprojThreshold=2.0)
    if M is None:
        return None, None, n_pts

    M3 = np.vstack([M, [0, 0, 1]]).astype(np.float32)
    proj = cv2.transform(src.reshape(-1, 1, 2), M).reshape(-1, 2)
    residuals = np.linalg.norm(proj - dst, axis=1)
    return M3, float(np.mean(residuals)), n_pts


# ── End-to-end ────────────────────────────────────────────────────────
def rectify(img_bgr: np.ndarray, max_residual_px: float = 5.0,
            min_windows: int = 6
            ) -> tuple[np.ndarray, dict] | tuple[None, dict]:
    """Run the full pipeline on a 1920×1080 BGR frame.

    Returns `(tight_bgr, info)` on success, where `tight_bgr` is the
    (TIGHT_H, TIGHT_W, 3) BGR strip crop — (105, 688, 3) with the current
    zero padding — or `(None, info)` on failure.

    `info` always includes `deskew_angle` and `n_windows`. Once the affine
    fit has run it also holds `mean_residual_px` and `n_used`; on failure it
    holds an `error` string with the reason.
    """
    img_lvl, angle = deskew(img_bgr)
    info: dict = {'deskew_angle': float(angle)}

    boxes = detect_digit_windows(img_lvl)
    if len(boxes) < 6:
        # Retry with Otsu, but never trade a detection for a worse one:
        # the fallback is adopted only if it finds at least as many windows.
        otsu_boxes = detect_digit_windows(img_lvl, use_otsu=True)
        if len(otsu_boxes) >= len(boxes):
            boxes = otsu_boxes
    info['n_windows'] = len(boxes)
    if len(boxes) < 4:
        info['error'] = 'too few digit windows detected'
        return None, info

    slot_boxes = assign_slots(boxes, img_w=img_lvl.shape[1])
    if len(slot_boxes) < min_windows:
        info['error'] = f'only {len(slot_boxes)} slot assignments'
        return None, info

    H_mat, mean_resid, n_used = fit_affine_centers(slot_boxes)
    info.update({'mean_residual_px': mean_resid, 'n_used': n_used})
    if H_mat is None or mean_resid is None or mean_resid > max_residual_px:
        info['error'] = 'affine fit too noisy'
        return None, info

    # Shift the canonical strip origin (less padding) to the crop's (0, 0) so
    # a single warp goes straight from the leveled frame to the tight crop.
    T = np.array([
        [1.0, 0.0, -(STRIP_X0 - TIGHT_PAD_X)],
        [0.0, 1.0, -(STRIP_Y0 - TIGHT_PAD_Y)],
        [0.0, 0.0, 1.0],
    ], dtype=np.float32)
    M_direct = (T @ H_mat).astype(np.float32)
    tight = cv2.warpPerspective(img_lvl, M_direct, (TIGHT_W, TIGHT_H),
                                flags=cv2.INTER_LANCZOS4)
    return tight, info


def tight_to_slots(tight_bgr: np.ndarray) -> list[np.ndarray]:
    """Cut a (TIGHT_H, TIGHT_W, 3) tight strip into 8 × (105, 86, 3) BGR
    slot crops, slot-0 first.

    Each crop is an independent copy, not a view into `tight_bgr`.
    """
    y0, y1 = TIGHT_PAD_Y, TIGHT_PAD_Y + SLOT_H
    first_x = TIGHT_PAD_X
    return [
        tight_bgr[y0:y1, first_x + s * STEP:first_x + s * STEP + SLOT_W].copy()
        for s in range(N_SLOTS)
    ]