"""PII detection pipeline — orchestrates regex and NER layers. Detection order: regex (always) → NER (if enabled). Overlapping entities are deduplicated: longest span wins; on equal length, regex takes precedence over NER. """ from __future__ import annotations import logging import config from layers.ner_layer import NERLayer from layers.regex_layer import DetectedEntity, RegexLayer logger = logging.getLogger(__name__) def _spans_overlap(a: DetectedEntity, b: DetectedEntity) -> bool: """Return True if entities *a* and *b* have overlapping character spans.""" return a.start < b.end and b.start < a.end def _deduplicate(entities: list[DetectedEntity]) -> list[DetectedEntity]: """Remove overlapping entities, keeping the best one per overlap group. Priority: longer span > shorter span; on tie, regex > ner. """ # Sort by start position, then by descending span length sorted_entities = sorted(entities, key=lambda e: (e.start, -(e.end - e.start))) result: list[DetectedEntity] = [] for candidate in sorted_entities: dominated = False for kept in result: if _spans_overlap(candidate, kept): # Kept entity dominates if it is longer or same length with better layer kept_len = kept.end - kept.start cand_len = candidate.end - candidate.start if kept_len > cand_len: dominated = True break if kept_len == cand_len and kept.detection_layer == "regex": dominated = True break # Candidate is better — replace kept result.remove(kept) break if not dominated: result.append(candidate) return sorted(result, key=lambda e: e.start) class Pipeline: """Orchestrates PII detection across all configured layers.""" def __init__(self) -> None: self._regex = RegexLayer() self._ner = NERLayer( fr_model=config.SPACY_FR_MODEL, en_model=config.SPACY_EN_MODEL, ) def detect( self, text: str, enable_ner: bool = True, confidence_threshold: float = 0.85, ) -> list[DetectedEntity]: """Return deduplicated PII entities found in *text*.""" entities = self._regex.detect(text) if enable_ner and config.NER_ENABLED: try: ner_entities = self._ner.detect(text, confidence_threshold) entities = _deduplicate(entities + ner_entities) except Exception: logger.exception("NER layer failed — using regex results only") return entities def warm_up(self) -> None: """Pre-load the NER model to avoid cold-start on first request.""" if config.NER_ENABLED: self._ner.warm_up() @property def ner_layer(self) -> NERLayer: return self._ner