| |
| """ |
| cr_parser.py — Parse a CR DOCX's tracked changes into a JSON manifest. |
| |
| Each entry in the manifest is one of: |
| {"type": "text_replace", "location": {...}, "old": "...", "new": "..."} |
| {"type": "para_insert", "location": {...}, "paragraphs": [...]} |
| {"type": "row_insert", "location": {...}, "cells": [...]} |
| {"type": "section_replace", "location": {...}, "elements_xml": [...]} |
| |
| Usage: |
| python3 cr_parser.py <cr.docx> [--output manifest.json] |
| # or import: from cr_parser import parse_cr |
| """ |
|
|
| import argparse |
| import json |
| import re |
| import sys |
| from pathlib import Path |
|
|
| import docx |
| from docx.oxml.ns import qn |
|
|
|
|
| |
|
|
def _del_text(elem):
    """Return the joined text of every w:delText element beneath *elem*.

    Elements without text contribute an empty string.
    """
    fragments = []
    for node in elem.findall('.//' + qn('w:delText')):
        fragments.append(node.text or '')
    return ''.join(fragments)
|
|
def _ins_text(elem):
    """Return the joined text of every w:t element beneath *elem*.

    Typically called on a w:ins element to obtain its inserted text. The
    search itself does not test for w:ins, so every w:t descendant of
    *elem* is included.
    """
    text_nodes = elem.findall('.//' + qn('w:t'))
    return ''.join((node.text or '') for node in text_nodes)
|
|
def _para_new_text(p_elem):
    """Return the paragraph text as it reads once tracked changes are accepted.

    Only w:t descendants are collected, so inserted text is included and
    deleted text (stored in w:delText) is left out.
    """
    pieces = []
    for node in p_elem.findall('.//' + qn('w:t')):
        pieces.append(node.text or '')
    return ''.join(pieces)
|
|
def _para_orig_text(p_elem):
    """Return the paragraph text as it exists in the TS, before the CR.

    Deleted text (w:delText) is included; w:t text is included only when
    it does not sit beneath a w:ins element.
    """
    deleted_tag = qn('w:delText')
    text_tag = qn('w:t')
    inserted_tag = qn('w:ins')

    def _inside_insertion(node):
        # True when any ancestor of the node is a tracked insertion.
        for ancestor in node.iterancestors():
            if ancestor.tag == inserted_tag:
                return True
        return False

    pieces = []
    for node in p_elem.iter():
        if not node.text:
            continue
        if node.tag == deleted_tag:
            pieces.append(node.text)
        elif node.tag == text_tag and not _inside_insertion(node):
            pieces.append(node.text)
    return ''.join(pieces)
|
|
def _style_val(p_elem):
    """Return the w:val of the paragraph's w:pPr/w:pStyle, or None.

    None is returned when either w:pPr or w:pStyle is missing; the raw
    attribute lookup result is returned otherwise.
    """
    node = p_elem
    # Descend pPr -> pStyle, bailing out as soon as a level is absent.
    for step in ('w:pPr', 'w:pStyle'):
        node = node.find(qn(step))
        if node is None:
            return None
    return node.get(qn('w:val'))
|
|
|
|
| _HEADING_NUM_RE = re.compile(r'^(\d+(?:\.\d+)*)\s+\S') |
| _SKIP_MARKER_RE = re.compile(r'^[\[\(]?\s*(?:\.{3}|β¦)\s*[\]\)]?$') |
|
|
|
|
def _para_text_with_tabs(p_elem):
    """Return the paragraph's original text with w:tab rendered as '\\t'.

    Used for heading detection: ETSI headings store the number and the title
    in separate runs separated by <w:tab/>, which _para_orig_text would drop.

    Text handling mirrors _para_orig_text: w:delText is included, and w:t is
    included only when it is not beneath a w:ins element.

    Tab-stop *definitions* (<w:tab> children of <w:tabs> inside w:pPr) share
    the w:tab tag with the run-level tab character but are formatting, not
    content, so they are skipped.
    """
    text_tag = qn('w:t')
    deleted_tag = qn('w:delText')
    tab_tag = qn('w:tab')
    tab_stops_tag = qn('w:tabs')
    inserted_tag = qn('w:ins')

    parts = []
    for node in p_elem.iter():
        if node.tag == text_tag and node.text:
            if not any(a.tag == inserted_tag for a in node.iterancestors()):
                parts.append(node.text)
        elif node.tag == deleted_tag and node.text:
            parts.append(node.text)
        elif node.tag == tab_tag:
            parent = node.getparent()
            # A w:tab under w:tabs is a tab-stop definition, not a tab character.
            if parent is None or parent.tag != tab_stops_tag:
                parts.append('\t')
    return ''.join(parts)
