| import shutil
|
| import bm25s
|
| from bm25s.hf import BM25HF
|
| import threading, re, time, concurrent.futures, os, hashlib, traceback, io, zipfile, subprocess, tempfile, json
|
| import requests
|
| import fitz  # PyMuPDF
|
| import pandas as pd
|
| import numpy as np
|
|
|
| from bs4 import BeautifulSoup
|
| from datasets import load_dataset, Dataset
|
| from datasets.data_files import EmptyDatasetError
|
| from dotenv import load_dotenv
|
|
|
| load_dotenv()
|
|
|
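| # Crawls the 3GPP FTP tree, maps TDoc IDs to the URL of their ZIP archive, and pushes
| # the resulting index to the OrganizedProgrammers/3GPPTDocLocation dataset on the Hub.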
| class TDocIndexer:
|
| def __init__(self, max_workers=33):
|
| self.indexer_length = 0
|
| self.dataset = "OrganizedProgrammers/3GPPTDocLocation"
|
|
|
| self.indexer = self.load_indexer()
|
| self.main_ftp_url = "https://3gpp.org/ftp"
|
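| # TDoc IDs look like "S2-2301234", "C1-991234" or "RP-230456": a TSG letter (S/C/R),
| # a working-group digit or "P" for plenary, a dash, then the document number.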
| self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
|
| self.max_workers = max_workers
|
|
|
| self.print_lock = threading.Lock()
|
| self.indexer_lock = threading.Lock()
|
|
|
| self.total_indexed = 0
|
| self.processed_count = 0
|
| self.total_count = 0
|
|
|
| def load_indexer(self):
|
| self.indexer_length = 0
|
| all_docs = {}
|
| tdoc_locations = load_dataset(self.dataset)
|
| tdoc_locations = tdoc_locations["train"].to_list()
|
| for doc in tdoc_locations:
|
| self.indexer_length += 1
|
| all_docs[doc["doc_id"]] = doc["url"]
|
|
|
| return all_docs
|
|
|
| def save_indexer(self):
|
| """Save the updated index"""
|
| data = []
|
| for doc_id, url in self.indexer.items():
|
| data.append({"doc_id": doc_id, "url": url})
|
|
|
| dataset = Dataset.from_list(data)
|
| dataset.push_to_hub(self.dataset, token=os.environ["HF"])
|
| self.indexer = self.load_indexer()
|
|
|
| def get_docs_from_url(self, url):
|
| try:
|
| response = requests.get(url, verify=False, timeout=10)
|
| soup = BeautifulSoup(response.text, "html.parser")
|
| return [item.get_text() for item in soup.select("tr td a")]
|
| except Exception as e:
|
| with self.print_lock:
|
| print(f"Erreur lors de l'accès à {url}: {e}")
|
| return []
|
|
|
| def is_valid_document_pattern(self, filename):
|
| return bool(self.valid_doc_pattern.match(filename))
|
|
|
| def is_zip_file(self, filename):
|
| return filename.lower().endswith('.zip')
|
|
|
| def extract_doc_id(self, filename):
|
| match = self.valid_doc_pattern.match(filename)
|
| if match:
|
| full_id = filename.split('.')[0]
|
| return full_id.split('_')[0]
|
| return None
|
|
|
| def process_zip_files(self, files_list, base_url, workshop=False):
|
| """Traiter une liste de fichiers pour trouver et indexer les ZIP valides"""
|
| indexed_count = 0
|
|
|
| for file in files_list:
|
| if file in ['./', '../', 'ZIP/', 'zip/']:
|
| continue
|
|
|
|
|
| if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop):
|
| file_url = f"{base_url}/{file}"
|
|
|
|
|
| doc_id = self.extract_doc_id(file)
|
| if doc_id is None:
|
| doc_id = file.split('.')[0]
|
| if doc_id:
|
|
|
| with self.indexer_lock:
|
| if doc_id in self.indexer and self.indexer[doc_id] == file_url:
|
| continue
|
|
|
|
|
| self.indexer[doc_id] = file_url
|
| indexed_count += 1
|
| self.total_indexed += 1
|
|
|
| return indexed_count
|
|
|
| def process_meeting(self, meeting, wg_url, workshop=False):
|
| """Traiter une réunion individuelle avec multithreading"""
|
| try:
|
| if meeting in ['./', '../']:
|
| return 0
|
|
|
| meeting_url = f"{wg_url}/{meeting}"
|
|
|
| with self.print_lock:
|
| print(f"Vérification du meeting: {meeting}")
|
|
|
|
|
| meeting_contents = self.get_docs_from_url(meeting_url)
|
|
|
| key = None
|
| for item in meeting_contents:
|
| normalized = item.lower().rstrip('/')
|
| if normalized in ("docs", "tdocs", "tdoc"):
|
| key = item.rstrip('/')
|
| break
|
|
|
| if key is not None:
|
| docs_url = f"{meeting_url}/{key}"
|
|
|
| with self.print_lock:
|
| print(f"Vérification des documents présent dans {docs_url}")
|
|
|
|
|
| docs_files = self.get_docs_from_url(docs_url)
|
|
|
|
|
| docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop)
|
|
|
| if docs_indexed_count > 0:
|
| with self.print_lock:
|
| print(f"{docs_indexed_count} fichiers trouvés")
|
|
|
|
|
| zip_folder = None
|
| for item in docs_files:
|
| if item.lower().rstrip('/') == "zip":
|
| zip_folder = item.rstrip('/')
|
| break
|
| if zip_folder:
|
| zip_url = f"{docs_url}/{zip_folder}"
|
|
|
| with self.print_lock:
|
| print(f"Vérification du dossier ./zip: {zip_url}")
|
|
|
|
|
| zip_files = self.get_docs_from_url(zip_url)
|
|
|
|
|
| zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop)
|
|
|
| if zip_indexed_count > 0:
|
| with self.print_lock:
|
| print(f"{zip_indexed_count} fichiers trouvés")
|
|
|
|
|
| with self.indexer_lock:
|
| self.processed_count += 1
|
|
|
|
|
| with self.print_lock:
|
| progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0
|
| print(f"\rProgression: {self.processed_count}/{self.total_count} réunions traitées ({progress:.1f}%)")
|
|
|
| return 1
|
|
|
| except Exception as e:
|
| with self.print_lock:
|
| print(f"\nErreur lors du traitement de la réunion {meeting}: {str(e)}")
|
| return 0
|
|
|
| def process_workgroup(self, wg, main_url):
|
| """Traiter un groupe de travail avec multithreading pour ses réunions"""
|
| if wg in ['./', '../']:
|
| return
|
|
|
| wg_url = f"{main_url}/{wg}"
|
|
|
| with self.print_lock:
|
| print(f"Vérification du working group: {wg}")
|
|
|
|
|
| meeting_folders = self.get_docs_from_url(wg_url)
|
|
|
|
|
| self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
|
|
|
|
|
| executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
|
| futures = [executor.submit(self.process_meeting, meeting, wg_url)
|
| for meeting in meeting_folders if meeting not in ['./', '../']]
|
|
|
| total = len(futures)
|
| done_count = 0
|
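| # Progress is streamed back as Server-Sent Events ("event:" / "data:" lines ending with a blank line).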
| yield f"event: get-maximum\ndata: {total}\n\n"
|
|
|
| try:
|
| for future in concurrent.futures.as_completed(futures):
|
| done_count += 1
|
| yield f"event: progress\ndata: {done_count}\n\n"
|
| except GeneratorExit:
|
| for f in futures:
|
| f.cancel()
|
| executor.shutdown(wait=False, cancel_futures=True)
|
| return
|
| executor.shutdown(wait=False)
|
|
|
| def index_all_tdocs(self):
|
| """Indexer tous les documents ZIP dans la structure FTP 3GPP avec multithreading"""
|
| print("Démarrage de l'indexation des TDocs 3GPP complète")
|
|
|
| start_time = time.time()
|
| docs_count_before = len(self.indexer)
|
|
|
|
|
| main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"]
|
|
|
| for main_tsg in main_groups:
|
| print(f"Indexation de {main_tsg.upper()}...")
|
|
|
| main_url = f"{self.main_ftp_url}/{main_tsg}"
|
|
|
|
|
| workgroups = self.get_docs_from_url(main_url)
|
|
|
|
|
|
|
| for wg in workgroups:
|
| yield f"event: info\ndata: {main_tsg}-{wg}\n\n"
|
| for content in self.process_workgroup(wg, main_url):
|
| yield content
|
|
|
| docs_count_after = len(self.indexer)
|
| new_docs_count = docs_count_after - docs_count_before
|
|
|
| print(f"Indexation terminée en {time.time() - start_time:.2f} secondes")
|
| print(f"Nouveaux documents ZIP indexés: {new_docs_count}")
|
| print(f"Total des documents dans l'index: {docs_count_after}")
|
|
|
| return self.indexer
|
|
|
| def index_all_workshops(self):
|
| print("Démarrage de l'indexation des workshops ZIP 3GPP...")
|
| start_time = time.time()
|
| docs_count_before = len(self.indexer)
|
|
|
| print("\nIndexation du dossier 'workshop'")
|
| main_url = f"{self.main_ftp_url}/workshop"
|
|
|
|
|
| meeting_folders = self.get_docs_from_url(main_url)
|
|
|
|
|
| self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
|
|
|
|
|
| executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
|
| futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
|
| for meeting in meeting_folders if meeting not in ['./', '../']]
|
| total = len(futures)
|
| done_count = 0
|
|
|
| yield f"event: get-maximum\ndata: {total}\n\n"
|
|
|
| try:
|
| for future in concurrent.futures.as_completed(futures):
|
| done_count += 1
|
| yield f"event: progress\ndata: {done_count}\n\n"
|
| except GeneratorExit:
|
| for f in futures:
|
| f.cancel()
|
| executor.shutdown(wait=False, cancel_futures=True)
|
| return
|
| executor.shutdown(wait=False)
|
|
|
| docs_count_after = len(self.indexer)
|
| new_docs_count = docs_count_after - docs_count_before
|
|
|
| print(f"\nIndexation terminée en {time.time() - start_time:.2f} secondes")
|
| print(f"Nouveaux documents ZIP indexés: {new_docs_count}")
|
| print(f"Total des documents dans l'index: {docs_count_after}")
|
|
|
| return self.indexer
|
|
|
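| # Downloads 3GPP specification archives, converts the embedded .doc/.docx files to text
| # with headless LibreOffice, splits the text into numbered sections, and publishes the
| # content, metadata and BM25 indexes to the Hub.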
| class Spec3GPPIndexer:
|
| def __init__(self, max_workers=16):
|
| self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
|
| self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
|
| self.indexed_specifications = {}
|
| self.specifications_passed = set()
|
| self.processed_count = 0
|
| self.total_count = 0
|
|
|
| try:
|
| self.failed_specifications = set(
|
| item["spec_id"] for item in load_dataset("OrganizedProgrammers/3GPPFailedSpecs")["train"].to_list()
|
| )
|
| print(f"Loaded {len(self.failed_specifications)} previously failed specifications")
|
| except Exception:  # includes EmptyDatasetError when the failed-specs dataset does not exist yet
|
| self.failed_specifications = set()
|
|
|
| self.DICT_LOCK = threading.Lock()
|
| self.DOCUMENT_LOCK = threading.Lock()
|
| self.STOP_EVENT = threading.Event()
|
| self.max_workers = max_workers
|
| self.LIBREOFFICE_SEMAPHORE = threading.Semaphore(self.max_workers)
|
|
|
| def _make_doc_index(self, specs):
|
| doc_index = {}
|
| for section in specs:
|
| if section["doc_id"] not in doc_index:
|
| doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
|
| else:
|
| doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
|
| return doc_index
|
|
|
| @staticmethod
|
| def version_to_code(version_str):
|
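| # 3GPP archive file names encode the version as three base-36 characters, e.g. "18.3.0" -> "i30";
| # parts of 36 or more fall back to zero-padded two-digit fields.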
| chars = "0123456789abcdefghijklmnopqrstuvwxyz"
|
| parts = version_str.split('.')
|
| if len(parts) != 3:
|
| return None
|
| try:
|
| x, y, z = [int(p) for p in parts]
|
| except ValueError:
|
| return None
|
| if x < 36 and y < 36 and z < 36:
|
| return f"{chars[x]}{chars[y]}{chars[z]}"
|
| else:
|
| return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}"
|
|
|
| @staticmethod
|
| def hasher(specification, version_code):
|
| return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest()
|
|
|
| @staticmethod
|
| def get_scope(content):
|
| for title, text in content.items():
|
| if title.lower().endswith("scope"):
|
| return text
|
| return ""
|
|
|
| def get_text(self, specification, version_code):
|
| if self.STOP_EVENT.is_set():
|
| return []
|
|
|
| doc_id = specification
|
| series = doc_id.split(".")[0]
|
| url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
|
|
|
| try:
|
| response = requests.get(url, verify=False, timeout=(10, 120))
|
| if response.status_code != 200:
|
| return []
|
|
|
| zip_bytes = io.BytesIO(response.content)
|
| with zipfile.ZipFile(zip_bytes) as zip_file:
|
|
|
| docx_files = [f for f in zip_file.namelist() if f.lower().endswith(('.doc', '.docx'))]
|
| if not docx_files:
|
| return []
|
|
|
| full_text = []
|
|
|
| for doc_file in docx_files:
|
| with tempfile.TemporaryDirectory() as tmpdir:
|
| extracted_path = os.path.join(tmpdir, os.path.basename(doc_file))
|
| with open(extracted_path, 'wb') as f:
|
| f.write(zip_file.read(doc_file))
|
|
|
|
|
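| # Give every conversion its own LibreOffice user profile so concurrent soffice
| # processes do not block on a shared profile lock.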
| profile_dir = tempfile.mkdtemp(prefix="libreoffice_profile_")
|
|
|
| try:
|
| with self.LIBREOFFICE_SEMAPHORE:
|
| cmd = [
|
| 'soffice',
|
| '--headless',
|
| f'-env:UserInstallation=file://{profile_dir}',
|
| '--convert-to', 'txt:Text',
|
| '--outdir', tmpdir,
|
| extracted_path
|
| ]
|
| subprocess.run(cmd, check=True, timeout=60*2, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
| txt_file = os.path.splitext(extracted_path)[0] + '.txt'
|
| if os.path.exists(txt_file):
|
| with open(txt_file, 'r', encoding='utf-8', errors='ignore') as ftxt:
|
| full_text.extend(ftxt.readlines())
|
| finally:
|
| shutil.rmtree(profile_dir, ignore_errors=True)
|
|
|
| return full_text
|
|
|
| except Exception as e:
|
| print(f"Error getting text for {specification} v{version_code}: {e}")
|
| if isinstance(e, (subprocess.TimeoutExpired, subprocess.CalledProcessError)):
|
| with self.DICT_LOCK:
|
| self.failed_specifications.add(specification)
|
| print(f"Spec {specification}: marked as failed for future indexation runs")
|
| return []
|
|
|
| def get_spec_content(self, specification, version_code):
|
| if self.STOP_EVENT.is_set():
|
| return {}
|
|
|
| text = self.get_text(specification, version_code)
|
| if not text:
|
| return {}
|
|
|
| chapters = []
|
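| # Section headings in the converted text look like "6.1.2<TAB>Procedures": a clause
| # number, a tab, then a title that does not end with a period.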
| chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$")
|
| for i, line in enumerate(text):
|
| if chapter_regex.fullmatch(line):
|
| chapters.append((i, line))
|
|
|
| document = {}
|
| for i in range(len(chapters)):
|
| start_index, chapter_title = chapters[i]
|
| end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text)
|
| content_lines = text[start_index + 1:end_index]
|
| document[chapter_title.replace("\t", " ")] = "\n".join(content_lines)
|
|
|
| return document
|
|
|
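| # The 3GPP status report page holds several HTML tables; pandas.read_html parses them all,
| # only the first five columns are kept, and non-breaking spaces in the headers are normalized
| # to underscores (rows are later read through keys such as spec_num, title, vers, type, WG).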
| def fetch_spec_table(self):
|
| response = requests.get(
|
| 'https://www.3gpp.org/dynareport?code=status-report.htm',
|
| headers={"User-Agent": 'Mozilla/5.0'},
|
| verify=False,
|
| timeout=(10, 60)
|
| )
|
| dfs = pd.read_html(io.StringIO(response.text))
|
| for x in range(len(dfs)):
|
| dfs[x] = dfs[x].replace({np.nan: None})
|
| columns_needed = [0, 1, 2, 3, 4]
|
| extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
|
| columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
|
| specifications = []
|
| for df in extracted_dfs:
|
| for index, row in df.iterrows():
|
| doc = row.to_list()
|
| doc_dict = dict(zip(columns, doc))
|
| specifications.append(doc_dict)
|
| return specifications
|
|
|
| def process_specification(self, spec):
|
| if self.STOP_EVENT.is_set():
|
| return
|
| try:
|
| spec_num = spec.get('spec_num')
|
| vers = spec.get('vers')
|
| if spec_num is None or vers is None:
|
| with self.DICT_LOCK:
|
| self.processed_count += 1
|
| return
|
| doc_id = str(spec_num)
|
| version_code = self.version_to_code(str(vers))
|
| if not version_code:
|
| with self.DICT_LOCK:
|
| self.processed_count += 1
|
| return
|
|
|
| if doc_id in self.failed_specifications:
|
| with self.DICT_LOCK:
|
| self.processed_count += 1
|
| print(f"Spec {doc_id} ({spec.get('title', '')}): skipped (previously failed) - Progress {self.processed_count}/{self.total_count}")
|
| return
|
|
|
| document = None
|
| already_indexed = False
|
| needs_fetch = False
|
|
|
| with self.DOCUMENT_LOCK:
|
| if doc_id in self.specifications_passed:
|
| document = self.documents_by_spec_num.get(doc_id)
|
| already_indexed = True
|
| elif (doc_id in self.documents_by_spec_num
|
| and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version_code)):
|
| document = self.documents_by_spec_num[doc_id]
|
| self.specifications_passed.add(doc_id)
|
| already_indexed = True
|
| else:
|
| self.specifications_passed.add(doc_id)
|
| needs_fetch = True
|
|
|
| if needs_fetch:
|
| doc_content = self.get_spec_content(doc_id, version_code)
|
| if doc_content:
|
| document = {"content": doc_content, "hash": self.hasher(doc_id, version_code)}
|
| with self.DOCUMENT_LOCK:
|
| self.documents_by_spec_num[doc_id] = document
|
| already_indexed = False
|
|
|
| if document:
|
| url = f"https://www.3gpp.org/ftp/Specs/archive/{doc_id.split('.')[0]}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
|
| metadata = {
|
| "id": doc_id,
|
| "title": spec.get("title", ""),
|
| "type": spec.get("type", ""),
|
| "version": str(spec.get("vers", "")),
|
| "working_group": spec.get("WG", ""),
|
| "url": url,
|
| "scope": self.get_scope(document["content"])
|
| }
|
| key = f"{doc_id}+-+{spec.get('title', '')}+-+{spec.get('type', '')}+-+{spec.get('vers', '')}+-+{spec.get('WG', '')}"
|
| with self.DICT_LOCK:
|
| self.indexed_specifications[key] = metadata
|
|
|
| with self.DICT_LOCK:
|
| self.processed_count += 1
|
| status = "already indexed" if already_indexed else "indexed now"
|
| print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")
|
|
|
| except Exception as e:
|
| traceback.print_exc()
|
| print(f"Error processing spec {spec.get('spec_num')} v{spec.get('vers')}: {e}")
|
| with self.DICT_LOCK:
|
| self.processed_count += 1
|
| print(f"Progress: {self.processed_count}/{self.total_count} specs processed")
|
|
|
| def get_document(self, spec_id: str, spec_title: str):
|
| text = [f"{spec_id} - {spec_title}\n"]
|
| doc_data = self.documents_by_spec_num.get(spec_id)
|
| if doc_data:
|
| for section_title, content in doc_data["content"].items():
|
| text.append(f"{section_title}\n\n{content}")
|
| return text
|
|
|
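| # Builds two BM25 indexes with bm25s: one entry per spec section (3GPPBM25IndexSections)
| # and one entry per whole spec (3GPPBM25IndexSingle).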
| def create_bm25_index(self):
|
| dataset_metadata = self.indexed_specifications.values()
|
| unique_specs = set()
|
| corpus_json = []
|
|
|
| for specification in dataset_metadata:
|
| if specification['id'] in unique_specs: continue
|
| unique_specs.add(specification['id'])
|
| doc_data = self.documents_by_spec_num.get(specification['id'])
|
| if doc_data:
|
| for section_title, content in doc_data["content"].items():
|
| corpus_json.append({"text": f"{section_title}\n{content}", "metadata": {
|
| "id": specification['id'],
|
| "title": specification['title'],
|
| "section_title": section_title,
|
| "version": specification['version'],
|
| "type": specification['type'],
|
| "working_group": specification['working_group'],
|
| "url": specification['url'],
|
| "scope": specification['scope']
|
| }})
|
|
|
| corpus_text = [doc["text"] for doc in corpus_json]
|
| corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
|
|
|
| print("Indexing BM25")
|
| retriever = BM25HF(corpus=corpus_json)
|
| retriever.index(corpus_tokens)
|
|
|
| retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSections", token=os.environ.get("HF"))
|
|
|
| unique_specs = set()
|
| corpus_json = []
|
|
|
| for specification in dataset_metadata:
|
| if specification['id'] in unique_specs: continue
|
| text_list = self.get_document(specification['id'], specification['title'])
|
| text = "\n".join(text_list)
|
| if len(text_list) == 1: continue
|
| corpus_json.append({"text": text, "metadata": specification})
|
| unique_specs.add(specification['id'])
|
|
|
| corpus_text = [doc["text"] for doc in corpus_json]
|
| corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
|
|
|
| print("Indexing BM25")
|
| retriever = BM25HF(corpus=corpus_json)
|
| retriever.index(corpus_tokens)
|
|
|
| retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSingle", token=os.environ.get("HF"))
|
|
|
| def run(self):
|
| print("Fetching specification tables from 3GPP...")
|
| yield "event: info\ndata: Indexing 3GPP specs ...\n\n"
|
| specifications = self.fetch_spec_table()
|
| self.total_count = len(specifications)
|
| print(f"Processing {self.total_count} specs with {self.max_workers} threads...")
|
| executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
|
| futures = [executor.submit(self.process_specification, spec) for spec in specifications]
|
| total = len(futures)
|
| done_count = 0
|
| yield f"event: get-maximum\ndata: {total}\n\n"
|
|
|
| try:
|
| for future in concurrent.futures.as_completed(futures):
|
| done_count += 1
|
| yield f"event: progress\ndata: {done_count}\n\n"
|
| if self.STOP_EVENT.is_set():
|
| break
|
| except GeneratorExit:
|
| for f in futures:
|
| f.cancel()
|
| executor.shutdown(wait=False, cancel_futures=True)
|
| return
|
| executor.shutdown(wait=False)
|
| print("All specs processed.")
|
|
|
|
|
| def save(self):
|
| print("Saving indexed data...")
|
| flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
|
| flat_docs = []
|
| print("Flatting doc contents")
|
| for doc_id, data in self.documents_by_spec_num.items():
|
| for title, content in data["content"].items():
|
| flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
|
| print("Creating datasets ...")
|
| push_spec_content = Dataset.from_list(flat_docs)
|
| push_spec_metadata = Dataset.from_list(flat_metadata)
|
|
|
| print("Pushing ...")
|
| push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF"])
|
| push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF"])
|
|
|
| if self.failed_specifications:
|
| failed_list = [{"spec_id": spec_id} for spec_id in sorted(self.failed_specifications)]
|
| Dataset.from_list(failed_list).push_to_hub("OrganizedProgrammers/3GPPFailedSpecs", token=os.environ["HF"])
|
| print(f"Saved {len(failed_list)} failed specifications to 3GPPFailedSpecs")
|
|
|
| self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
|
| self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
|
| print("Save finished.")
|
|
|
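| # Same pipeline for ETSI deliverables: logs in to the ETSI portal, pulls the TS/TR
| # catalogues as CSV, downloads each PDF and slices it into sections using the PDF
| # table of contents (PyMuPDF / fitz).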
| class SpecETSIIndexer:
|
| def __init__(self, max_workers=16):
|
| self.session = requests.Session()
|
| self.session.verify = False
|
|
|
| self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
|
| self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
|
| self.indexed_specifications = {}
|
| self.specifications_passed = set()
|
| self.processed_count = 0
|
| self.total_count = 0
|
|
|
| self.DICT_LOCK = threading.Lock()
|
| self.DOCUMENT_LOCK = threading.Lock()
|
| self.STOP_EVENT = threading.Event()
|
| self.max_workers = max_workers
|
|
|
| self.df = self._fetch_spec_table()
|
|
|
| def _make_doc_index(self, specs):
|
| doc_index = {}
|
| for section in specs:
|
| if section["doc_id"] not in doc_index:
|
| doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
|
| else:
|
| doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
|
| return doc_index
|
|
|
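| # Downloads the TS and TR catalogues as CSV, extracts the deliverable number and "Vx.y.z"
| # version from the "ETSI deliverable" column, keeps only the newest version of each
| # deliverable, and drops titles mentioning 3GPP (presumably already covered by Spec3GPPIndexer).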
| def _fetch_spec_table(self):
|
|
|
| print("Connexion login ETSI...")
|
| self.session.post(
|
| "https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
|
| verify=False,
|
| headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..."},
|
| data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
|
| timeout=(10, 30),
|
| )
|
|
|
| print("Récupération des métadonnées TS/TR …")
|
| url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1"
|
| url_tr = url_ts.replace("stdType=TS", "stdType=TR")
|
| data_ts = self.session.get(url_ts, verify=False, timeout=(10, 120)).content
|
| data_tr = self.session.get(url_tr, verify=False, timeout=(10, 120)).content
|
| df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False)
|
| df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False)
|
|
|
| backup_ts = df_ts["ETSI deliverable"]
|
| backup_tr = df_tr["ETSI deliverable"]
|
| df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)")
|
| df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)")
|
| version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
|
| version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
|
| df_ts["Version"] = version1[0]
|
| df_tr["Version"] = version2[0]
|
|
|
| def ver_tuple(v):
|
| if not isinstance(v, str):
|
| return (0, 0, 0)
|
| return tuple(map(int, v.split(".")))
|
| df_ts["temp"] = df_ts["Version"].apply(ver_tuple)
|
| df_tr["temp"] = df_tr["Version"].apply(ver_tuple)
|
| df_ts["Type"] = "TS"
|
| df_tr["Type"] = "TR"
|
| df = pd.concat([df_ts, df_tr])
|
| df = df.dropna(subset=["ETSI deliverable", "Version"])
|
| unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()]
|
| unique_df = unique_df.drop(columns="temp")
|
| unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))]
|
| return unique_df
|
|
|
| @staticmethod
|
| def hasher(specification: str, version: str):
|
| return hashlib.md5(f"{specification}{version}".encode()).hexdigest()
|
|
|
| @staticmethod
|
| def get_scope(content):
|
| for title, text in content.items():
|
| if title.lower().endswith("scope"):
|
| return text
|
| return ""
|
|
|
| def get_document(self, spec_id: str, spec_title: str):
|
| text = [f"{spec_id} - {spec_title}\n"]
|
| doc_data = self.documents_by_spec_num.get(spec_id)
|
| if doc_data:
|
| for section_title, content in doc_data["content"].items():
|
| text.append(f"{section_title}\n\n{content}")
|
| return text
|
|
|
| def get_text(self, specification: str):
|
| if self.STOP_EVENT.is_set():
|
| return None, []
|
| print(f"\n[INFO] Tentative de récupération de la spécification {specification}", flush=True)
|
| try:
|
|
|
| row = self.df[self.df["ETSI deliverable"] == specification]
|
| if row.empty:
|
| print(f"[WARN] Spécification {specification} absente du tableau")
|
| return None, []
|
|
|
| pdf_link = row.iloc[0]["PDF link"]
|
| response = self.session.get(
|
| pdf_link,
|
| headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...'},
|
| timeout=(10, 120)
|
| )
|
| if response.status_code != 200:
|
| print(f"[ERREUR] Echec du téléchargement du PDF pour {specification}.")
|
| return None, []
|
| pdf = fitz.open(stream=response.content, filetype="pdf")
|
| return pdf, pdf.get_toc()
|
| except Exception as e:
|
| print(f"[ERROR] Échec get_text pour {specification} : {e}", flush=True)
|
| return None, []
|
|
|
| def get_spec_content(self, specification: str):
|
| def extract_sections(text, titles):
|
| sections = {}
|
| sorted_titles = sorted(titles, key=lambda t: text.find(t))
|
| for i, title in enumerate(sorted_titles):
|
| start = text.find(title)
|
| if i + 1 < len(sorted_titles):
|
| end = text.find(sorted_titles[i + 1])
|
| sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
|
| else:
|
| sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
|
| return sections
|
|
|
| if self.STOP_EVENT.is_set():
|
| return {}
|
| print(f"[INFO] Extraction du contenu de {specification}", flush=True)
|
| pdf, doc_toc = self.get_text(specification)
|
| text = []
|
| if not pdf or not doc_toc:
|
| print("[ERREUR] Pas de texte ou table of contents trouvé !")
|
| return {}
|
|
|
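| # Skip the PDF front matter: text extraction starts at the page of the first table-of-contents entry.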
| first_page = 0
|
| for level, title, page in doc_toc:
|
| first_page = page - 1
|
| break
|
| for page in pdf[first_page:]:
|
| text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
|
| text = "\n".join(text)
|
| if not text or not doc_toc or self.STOP_EVENT.is_set():
|
| print("[ERREUR] Pas de texte/table of contents récupéré !")
|
| return {}
|
| titles = []
|
| for level, title, page in doc_toc:
|
| if self.STOP_EVENT.is_set():
|
| return {}
|
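| # In the extracted page text the clause number and its heading typically land on separate
| # lines, so a ToC entry such as "5 Overview" is looked up as "5\nOverview".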
| if title and title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
|
| titles.append('\n'.join(title.strip().split(" ", 1)))
|
| return extract_sections(text, titles)
|
|
|
| def process_specification(self, spec):
|
| if self.STOP_EVENT.is_set():
|
| return
|
| doc_id = "unknown"
|
| try:
|
| version = spec.get('Version')
|
| if not version or (isinstance(version, float) and pd.isna(version)):
|
| with self.DICT_LOCK:
|
| self.processed_count += 1
|
| return
|
| doc_id = spec.get("ETSI deliverable")
|
| if not doc_id or (isinstance(doc_id, float) and pd.isna(doc_id)):
|
| with self.DICT_LOCK:
|
| self.processed_count += 1
|
| return
|
| doc_id = str(doc_id)
|
| document = None
|
| already_indexed = False
|
| needs_fetch = False
|
|
|
| with self.DOCUMENT_LOCK:
|
| if doc_id in self.specifications_passed:
|
| document = self.documents_by_spec_num.get(doc_id)
|
| already_indexed = True
|
| elif (doc_id in self.documents_by_spec_num
|
| and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version)):
|
| document = self.documents_by_spec_num[doc_id]
|
| self.specifications_passed.add(doc_id)
|
| already_indexed = True
|
| else:
|
| self.specifications_passed.add(doc_id)
|
| needs_fetch = True
|
|
|
| if needs_fetch:
|
| document_content = self.get_spec_content(doc_id)
|
| if document_content:
|
| document = {"content": document_content, "hash": self.hasher(doc_id, version)}
|
| with self.DOCUMENT_LOCK:
|
| self.documents_by_spec_num[doc_id] = document
|
| already_indexed = False
|
|
|
| if document:
|
| title = spec.get("title", "")
|
| if isinstance(title, float) and pd.isna(title):
|
| title = ""
|
| spec_type = spec.get("Type", "")
|
| pdf_link = spec.get("PDF link", "")
|
| string_key = f"{doc_id}+-+{title}+-+{spec_type}+-+{version}"
|
| metadata = {
|
| "id": str(doc_id),
|
| "title": title,
|
| "type": spec_type,
|
| "version": version,
|
| "url": pdf_link,
|
| "scope": self.get_scope(document["content"])
|
| }
|
| with self.DICT_LOCK:
|
| self.indexed_specifications[string_key] = metadata
|
|
|
| with self.DICT_LOCK:
|
| self.processed_count += 1
|
| status = "already indexed" if already_indexed else "indexed now"
|
| print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")
|
|
|
| except Exception as e:
|
| traceback.print_exc()
|
| print(f"\n[ERREUR] Échec du traitement de {doc_id} {spec.get('Version')}: {e}", flush=True)
|
| with self.DICT_LOCK:
|
| self.processed_count += 1
|
| print(f"Progress: {self.processed_count}/{self.total_count} specs processed")
|
|
|
| def run(self):
|
| print("Démarrage indexation ETSI…")
|
| yield "event: info\ndata: Indexing ETSI specs ...\n\n"
|
| specifications = self.df.to_dict(orient="records")
|
| self.total_count = len(specifications)
|
| print(f"Traitement de {self.total_count} specs avec {self.max_workers} threads...\n")
|
|
|
| executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers)
|
| futures = [executor.submit(self.process_specification, spec) for spec in specifications]
|
| total = len(futures)
|
| done_count = 0
|
| yield f"event: get-maximum\ndata: {total}\n\n"
|
|
|
| try:
|
| for future in concurrent.futures.as_completed(futures):
|
| done_count += 1
|
| yield f"event: progress\ndata: {done_count}\n\n"
|
| if self.STOP_EVENT.is_set():
|
| break
|
| except GeneratorExit:
|
| for f in futures:
|
| f.cancel()
|
| executor.shutdown(wait=False, cancel_futures=True)
|
| return
|
| executor.shutdown(wait=False)
|
| print(f"\nAll {self.processed_count}/{self.total_count} specs processed.")
|
|
|
| def save(self):
|
| print("\nSauvegarde en cours...", flush=True)
|
| flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
|
| flat_docs = []
|
| for doc_id, data in self.documents_by_spec_num.items():
|
| for title, content in data["content"].items():
|
| flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
|
| push_spec_content = Dataset.from_list(flat_docs)
|
| push_spec_metadata = Dataset.from_list(flat_metadata)
|
| push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF"])
|
| push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF"])
|
|
|
| self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
|
| self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
|
| print("Sauvegarde terminée.")
|
|
|
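| # Same two-level BM25 indexing as Spec3GPPIndexer, published to the ETSI index repositories.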
| def create_bm25_index(self):
|
| dataset_metadata = self.indexed_specifications.values()
|
| unique_specs = set()
|
| corpus_json = []
|
|
|
| for specification in dataset_metadata:
|
| if specification['id'] in unique_specs: continue
|
| unique_specs.add(specification['id'])
|
| doc_data = self.documents_by_spec_num.get(specification['id'])
|
| if doc_data:
|
| for section_title, content in doc_data["content"].items():
|
| corpus_json.append({"text": f"{section_title}\n{content}", "metadata": {
|
| "id": specification['id'],
|
| "title": specification['title'],
|
| "section_title": section_title,
|
| "version": specification['version'],
|
| "type": specification['type'],
|
| "url": specification['url'],
|
| "scope": specification['scope']
|
| }})
|
|
|
| corpus_text = [doc["text"] for doc in corpus_json]
|
| corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
|
|
|
| print("Indexing BM25")
|
| retriever = BM25HF(corpus=corpus_json)
|
| retriever.index(corpus_tokens)
|
|
|
| retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSections", token=os.environ.get("HF"))
|
|
|
| unique_specs = set()
|
| corpus_json = []
|
|
|
| for specification in dataset_metadata:
|
| if specification['id'] in unique_specs: continue
|
| text_list = self.get_document(specification['id'], specification['title'])
|
| text = "\n".join(text_list)
|
| if len(text_list) == 1: continue
|
| corpus_json.append({"text": text, "metadata": specification})
|
| unique_specs.add(specification['id'])
|
|
|
| corpus_text = [doc["text"] for doc in corpus_json]
|
| corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
|
|
|
| print("Indexing BM25")
|
| retriever = BM25HF(corpus=corpus_json)
|
| retriever.index(corpus_tokens)
|
|
|
| retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSingle", token=os.environ.get("HF"))
|
| |
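|
| # Minimal driver sketch (an assumption, not part of the original module): the
| # run()/index_all_*() methods are generators that yield Server-Sent-Events strings,
| # so a standalone caller simply drains them before saving the results.
| # SpecETSIIndexer follows the same run()/save()/create_bm25_index() pattern.
| if __name__ == "__main__":
|     tdocs = TDocIndexer()
|     for _event in tdocs.index_all_tdocs():
|         pass  # or forward _event to a streaming HTTP response
|     tdocs.save_indexer()
|
|     specs = Spec3GPPIndexer()
|     for _event in specs.run():
|         pass
|     specs.save()
|     specs.create_bm25_index()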