"""Rectifier for full-meter images of a Badger Model 55 water meter.

A raw camera frame of the meter face is tilted (the camera doesn't sit
perfectly square to the meter) and the digit strip occupies a small
fraction of the image. This module takes a 1920×1080 BGR frame and
produces eight 86×105 BGR slot crops, one per digit drum, axis-aligned
and at canonical scale.

The pipeline:

  1. Deskew — find the digit-strip tilt angle and rotate so the
              strip is horizontal.
  2. Detect — segment the dark digit-window borders against the
              bright meter face and find their bounding boxes.
  3. Assign — figure out which detected window corresponds to
              which slot index (0..7), handling missing detections.
  4. Fit    — solve a partial-affine (rotation + uniform scale +
              translation) from detected window centers to their
              canonical positions.
  5. Warp   — apply the affine + a translation that maps the
              strip directly into a tight (TIGHT_H, TIGHT_W) crop,
              which is (105, 688) with the current zero padding.
  6. Slice  — cut the tight crop into 8 × (105, 86) slot crops.

All geometric constants below were calibrated for the source meter and
camera used to build the published dataset. They are pixel coordinates,
not anything more interesting.
"""
| from __future__ import annotations |
|
|
| import cv2 |
| import numpy as np |
|
|
|
|
| |
# Canonical full-frame size in pixels (width, height).
CANONICAL_W, CANONICAL_H = 1920, 1080
# Size of one digit window in the canonical frame (width, height).
WIN_W, WIN_H = 80, 105
# Horizontal pitch from one slot to the next.
STEP = 86
# Top-left corner of slot 0's window in the canonical frame.
STRIP_X0 = 580
STRIP_Y0 = 480
# A slot crop is one full pitch wide and one window tall.
SLOT_W, SLOT_H = STEP, WIN_H
# Margin kept on each side of the strip in the tight crop.
TIGHT_PAD_X = 0
TIGHT_PAD_Y = 0
# Tight-crop size: eight slots plus the padding on both sides.
TIGHT_H = WIN_H + 2 * TIGHT_PAD_Y
TIGHT_W = 8 * STEP + 2 * TIGHT_PAD_X
|
|
|
|
| |
def detect_rotation_degrees(img_bgr: np.ndarray, max_abs_deg: float = 20.0,
                            threshold: int = 60) -> float:
    """Estimate the strip's tilt by finding digit-window centroids and
    fitting a line through them.

    Only a central band of the frame (36–47 % of the height, 25–75 % of
    the width) is searched. Pixels darker than `threshold` are grouped
    into connected components, and components whose bounding box looks
    like a digit window contribute their centroid to a least-squares
    line fit.

    Returns the slope of that line as an angle in degrees, measured in
    image coordinates (y grows downward), so the value is positive when
    the strip descends to the right. `deskew` passes it unchanged as the
    `angle` argument of `cv2.getRotationMatrix2D`, where positive means
    counter-clockwise.

    Falls back to 0° if the search band is empty, if fewer than 4
    windows are found, or if the fitted angle exceeds `max_abs_deg` in
    magnitude.
    """
    height, width = img_bgr.shape[:2]
    y0, y1 = int(height * 0.36), int(height * 0.47)
    x0, x1 = int(width * 0.25), int(width * 0.75)
    roi = img_bgr[y0:y1, x0:x1]
    if roi.size == 0:
        return 0.0

    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    _, mask = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY_INV)
    n_labels, _, stats, centroids = cv2.connectedComponentsWithStats(
        mask, connectivity=8)
    # Label 0 is the background, so fewer than 5 labels means fewer than
    # 4 components and the fit cannot be trusted.
    if n_labels < 5:
        return 0.0

    cx_list, cy_list = [], []
    for i in range(1, n_labels):
        _x, _y, w, h, area = stats[i]
        # The width bound is checked first, so h / w never divides by zero.
        if (30 <= w <= 90 and 50 <= h <= 110
                and 1.3 <= h / w <= 3.0 and area >= 200):
            cx_list.append(float(centroids[i][0]))
            cy_list.append(float(centroids[i][1]))
    if len(cx_list) < 4:
        return 0.0

    cx = np.array(cx_list)
    cy = np.array(cy_list)
    # Least-squares fit of cy = slope * cx + intercept.
    design = np.vstack([cx, np.ones_like(cx)]).T
    slope, _ = np.linalg.lstsq(design, cy, rcond=None)[0]
    angle = float(np.degrees(np.arctan(slope)))
    return 0.0 if abs(angle) > max_abs_deg else angle