|
|
|
|
def _heading_number(p_elem):
    """Return the dotted section number of a numbered heading, else None.

    The paragraph style must start with 'heading' (compared case-insensitively).
    This prevents false positives from body paragraphs whose text merely starts
    with a digit, notably bit-description lines like "1 = alphabet set."
    (style B30) that appear in Terminal Profile sections.
    """
    style_name = _style_val(p_elem) or ''
    if not style_name.lower().startswith('heading'):
        return None
    heading_text = _para_text_with_tabs(p_elem).strip()
    match = _HEADING_NUM_RE.match(heading_text)
    if match is None:
        return None
    return match.group(1)
|
|
|
|
def _is_skip_marker(text):
    """Return True when *text*, once stripped, is an elision marker.

    The accepted forms are those matched by _SKIP_MARKER_RE: three dots (or
    the ellipsis alternative in that pattern), optionally wrapped in square
    brackets or parentheses.
    """
    stripped = text.strip()
    return _SKIP_MARKER_RE.match(stripped) is not None
|
|
|
|
def _is_rpr_ins(ins_elem):
    """Return True if the w:ins sits directly inside a w:rPr.

    Such an element marks a formatting change rather than inserted content.
    """
    parent = ins_elem.getparent()
    if parent is None:
        return False
    return parent.tag == qn('w:rPr')
|
|
def _is_inserted_para(p_elem):
    """Return True if the paragraph mark is tracked as inserted.

    That is the case when w:pPr/w:rPr/w:ins exists, i.e. the whole paragraph
    is new.
    """
    node = p_elem
    # Walk pPr -> rPr -> ins; any missing level means "not inserted".
    for step in ('w:pPr', 'w:rPr', 'w:ins'):
        node = node.find(qn(step))
        if node is None:
            return False
    return True
|
|
def _is_deleted_para(p_elem):
    """Return True if the paragraph mark is tracked as deleted.

    That is the case when w:pPr/w:rPr/w:del exists, i.e. the whole paragraph
    is removed.
    """
    node = p_elem
    # Walk pPr -> rPr -> del; any missing level means "not deleted".
    for step in ('w:pPr', 'w:rPr', 'w:del'):
        node = node.find(qn(step))
        if node is None:
            return False
    return True
|
|
def _is_fully_deleted_tbl(tbl_elem):
    """Return True if every direct row of the table carries a row-level w:del.

    A table without any w:tr child is never considered fully deleted.
    """
    rows = tbl_elem.findall(qn('w:tr'))
    if not rows:
        return False
    for tr in rows:
        row_props = tr.find(qn('w:trPr'))
        if row_props is None or row_props.find(qn('w:del')) is None:
            return False
    return True
|
|
def _is_fully_inserted_tbl(tbl_elem):
    """Return True if every direct row of the table carries a row-level w:ins.

    A table without any w:tr child is never considered fully inserted.
    """
    rows = tbl_elem.findall(qn('w:tr'))
    if not rows:
        return False
    for tr in rows:
        row_props = tr.find(qn('w:trPr'))
        if row_props is None or row_props.find(qn('w:ins')) is None:
            return False
    return True
