badger-55-meterreader / rectifier.py
S3CUR's picture
Initial release: badger-55 meter reader
3800bd2 verified
"""Rectifier for full-meter images of a Badger Model 55 water meter.
A raw camera frame of the meter face is tilted (the camera doesn't sit
perfectly square to the meter) and the digit strip occupies a small
fraction of the image. This module takes a 1920×1080 BGR frame and
produces eight 86×105 BGR slot crops, one per digit drum, axis-aligned
and at canonical scale.
The pipeline:
1. Deskew — find the digit-strip tilt angle and rotate so the
   strip is horizontal.
2. Detect — segment the dark digit-window borders against the
   bright meter face and find their bounding boxes.
3. Assign — figure out which detected window corresponds to
   which slot index (0..7), handling missing detections.
4. Fit — solve a partial-affine (rotation + uniform scale +
   translation) from detected window centers to their
   canonical positions.
5. Warp — apply the affine + a translation that maps the
   strip directly into a tight (TIGHT_H, TIGHT_W) crop, which is
   (105, 688) with the zero padding configured below.
6. Slice — cut the tight crop into 8 × (105, 86) slot crops.
All geometric constants below were calibrated for the source meter and
camera used to build the published dataset. They are pixel coordinates,
not anything more interesting.
"""
from __future__ import annotations
import cv2
import numpy as np
# ── Canonical strip layout ────────────────────────────────────────────
# Canonical frame size, px.
CANONICAL_W = 1920
CANONICAL_H = 1080

# Nominal digit-window size, px.
WIN_W = 80
WIN_H = 105

# Horizontal spacing between slots, px.
STEP = 86

# Canonical x of the slot-0 left edge and canonical y of the strip top.
STRIP_X0 = 580
STRIP_Y0 = 480

# Per-slot crop dimensions: one step wide, one window tall -> (86, 105).
SLOT_W = STEP
SLOT_H = WIN_H

# Margin kept around the strip inside the tight crop (currently none).
TIGHT_PAD_X = 0
TIGHT_PAD_Y = 0

# Tight-crop dimensions derived from the values above.
TIGHT_H = WIN_H + 2 * TIGHT_PAD_Y     # 105 with zero padding
TIGHT_W = 8 * STEP + 2 * TIGHT_PAD_X  # 688 with zero padding
# ── Stage 1: deskew ───────────────────────────────────────────────────
def detect_rotation_degrees(img_bgr: np.ndarray, max_abs_deg: float = 20.0) -> float:
    """Estimate the tilt of the digit strip, in degrees.

    Dark, window-shaped blobs are located inside a central band of the
    frame and a least-squares line is fitted through their centroids.
    The result is ``atan(slope)`` of that line, so it is positive when
    the strip runs downhill toward the right (image y grows downward).

    Returns 0.0 when the search band is empty, when fewer than four
    window-like blobs are found, or when the magnitude of the estimate
    exceeds `max_abs_deg`.
    """
    frame_h, frame_w = img_bgr.shape[:2]
    # Search band: rows 36-47 % and columns 25-75 % of the frame.
    band = img_bgr[int(frame_h * 0.36):int(frame_h * 0.47),
                   int(frame_w * 0.25):int(frame_w * 0.75)]
    if band.size == 0:
        return 0.0

    # Pixels with gray level <= 60 become foreground (inverse binary).
    band_gray = cv2.cvtColor(band, cv2.COLOR_BGR2GRAY)
    mask = cv2.threshold(band_gray, 60, 255, cv2.THRESH_BINARY_INV)[1]
    n_labels, _, stats, centroids = cv2.connectedComponentsWithStats(
        mask, connectivity=8)
    # Label 0 is skipped below, so four blobs need at least five labels.
    if n_labels < 5:
        return 0.0

    xs, ys = [], []
    for label in range(1, n_labels):
        _, _, blob_w, blob_h, px_count = stats[label]
        # Size gate first; it also guarantees blob_w is non-zero below.
        if not (30 <= blob_w <= 90 and 50 <= blob_h <= 110):
            continue
        # Windows are taller than wide and must not be tiny specks.
        if px_count < 200 or not (1.3 <= blob_h / blob_w <= 3.0):
            continue
        xs.append(float(centroids[label][0]))
        ys.append(float(centroids[label][1]))

    if len(xs) < 4:
        return 0.0

    # Fit y = slope * x + intercept through the centroids.
    cx = np.array(xs)
    cy = np.array(ys)
    design = np.vstack([cx, np.ones_like(cx)]).T
    slope = np.linalg.lstsq(design, cy, rcond=None)[0][0]
    tilt = float(np.degrees(np.arctan(slope)))

    # An implausibly large tilt is treated as a failed estimate.
    if abs(tilt) > max_abs_deg:
        return 0.0
    return tilt
