"""Layer 2 — NER-based PII detection using Presidio + spaCy. Detects PERSON, LOCATION, ORGANIZATION entities. The spaCy model is loaded lazily on first access and cached. """ from __future__ import annotations import logging from typing import TYPE_CHECKING from layers.regex_layer import DetectedEntity if TYPE_CHECKING: from presidio_analyzer import AnalyzerEngine # noqa: F401 — type hint only logger = logging.getLogger(__name__) # Presidio entity type → internal type mapping _ENTITY_TYPE_MAP = { "PERSON": "PERSON", "LOCATION": "LOCATION", "ORGANIZATION": "ORGANIZATION", } # Presidio entities to request _PRESIDIO_ENTITIES = list(_ENTITY_TYPE_MAP.keys()) class NERLayer: """Detect named entities using Presidio backed by spaCy fr_core_news_lg.""" def __init__(self, fr_model: str = "fr_core_news_lg", en_model: str = "en_core_web_sm") -> None: self._fr_model = fr_model self._en_model = en_model self._analyzer: "AnalyzerEngine | None" = None @property def analyzer(self) -> "AnalyzerEngine": if self._analyzer is None: self._analyzer = self._build_analyzer() return self._analyzer @property def is_loaded(self) -> bool: return self._analyzer is not None def warm_up(self) -> None: """Force model loading — call at service startup to avoid cold-start latency.""" _ = self.analyzer logger.info("NER model loaded: %s", self._fr_model) def detect(self, text: str, confidence_threshold: float = 0.85) -> list[DetectedEntity]: """Return NER entities found in *text* above *confidence_threshold*.""" try: results = self.analyzer.analyze( text=text, entities=_PRESIDIO_ENTITIES, language="fr", score_threshold=confidence_threshold, ) except Exception: # Fall back to English if French analysis fails try: results = self.analyzer.analyze( text=text, entities=_PRESIDIO_ENTITIES, language="en", score_threshold=confidence_threshold, ) except Exception: logger.exception("NER analysis failed") return [] entities = [] for r in results: internal_type = _ENTITY_TYPE_MAP.get(r.entity_type) if internal_type is None: continue entities.append( DetectedEntity( entity_type=internal_type, original_value=text[r.start : r.end], start=r.start, end=r.end, confidence=r.score, detection_layer="ner", ) ) return entities # ----------------------------------------------------------------------- # Internal # ----------------------------------------------------------------------- def _build_analyzer(self) -> "AnalyzerEngine": from presidio_analyzer import AnalyzerEngine from presidio_analyzer.nlp_engine import NerModelConfiguration, SpacyNlpEngine # The default NerModelConfiguration incorrectly places 'ORG' and 'ORGANIZATION' # in labels_to_ignore, which prevents ORGANIZATION entities from being returned. # We build a custom configuration that keeps those labels. default_cfg = NerModelConfiguration() custom_labels_to_ignore = default_cfg.labels_to_ignore - {"ORG", "ORGANIZATION"} ner_config = NerModelConfiguration( model_to_presidio_entity_mapping=default_cfg.model_to_presidio_entity_mapping, low_score_entity_names=default_cfg.low_score_entity_names, labels_to_ignore=custom_labels_to_ignore, ) nlp_engine = SpacyNlpEngine( models=[ {"lang_code": "fr", "model_name": self._fr_model}, {"lang_code": "en", "model_name": self._en_model}, ], ner_model_configuration=ner_config, ) return AnalyzerEngine(nlp_engine=nlp_engine, supported_languages=["fr", "en"])