|
|
|
|
| |
|
|
def _table_header(tbl_elem):
    """Return the stripped cell texts of the table's first row.

    The list serves as the table identifier. Each entry is the accepted text
    of the first paragraph found in the cell ('' when the cell has none).
    A table without rows yields an empty list.
    """
    first_row = tbl_elem.find(qn('w:tr'))
    if first_row is None:
        return []

    para_path = './/' + qn('w:p')

    def _cell_label(tc):
        first_para = tc.find(para_path)
        if first_para is None:
            return ''
        return _para_new_text(first_para).strip()

    return [_cell_label(tc) for tc in first_row.findall(qn('w:tc'))]
|
|
def _row_col0(tr_elem):
    """Return the stripped text of the row's first cell, used as row anchor.

    The text is the accepted text of the first paragraph found in that cell;
    '' is returned when the row has no cell or the cell has no paragraph.
    """
    first_cell = tr_elem.find(qn('w:tc'))
    if first_cell is None:
        return ''
    first_para = first_cell.find('.//' + qn('w:p'))
    if first_para is None:
        return ''
    return _para_new_text(first_para).strip()
|
|
|
|
| |
|
|
def _extract_inline_replacements(p_elem):
    """
    Return a list of (old_text, new_text) pairs from del+ins sibling pairs.

    Only direct children of *p_elem* are examined.

    Handles: del-then-ins, ins-then-del, multi-fragment consecutive dels
    (their texts are concatenated into one old_text).
    Filters: empty dels, whitespace-only dels with no adjacent ins, rPr ins.
    A deletion with no adjacent insertion is reported as (old_text, '').

    Each w:ins is paired with at most one deletion: in a
    "del A, ins X, del B" sequence X replaces A, and B is reported as a
    plain deletion rather than being paired with X a second time.
    """
    del_tag = qn('w:del')
    ins_tag = qn('w:ins')

    children = list(p_elem)
    pairs = []
    # Indices already folded into an earlier pair (merged dels, used ins).
    consumed = set()

    def _content_ins_at(idx):
        # True when children[idx] is an unused, content-bearing w:ins.
        if idx < 0 or idx >= len(children) or idx in consumed:
            return False
        node = children[idx]
        return node.tag == ins_tag and not _is_rpr_ins(node)

    for i, child in enumerate(children):
        if i in consumed or child.tag != del_tag:
            continue

        old_text = _del_text(child)
        if not old_text:
            # Empty deletion: nothing to report.
            continue

        # Merge directly following w:del fragments into one deletion.
        j = i + 1
        while j < len(children) and children[j].tag == del_tag:
            old_text += _del_text(children[j])
            consumed.add(j)
            j += 1

        # Prefer the insertion that follows; fall back to the one before.
        new_text = None
        if _content_ins_at(j):
            new_text = _ins_text(children[j])
            consumed.add(j)
        elif _content_ins_at(i - 1):
            new_text = _ins_text(children[i - 1])
            consumed.add(i - 1)

        if new_text is None:
            if not old_text.strip():
                # Whitespace-only deletion with no replacement: ignore.
                continue
            new_text = ''

        pairs.append((old_text, new_text))

    return pairs