def deskew(img_bgr: np.ndarray) -> tuple[np.ndarray, float]:
    """Level the digit strip by rotating the frame about its center.

    The angle comes from `detect_rotation_degrees` and is handed to
    `cv2.getRotationMatrix2D` unchanged (scale 1.0). The output has the
    same size as the input; pixels that rotate in from outside the frame
    are filled by replicating the border.

    Returns `(leveled_image, angle_applied_deg)`.
    """
    tilt_deg = detect_rotation_degrees(img_bgr)
    rows, cols = img_bgr.shape[:2]
    pivot = (cols / 2.0, rows / 2.0)
    rotation = cv2.getRotationMatrix2D(pivot, tilt_deg, 1.0)
    leveled = cv2.warpAffine(
        img_bgr, rotation, (cols, rows),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )
    return leveled, tilt_deg
# ── Stage 2: detect digit windows ─────────────────────────────────────
def detect_digit_windows(img_bgr: np.ndarray, threshold: int = 60,
                         use_otsu: bool = False) -> list[tuple[int, int, int, int]]:
    """Locate dark digit-window rectangles on the bright meter face.

    Intended to run on an image that has already been through `deskew`.
    The search is limited to rows 34-49 % and columns 20-80 % of the
    frame. Blobs are kept when they are 30-90 px wide, 50-110 px tall,
    have a height/width ratio of 1.3-3.0 and cover at least 200 pixels.

    With `use_otsu=True` the binarization level is chosen by Otsu's
    method and `threshold` is not used; otherwise pixels at or below
    `threshold` count as dark.

    Returns `[(x0, y0, x1, y1), ...]` in full-image coordinates, ordered
    by left edge. `x1`/`y1` are `x0 + width` / `y0 + height`.
    """
    frame_h, frame_w = img_bgr.shape[:2]
    top, bottom = int(frame_h * 0.34), int(frame_h * 0.49)
    left, right = int(frame_w * 0.20), int(frame_w * 0.80)
    band = img_bgr[top:bottom, left:right]
    if band.size == 0:
        return []

    band_gray = cv2.cvtColor(band, cv2.COLOR_BGR2GRAY)
    if use_otsu:
        level, mode = 0, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
    else:
        level, mode = threshold, cv2.THRESH_BINARY_INV
    mask = cv2.threshold(band_gray, level, 255, mode)[1]

    n_labels, _, stats, _ = cv2.connectedComponentsWithStats(mask, connectivity=8)

    found = []
    # Row 0 of `stats` is skipped, as in the label loop it replaces.
    for blob_x, blob_y, blob_w, blob_h, px_count in stats[1:n_labels]:
        # Size gate first; it also guarantees blob_w is non-zero below.
        if not (30 <= blob_w <= 90 and 50 <= blob_h <= 110):
            continue
        if px_count < 200 or not (1.3 <= blob_h / blob_w <= 3.0):
            continue
        # Shift from band coordinates back to full-image coordinates.
        found.append((left + blob_x, top + blob_y,
                      left + blob_x + blob_w, top + blob_y + blob_h))

    found.sort(key=lambda box: box[0])
    return found