|
|
|
|
def deskew(img_bgr: np.ndarray) -> tuple[np.ndarray, float]:
    """Rotate `img_bgr` so the digit strip is horizontal.

    The tilt comes from `detect_rotation_degrees` and is applied about
    the image centre at unit scale. The output has the same size as the
    input; pixels rotated in from outside the frame replicate the
    nearest border pixel.

    Returns `(leveled_image, angle_applied_deg)`. The angle is the value
    handed to `cv2.getRotationMatrix2D`, so positive means the image was
    rotated counter-clockwise.
    """
    tilt_deg = detect_rotation_degrees(img_bgr)
    height, width = img_bgr.shape[:2]
    center = (width / 2.0, height / 2.0)
    rotation = cv2.getRotationMatrix2D(center, tilt_deg, 1.0)
    leveled = cv2.warpAffine(img_bgr, rotation, (width, height),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)
    return leveled, tilt_deg
|
|
|
|
| |
def detect_digit_windows(img_bgr: np.ndarray, threshold: int = 60,
                         use_otsu: bool = False) -> list[tuple[int, int, int, int]]:
    """Find dark digit-window rectangles against the bright meter face.

    Apply *after* `deskew`. Only a central band of the frame (34–49 % of
    the height, 20–80 % of the width) is searched. Dark pixels are
    grouped into connected components, and a component is kept when its
    bounding box is 30–90 px wide, 50–110 px tall, has a height/width
    ratio of 1.3–3.0 and covers at least 200 px.

    Returns `[(x0, y0, x1, y1), ...]` in full-image coordinates as plain
    Python ints, sorted left to right. The threshold default works for
    the dataset's source-camera exposure; pass `use_otsu=True` for
    re-warped or off-camera images where the default misses borders
    (`threshold` is ignored in that case).
    """
    height, width = img_bgr.shape[:2]
    roi_y0, roi_y1 = int(height * 0.34), int(height * 0.49)
    roi_x0, roi_x1 = int(width * 0.20), int(width * 0.80)
    roi = img_bgr[roi_y0:roi_y1, roi_x0:roi_x1]
    if roi.size == 0:
        return []

    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    if use_otsu:
        _, mask = cv2.threshold(gray, 0, 255,
                                cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    else:
        _, mask = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY_INV)

    n_labels, _, stats, _ = cv2.connectedComponentsWithStats(mask, connectivity=8)
    boxes = []
    # Label 0 is the background component; skip it.
    for i in range(1, n_labels):
        # Convert the int32 stats to Python ints so the returned tuples
        # match the annotated type.
        x, y, w, h, area = (int(v) for v in stats[i])
        # The width bound is checked first, so h / w never divides by zero.
        if (30 <= w <= 90 and 50 <= h <= 110
                and 1.3 <= h / w <= 3.0 and area >= 200):
            # Shift from ROI-local back to full-image coordinates.
            boxes.append((roi_x0 + x, roi_y0 + y,
                          roi_x0 + x + w, roi_y0 + y + h))
    boxes.sort(key=lambda b: b[0])
    return boxes
|
|
|
|
| |
def _dedupe_by_center(centers_x: np.ndarray, min_sep: float = 30.0) -> list[int]:
    """Return box indices left to right, one per group of near-coincident centers."""
    order = np.argsort(centers_x)
    reps = [int(order[0])]
    for idx in order[1:]:
        # Compare against the group's first (leftmost) member; anything
        # closer than `min_sep` to it is a duplicate of the same window.
        if centers_x[idx] - centers_x[reps[-1]] < min_sep:
            continue
        reps.append(int(idx))
    return reps