|
|
|
|
| |
|
|
def _parse_table(tbl_elem, changes, section_heading='', section_number=''):
    """Append the tracked changes found in one table to *changes*.

    Two kinds of entries are emitted:

    * ``row_insert`` for each row whose w:trPr contains w:ins. The location
      records the first three header cells, the col-0 text of the nearest
      preceding non-inserted row (``after_row_anchor``) and of up to two more
      non-inserted rows before it (``context_rows_before``, nearest first).
      Each cell is described by text, width, vmerge flag and paragraph style.
    * ``text_replace`` for each inline del/ins pair inside the paragraphs of
      any other row, located by header, col-0 row anchor and column index.

    Args:
        tbl_elem: the w:tbl element to scan.
        changes: list that receives the change dicts (mutated in place).
        section_heading: text stored as ``section_heading`` in every location.
        section_number: text stored as ``section_number`` in every location.
    """
    ins_tag = qn('w:ins')
    header_key = _table_header(tbl_elem)[:3]
    rows = tbl_elem.findall(qn('w:tr'))

    def _row_inserted(tr):
        # A row is tracked as inserted when its trPr holds a w:ins.
        trPr = tr.find(qn('w:trPr'))
        return trPr is not None and trPr.find(ins_tag) is not None

    def _describe_cell(tc):
        # Build the manifest description of one cell of an inserted row.
        tcPr = tc.find(qn('w:tcPr'))

        width = None
        is_vmerge = False
        if tcPr is not None:
            tcW = tcPr.find(qn('w:tcW'))
            if tcW is not None:
                try:
                    width = int(tcW.get(qn('w:w'), 0))
                except (ValueError, TypeError):
                    # Non-integer widths (e.g. percentages) are not recorded.
                    width = None

            # A vMerge with no w:val, or with w:val="continue", continues a
            # vertical merge started in a row above; "restart" begins one.
            vm = tcPr.find(qn('w:vMerge'))
            if vm is not None and vm.get(qn('w:val')) in (None, 'continue'):
                is_vmerge = True

        cell_text = _ins_text(tc)
        p = tc.find('.//' + qn('w:p'))
        # Compare against None: lxml element truthiness reflects child count.
        if not cell_text and p is not None:
            cell_text = _para_new_text(p)
        style = _style_val(p) if p is not None else None

        return {
            'text': cell_text.strip(),
            'width': width,
            'vmerge': is_vmerge,
            'style': style,
        }

    for tr_idx, tr in enumerate(rows):
        if _row_inserted(tr):
            # Collect col-0 text of up to three preceding non-inserted rows,
            # nearest first, to anchor the insertion in the target table.
            stable_before = []
            for prev_tr in reversed(rows[:tr_idx]):
                if not _row_inserted(prev_tr):
                    stable_before.append(_row_col0(prev_tr))
                    if len(stable_before) >= 3:
                        break

            after_anchor = stable_before[0] if stable_before else ''
            context_rows_before = stable_before[1:]

            cells = [_describe_cell(tc) for tc in tr.findall(qn('w:tc'))]

            changes.append({
                'type': 'row_insert',
                'location': {
                    'kind': 'table_row',
                    'table_header': header_key,
                    'after_row_anchor': after_anchor,
                    'context_rows_before': context_rows_before,
                    'section_heading': section_heading,
                    'section_number': section_number,
                },
                'cells': cells,
            })
            continue

        # Existing row: report inline replacements cell by cell.
        row_anchor = _row_col0(tr)
        for col_idx, tc in enumerate(tr.findall(qn('w:tc'))):
            for p in tc.findall('.//' + qn('w:p')):
                for old_text, new_text in _extract_inline_replacements(p):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'table_cell',
                            'table_header': header_key,
                            'row_anchor': row_anchor,
                            'col_idx': col_idx,
                            'section_heading': section_heading,
                            'section_number': section_number,
                        },
                        'old': old_text,
                        'new': new_text,
                    })