# ── Stage 3: slot assignment ──────────────────────────────────────────
def assign_slots(boxes: list[tuple[int, int, int, int]], img_w: int
                 ) -> list[tuple[int, tuple[int, int, int, int]]]:
    """Give each detected window a slot index in 0..7.

    Steps:
      * detections whose x-centers lie within 30 px of the leftmost
        member of a cluster are collapsed onto that leftmost member;
      * the longest run of neighbors whose gaps are at most 2.5x the
        smallest gap is kept (only if that run has >= 4 members);
      * each remaining gap is rounded to a whole number of steps, the
        step being the smallest gap, which tolerates missing windows;
      * the slot of the leftmost window is chosen so that the implied
        slot-0 center is closest to `img_w * 0.62 - 7 * step`.

    Returns `[(slot_index, box), ...]` left to right, or `[]` when fewer
    than four boxes are given, fewer than two survive merging, or the
    detections span more than eight slots.
    """
    if len(boxes) < 4:
        return []

    mid_x = np.array([(box[0] + box[2]) / 2.0 for box in boxes])

    # Walk left to right, keeping one representative per 30 px cluster.
    keep = []
    for raw_j in np.argsort(mid_x):
        j = int(raw_j)
        if keep and mid_x[j] - mid_x[keep[-1]] < 30:
            continue  # fragment of the window already represented
        keep.append(j)
    rep_cx = mid_x[keep]

    # Trim spatial outliers: keep the first longest run of "close" gaps.
    if len(rep_cx) >= 2:
        spacing = np.diff(rep_cx)
        is_close = spacing <= 2.5 * float(np.min(spacing))
        breaks = [i for i, ok in enumerate(is_close) if not ok]
        run_starts = [0] + [i + 1 for i in breaks]
        run_ends = breaks + [len(rep_cx) - 1]
        lo, hi = max(zip(run_starts, run_ends), key=lambda run: run[1] - run[0])
        if hi - lo + 1 >= 4:
            rep_cx = rep_cx[lo:hi + 1]
            keep = keep[lo:hi + 1]

    if len(rep_cx) < 2:
        return []

    # Express every representative as a whole-step offset from the first.
    spacing = np.diff(rep_cx)
    step = float(np.min(spacing))
    hops = np.round(spacing / step).astype(int)
    offsets = np.concatenate([[0], np.cumsum(hops)])
    span = int(offsets[-1])

    # Try every slot the leftmost window could occupy without pushing the
    # rightmost one past slot 7; the earliest best candidate wins.
    first_cx = float(rep_cx[0])
    slot0_target = img_w * 0.62 - 7 * step
    first_slot = min(
        range(0, 8 - span),
        key=lambda k0: abs(first_cx - k0 * step - slot0_target),
        default=None,
    )
    if first_slot is None:
        return []

    return [
        (first_slot + int(off), boxes[j])
        for off, j in zip(offsets, keep)
        if 0 <= first_slot + int(off) <= 7
    ]
# ── Stage 4: affine fit ───────────────────────────────────────────────
def fit_affine_centers(slot_boxes
                       ) -> tuple[np.ndarray | None, float | None, int]:
    """Fit a partial affine (rotation + uniform scale + translation)
    that carries detected window centers onto their canonical centers.

    `slot_boxes` is iterated as `(slot_index, (x0, y0, x1, y1))` pairs.
    The canonical center of slot `k` is
    `(STRIP_X0 + k * STEP + WIN_W / 2, STRIP_Y0 + WIN_H / 2)`.

    Returns `(M_3x3, mean_residual_px, n_used)`:
      * `M_3x3` is the 2x3 fit with a `[0, 0, 1]` row appended (float32),
        so callers can feed it to `cv2.warpPerspective`;
      * `mean_residual_px` is the mean distance between the transformed
        centers and the canonical ones, taken over every input point;
      * `n_used` is `len(slot_boxes)` (the RANSAC inlier mask is not
        consulted).
    With fewer than three boxes, or when the estimator yields no model,
    the first two items are `None`.
    """
    n_points = len(slot_boxes)
    if n_points < 3:
        return None, None, n_points

    observed = np.array(
        [[(box[0] + box[2]) / 2.0, (box[1] + box[3]) / 2.0]
         for _, box in slot_boxes],
        dtype=np.float32,
    )
    canonical = np.array(
        [[STRIP_X0 + slot * STEP + WIN_W / 2.0, STRIP_Y0 + WIN_H / 2.0]
         for slot, _ in slot_boxes],
        dtype=np.float32,
    )

    similarity, _ = cv2.estimateAffinePartial2D(
        observed, canonical,
        method=cv2.RANSAC,
        ransacReprojThreshold=2.0,
    )
    if similarity is None:
        return None, None, n_points

    # Residuals are measured with the 2x3 model on all points.
    mapped = cv2.transform(observed.reshape(-1, 1, 2), similarity).reshape(-1, 2)
    mean_err = float(np.mean(np.linalg.norm(mapped - canonical, axis=1)))

    homography = np.vstack([similarity, [0, 0, 1]]).astype(np.float32)
    return homography, mean_err, n_points