def _longest_regular_run(xs: np.ndarray, max_gap_factor: float = 2.5) -> tuple[int, int]:
    """Return inclusive `(lo, hi)` bounds of the longest run of sorted `xs`
    whose consecutive gaps are at most `max_gap_factor` times the smallest gap.

    Requires at least two values. Ties go to the leftmost run.
    """
    gaps = np.diff(xs)
    limit = max_gap_factor * float(np.min(gaps))
    # A gap above the limit separates two runs.
    breaks = np.flatnonzero(~(gaps <= limit)).tolist()
    starts = [0] + [b + 1 for b in breaks]
    ends = breaks + [len(xs) - 1]
    best = max(range(len(starts)), key=lambda r: ends[r] - starts[r])
    return starts[best], ends[best]


def assign_slots(boxes: list[tuple[int, int, int, int]], img_w: int
                 ) -> list[tuple[int, tuple[int, int, int, int]]]:
    """Map detected windows to slot indices 0..7.

    Handles missing slots — e.g. a mid-roll digit whose contrast against
    its window border collapses — by inferring slot index from inter-
    detection gaps relative to the smallest gap (which is taken to be
    the true slot-to-slot step).

    Steps:
      1. Merge detections whose centers lie within 30 px of each other.
      2. If the detections contain a run of at least 4 regularly spaced
         windows, keep only the longest such run (drops far-away
         outliers).
      3. Express every remaining window as a whole number of steps from
         the leftmost one.
      4. Choose the slot index of the leftmost window so that the implied
         slot-0 center is closest to `img_w * 0.62 - 7 * step`, i.e.
         slot 7 is expected near 62 % of the image width.

    `boxes` are `(x0, y0, x1, y1)` tuples in any order. Returns
    `[(slot_index, box), ...]` left to right, or `[]` when fewer than 4
    boxes are given, fewer than 2 distinct windows remain, or the
    windows span more than 8 slots.
    """
    if len(boxes) < 4:
        return []

    centers_x = np.array([(b[0] + b[2]) / 2.0 for b in boxes])
    rep_idx = _dedupe_by_center(centers_x)
    if len(rep_idx) < 2:
        return []
    rep_cx = centers_x[rep_idx]

    lo, hi = _longest_regular_run(rep_cx)
    if hi - lo + 1 >= 4:
        rep_cx = rep_cx[lo:hi + 1]
        rep_idx = rep_idx[lo:hi + 1]

    # The step is re-estimated after trimming because the smallest gap
    # may have belonged to a discarded outlier.
    gaps = np.diff(rep_cx)
    step = float(np.min(gaps))
    rel_idx = np.concatenate([[0], np.cumsum(np.round(gaps / step).astype(int))])
    rightmost_rel = int(rel_idx[-1])

    leftmost = float(rep_cx[0])
    expected_slot0 = img_w * 0.62 - 7 * step
    # Only offsets that keep the rightmost window at slot <= 7 are
    # candidates; the range is empty when the span exceeds 8 slots.
    best_k0 = min(
        range(8 - rightmost_rel),
        key=lambda k0: abs(leftmost - k0 * step - expected_slot0),
        default=None,
    )
    if best_k0 is None:
        return []

    assigned = (
        (best_k0 + int(rel), boxes[b_idx])
        for rel, b_idx in zip(rel_idx, rep_idx)
    )
    return [(slot_k, box) for slot_k, box in assigned if 0 <= slot_k <= 7]