|
|
|
|
| |
|
|
def _parse_body(body, changes):
    """
    Walk direct children of w:body, appending change dicts to *changes*.

    Change types emitted:
      section_replace — a contiguous block of fully-deleted elements (para and/or
                        table, tracked at the paragraph-mark / row level) followed
                        immediately by a contiguous block of fully-inserted elements.
                        The raw XML of ALL those CR elements is stored verbatim so
                        the applicator can transplant them directly into the TS —
                        exactly what Word does on a copy-paste.
      text_replace    — an inline del+ins pair inside an otherwise-stable paragraph.
      para_insert     — one or more wholly-new paragraphs with no corresponding
                        deletion (rare; kept for backward compatibility).

    Tables that are neither fully deleted nor fully inserted are delegated to
    _parse_table, which may add row_insert / text_replace entries.

    The walk is a small state machine over ``sec_state``:
      'stable' — not inside a replaced section;
      'del'    — collecting fully-deleted elements;
      'sep'    — saw an empty paragraph after the deleted block;
      'ins'    — collecting fully-inserted elements.

    Args:
        body: the w:body element whose direct children are scanned.
        changes: list that receives the change dicts (mutated in place).
    """
    from lxml import etree

    def _local_name(node):
        # Tag name without its '{namespace}' prefix.
        return node.tag.split('}')[-1] if '}' in node.tag else node.tag

    prev_stable_text = ''   # last stable (or last inserted) paragraph text
    current_section = ''    # dotted number of the most recent numbered heading

    # Buffers for the section_replace currently being assembled.
    sec_del = []
    sec_sep = []
    sec_ins = []
    sec_state = 'stable'
    sec_anchor = ''

    def flush_section():
        """Emit the buffered section_replace (if any) and reset the buffers."""
        nonlocal sec_state, sec_anchor, prev_stable_text
        if not sec_del and not sec_ins:
            sec_del.clear(); sec_sep.clear(); sec_ins.clear()
            sec_state = 'stable'
            return

        # Heading of the deleted block: first deleted paragraph with text.
        del_heading = ''
        for e in sec_del:
            if _local_name(e) == 'p':
                t = _del_text(e).strip() or _para_orig_text(e).strip()
                if t:
                    del_heading = t
                    break

        # Otherwise fall back to the first cell of the first deleted table.
        if not del_heading:
            for e in sec_del:
                if _local_name(e) == 'tbl':
                    first_tc = e.find('.//' + qn('w:tc'))
                    if first_tc is not None:
                        p = first_tc.find('.//' + qn('w:p'))
                        src = p if p is not None else first_tc
                        # Deleted content normally lives in w:delText, which the
                        # accepted-text view omits, so fall back to original text.
                        del_heading = (_para_new_text(src).strip()
                                       or _para_orig_text(src).strip())
                    break

        all_elems = sec_del + sec_sep + sec_ins
        elements_xml = [etree.tostring(e, encoding='unicode') for e in all_elems]
        has_del_table = any(_local_name(e) == 'tbl' for e in sec_del)
        changes.append({
            'type': 'section_replace',
            'location': {
                'kind': 'body',
                'del_heading': del_heading,
                'has_del_table': has_del_table,
                'anchor_text': sec_anchor,
                'section_number': current_section,
            },
            'elements_xml': elements_xml,
        })

        # The last inserted paragraph with text becomes the anchor for
        # whatever follows the replaced section.
        if sec_ins:
            last_p = next((e for e in reversed(sec_ins)
                           if _local_name(e) == 'p'), None)
            if last_p is not None:
                candidate = _para_new_text(last_p).strip()
                if candidate:
                    prev_stable_text = candidate
        sec_del.clear(); sec_sep.clear(); sec_ins.clear()
        sec_state = 'stable'

    # Standalone inserted paragraphs awaiting emission as a para_insert.
    insert_group = []

    def flush_group():
        """Emit the buffered para_insert (if any) and clear the buffer."""
        if not insert_group:
            return
        paras = [
            {'text': _para_new_text(p).strip(), 'style': _style_val(p)}
            for p in insert_group
        ]
        # Drop paragraphs that carry neither text nor a style.
        paras = [p for p in paras if p['text'] or p['style']]
        if paras:
            changes.append({
                'type': 'para_insert',
                'location': {
                    'kind': 'body',
                    'anchor_text': prev_stable_text,
                    'section_number': current_section,
                },
                'paragraphs': paras,
            })
        insert_group.clear()

    for elem in body:
        tag = _local_name(elem)

        if tag == 'p':
            hn = _heading_number(elem)
            if hn:
                current_section = hn

            is_del = _is_deleted_para(elem)
            is_ins = _is_inserted_para(elem)
            is_empty = not _para_orig_text(elem).strip() and not _para_new_text(elem).strip()

            if is_del:
                # A deletion after an insertion block starts a new section.
                if sec_state == 'ins':
                    flush_section()
                if sec_state == 'stable':
                    flush_group()
                    sec_anchor = prev_stable_text
                sec_state = 'del'
                sec_del.append(elem)

            elif is_ins:
                if sec_state in ('del', 'sep'):
                    # First inserted paragraph following the deleted block.
                    sec_state = 'ins'
                    sec_ins.append(elem)
                elif sec_state == 'ins':
                    sec_ins.append(elem)
                else:
                    # Standalone insertion with no preceding deletion.
                    # NOTE(review): flushing before every append means each
                    # para_insert holds a single paragraph — confirm intended.
                    flush_group()
                    insert_group.append(elem)

            elif is_empty:
                if sec_state == 'del':
                    # Empty separator between the deleted and inserted blocks.
                    sec_state = 'sep'
                    sec_sep.append(elem)
                elif sec_state in ('sep', 'ins'):
                    sec_ins.append(elem)
                else:
                    # Empty paragraph outside any section: nothing to record.
                    pass

            else:
                # Stable paragraph: close any open section / insert group.
                flush_section()
                flush_group()

                for old_text, new_text in _extract_inline_replacements(elem):
                    if not old_text:
                        continue
                    changes.append({
                        'type': 'text_replace',
                        'location': {
                            'kind': 'body_para',
                            'para_context': _para_orig_text(elem).strip(),
                            'section_number': current_section,
                        },
                        'old': old_text,
                        'new': new_text,
                    })

                # Skip markers ("[...]") are never used as anchors.
                orig = _para_orig_text(elem).strip()
                if orig and not _is_skip_marker(orig):
                    prev_stable_text = orig

        elif tag == 'tbl':
            if _is_fully_deleted_tbl(elem):
                if sec_state == 'ins':
                    flush_section()
                if sec_state == 'stable':
                    flush_group()
                    sec_anchor = prev_stable_text
                sec_state = 'del'
                sec_del.append(elem)

            elif _is_fully_inserted_tbl(elem):
                if sec_state in ('del', 'sep', 'ins'):
                    sec_state = 'ins'
                    sec_ins.append(elem)
                else:
                    # Inserted table with no preceding deletion: it opens a
                    # section of its own, anchored on the last stable text.
                    flush_group()
                    sec_anchor = prev_stable_text
                    sec_state = 'ins'
                    sec_ins.append(elem)

            else:
                # Table with row-level or inline changes only.
                flush_section()
                flush_group()
                _parse_table(elem, changes, section_heading=prev_stable_text,
                             section_number=current_section)

    # Emit whatever is still buffered at the end of the body.
    flush_section()
    flush_group()