# ── End-to-end ────────────────────────────────────────────────────────
def rectify(img_bgr: np.ndarray, max_residual_px: float = 5.0,
            min_windows: int = 6
            ) -> tuple[np.ndarray, dict] | tuple[None, dict]:
    """Run the full pipeline (deskew, detect, assign, fit, warp) on a frame.

    Args:
        img_bgr: BGR camera frame of the meter face.
        max_residual_px: largest acceptable mean residual of the affine
            fit, in pixels.
        min_windows: minimum number of slot assignments required before
            a fit is attempted.

    Returns:
        `(tight_bgr, info)` on success, where `tight_bgr` is the strip
        crop of shape `(TIGHT_H, TIGHT_W, 3)`, or `(None, info)` on
        failure. `info` always holds `deskew_angle` and `n_windows`;
        `mean_residual_px` and `n_used` are added once a fit has been
        attempted, and `error` carries the reason on failure.
    """
    img_lvl, angle = deskew(img_bgr)
    info: dict = {'deskew_angle': float(angle)}

    boxes = detect_digit_windows(img_lvl)
    if len(boxes) < 6:
        # Retry with Otsu, but never trade a richer fixed-threshold
        # detection for a poorer Otsu one; ties still go to Otsu.
        otsu_boxes = detect_digit_windows(img_lvl, use_otsu=True)
        if len(otsu_boxes) >= len(boxes):
            boxes = otsu_boxes
    info['n_windows'] = len(boxes)
    if len(boxes) < 4:
        info['error'] = 'too few digit windows detected'
        return None, info

    slot_boxes = assign_slots(boxes, img_w=img_lvl.shape[1])
    if len(slot_boxes) < min_windows:
        info['error'] = f'only {len(slot_boxes)} slot assignments'
        return None, info

    H_mat, mean_resid, n_used = fit_affine_centers(slot_boxes)
    info.update({'mean_residual_px': mean_resid, 'n_used': n_used})
    if H_mat is None or mean_resid is None or mean_resid > max_residual_px:
        info['error'] = 'affine fit too noisy'
        return None, info

    # Shift canonical coordinates so the (padded) strip origin lands at
    # (0, 0) of the output, letting a single warp produce the tight crop.
    T = np.array([
        [1.0, 0.0, -(STRIP_X0 - TIGHT_PAD_X)],
        [0.0, 1.0, -(STRIP_Y0 - TIGHT_PAD_Y)],
        [0.0, 0.0, 1.0],
    ], dtype=np.float32)
    M_direct = (T @ H_mat).astype(np.float32)
    tight = cv2.warpPerspective(img_lvl, M_direct, (TIGHT_W, TIGHT_H),
                                flags=cv2.INTER_LANCZOS4)
    return tight, info
def tight_to_slots(tight_bgr: np.ndarray) -> list[np.ndarray]:
    """Split a tight strip crop into eight per-digit crops, slot 0 first.

    Each crop covers rows `TIGHT_PAD_Y .. TIGHT_PAD_Y + SLOT_H` and a
    `SLOT_W`-wide column range starting at `TIGHT_PAD_X + slot * STEP`.
    The crops are independent copies, not views into `tight_bgr`.
    """
    top = TIGHT_PAD_Y
    bottom = top + SLOT_H
    lefts = (TIGHT_PAD_X + slot * STEP for slot in range(8))
    return [tight_bgr[top:bottom, left:left + SLOT_W].copy() for left in lefts]