|
|
|
|
| |
| def fit_affine_centers(slot_boxes |
| ) -> tuple[np.ndarray | None, float | None, int]: |
| """Partial-affine (rotation + uniform scale + translation) from |
| detected window centers to their canonical positions. Returns |
| `(M_3x3, mean_residual_px, n_used)` β `M` is in homography shape so |
| callers can use `cv2.warpPerspective` uniformly.""" |
| if len(slot_boxes) < 3: |
| return None, None, len(slot_boxes) |
| src = np.array([[(b[0]+b[2])/2.0, (b[1]+b[3])/2.0] for _, b in slot_boxes], |
| dtype=np.float32) |
| dst = np.array([[STRIP_X0 + k*STEP + WIN_W/2.0, STRIP_Y0 + WIN_H/2.0] |
| for k, _ in slot_boxes], dtype=np.float32) |
| M, _ = cv2.estimateAffinePartial2D(src, dst, method=cv2.RANSAC, |
| ransacReprojThreshold=2.0) |
| if M is None: |
| return None, None, len(slot_boxes) |
| M3 = np.vstack([M, [0, 0, 1]]).astype(np.float32) |
| proj = cv2.transform(src.reshape(-1, 1, 2), M).reshape(-1, 2) |
| residuals = np.linalg.norm(proj - dst, axis=1) |
| return M3, float(np.mean(residuals)), len(slot_boxes) |
|
|
|
|
| |
def rectify(img_bgr: np.ndarray, max_residual_px: float = 5.0,
            min_windows: int = 6
            ) -> tuple[np.ndarray, dict] | tuple[None, dict]:
    """Run the full pipeline on a 1920×1080 BGR frame.

    Returns `(tight_bgr, info)` on success, where `tight_bgr` is the
    `(TIGHT_H, TIGHT_W, 3)` BGR strip crop, or `(None, info)` on failure.

    `info` always includes `deskew_angle` and `n_windows`. Once the
    affine fit has been attempted it also holds `mean_residual_px` and
    `n_used`. On failure it holds `error`, which is one of: too few
    windows detected, fewer than `min_windows` slot assignments, or a
    fit that is missing or whose mean residual exceeds
    `max_residual_px`.
    """
    img_lvl, angle = deskew(img_bgr)
    info: dict = {'deskew_angle': float(angle)}

    boxes = detect_digit_windows(img_lvl)
    if len(boxes) < 6:
        # Retry with Otsu thresholding, but never let the fallback throw
        # away a better first-pass result: it replaces the fixed-
        # threshold boxes only when it finds at least as many windows.
        otsu_boxes = detect_digit_windows(img_lvl, use_otsu=True)
        if len(otsu_boxes) >= len(boxes):
            boxes = otsu_boxes
    info['n_windows'] = len(boxes)
    if len(boxes) < 4:
        info['error'] = 'too few digit windows detected'
        return None, info

    slot_boxes = assign_slots(boxes, img_w=img_lvl.shape[1])
    if len(slot_boxes) < min_windows:
        info['error'] = f'only {len(slot_boxes)} slot assignments'
        return None, info

    H_mat, mean_resid, n_used = fit_affine_centers(slot_boxes)
    info.update({'mean_residual_px': mean_resid, 'n_used': n_used})
    if H_mat is None or mean_resid is None or mean_resid > max_residual_px:
        info['error'] = 'affine fit too noisy'
        return None, info

    # Shift canonical-frame coordinates so the padded strip's top-left
    # corner lands at the origin of the tight crop.
    to_tight = np.array([
        [1.0, 0.0, -(STRIP_X0 - TIGHT_PAD_X)],
        [0.0, 1.0, -(STRIP_Y0 - TIGHT_PAD_Y)],
        [0.0, 0.0, 1.0],
    ], dtype=np.float32)
    # Compose so one resampling pass goes straight from the leveled
    # frame into the tight crop.
    M_direct = (to_tight @ H_mat).astype(np.float32)
    tight = cv2.warpPerspective(img_lvl, M_direct, (TIGHT_W, TIGHT_H),
                                flags=cv2.INTER_LANCZOS4)
    return tight, info
|
|
|
|
def tight_to_slots(tight_bgr: np.ndarray) -> list[np.ndarray]:
    """Cut a `(TIGHT_H, TIGHT_W, 3)` tight strip into 8 × `(SLOT_H, SLOT_W, 3)`
    BGR slot crops, slot-0 first.

    With the current constants that is a (105, 688, 3) strip cut into
    (105, 86, 3) crops. Each crop is an independent copy, not a view
    into `tight_bgr`. The input size is not checked; a strip smaller
    than expected yields truncated crops.
    """
    top = TIGHT_PAD_Y
    bottom = top + SLOT_H
    lefts = [TIGHT_PAD_X + s * STEP for s in range(8)]
    return [tight_bgr[top:bottom, left:left + SLOT_W].copy() for left in lefts]
|
|