|
|
|
|
| |
|
|
def parse_cr(cr_path, output_json=None):
    """
    Parse all tracked changes in a CR DOCX and return them as a list of dicts.

    When *output_json* is truthy, the list is also written to that path as
    UTF-8 JSON (indent 2, non-ASCII characters kept as-is).
    """
    document = docx.Document(str(cr_path))
    changes = []
    _parse_body(document.element.body, changes)

    if output_json:
        serialized = json.dumps(changes, indent=2, ensure_ascii=False)
        Path(output_json).write_text(serialized, encoding='utf-8')
    return changes
|
|
|
|
| |
|
|
def main():
    """Command-line entry point.

    Parses the CR DOCX given as the positional argument. With --output the
    manifest is written to that file and a one-line summary is printed;
    without it the manifest JSON is printed to stdout.
    """
    ap = argparse.ArgumentParser(description='Parse CR DOCX tracked changes into JSON manifest.')
    ap.add_argument('cr_docx', help='CR DOCX file path')
    ap.add_argument('--output', default=None, help='Output JSON path (default: print to stdout)')
    args = ap.parse_args()

    changes = parse_cr(args.cr_docx, output_json=args.output)

    if args.output:
        # Arrow written as an escape so the message survives encoding round trips.
        print(f'Wrote {len(changes)} change(s) \u2192 {args.output}')
    else:
        print(json.dumps(changes, indent=2, ensure_ascii=False))
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if '__main__' == __name__:
    main()
|
|