# File-viewer metadata captured with this listing: 110 lines, 3.6 KiB, Python.
"""Layer 2 — NER-based PII detection using Presidio + spaCy.

Detects PERSON, LOCATION, ORGANIZATION entities.
Text is analyzed as French first, with English as a fallback if that fails.
The spaCy models are loaded lazily on first access and cached.
"""

from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import TYPE_CHECKING
|
|
|
|
from layers.regex_layer import DetectedEntity
|
|
|
|
if TYPE_CHECKING:
|
|
from presidio_analyzer import AnalyzerEngine
|
|
|
|
logger = logging.getLogger(__name__)

# Presidio entity type → internal type mapping.
# Results whose Presidio type is not a key here are dropped by NERLayer.detect.
_ENTITY_TYPE_MAP: dict[str, str] = {
    "PERSON": "PERSON",
    "LOCATION": "LOCATION",
    "ORGANIZATION": "ORGANIZATION",
}

# Presidio entities to request — derived from the mapping keys so the two
# cannot drift apart.
_PRESIDIO_ENTITIES: list[str] = list(_ENTITY_TYPE_MAP)


class NERLayer:
    """Detect named entities using Presidio backed by spaCy models.

    French (``fr_core_news_lg`` by default) is the primary analysis language;
    English (``en_core_web_sm`` by default) is tried when the French analysis
    raises. The Presidio analyzer is built on first access and cached on the
    instance.
    """

    # Analysis languages in priority order. Each one needs a matching model
    # entry in ``_build_analyzer``.
    _LANGUAGES = ("fr", "en")

    def __init__(
        self,
        fr_model: str = "fr_core_news_lg",
        en_model: str = "en_core_web_sm",
    ) -> None:
        """Store the spaCy model names; nothing is loaded until first use."""
        self._fr_model = fr_model
        self._en_model = en_model
        self._analyzer: AnalyzerEngine | None = None

    @property
    def analyzer(self) -> AnalyzerEngine:
        """Return the cached Presidio analyzer, building it on first access."""
        if self._analyzer is None:
            self._analyzer = self._build_analyzer()
        return self._analyzer

    @property
    def is_loaded(self) -> bool:
        """Whether the analyzer has already been built."""
        return self._analyzer is not None

    def warm_up(self) -> None:
        """Force model loading — call at service startup to avoid cold-start latency."""
        _ = self.analyzer
        logger.info("NER model loaded: %s", self._fr_model)

    def detect(self, text: str, confidence_threshold: float = 0.85) -> list[DetectedEntity]:
        """Return NER entities found in *text* above *confidence_threshold*.

        Args:
            text: Text to analyze. Empty or whitespace-only text yields no
                entities and does not trigger model loading.
            confidence_threshold: Passed to Presidio as ``score_threshold``.

        Returns:
            One ``DetectedEntity`` per Presidio result whose entity type is a
            key of ``_ENTITY_TYPE_MAP``, tagged ``detection_layer="ner"``.
            An empty list when nothing is found or when analysis fails for
            every language; failures are logged, never raised.
        """
        if not text or text.isspace():
            return []

        # Resolve the analyzer once, outside the language loop: a build failure
        # is not language-specific, so retrying it per language would only
        # repeat the load attempt.
        try:
            analyzer = self.analyzer
        except Exception:
            logger.exception("NER analysis failed")
            return []

        for language in self._LANGUAGES:
            try:
                results = analyzer.analyze(
                    text=text,
                    entities=_PRESIDIO_ENTITIES,
                    language=language,
                    score_threshold=confidence_threshold,
                )
            except Exception:
                # Keep the traceback of every failed attempt instead of
                # silently moving on to the next language.
                logger.warning(
                    "NER analysis failed for language %r", language, exc_info=True
                )
            else:
                break
        else:
            # Every language raised; stay best-effort and report no entities.
            logger.error("NER analysis failed")
            return []

        return [
            DetectedEntity(
                entity_type=_ENTITY_TYPE_MAP[result.entity_type],
                original_value=text[result.start : result.end],
                start=result.start,
                end=result.end,
                confidence=result.score,
                detection_layer="ner",
            )
            for result in results
            # Skip any Presidio type that has no internal equivalent.
            if result.entity_type in _ENTITY_TYPE_MAP
        ]

    # -----------------------------------------------------------------------
    # Internal
    # -----------------------------------------------------------------------

    def _build_analyzer(self) -> AnalyzerEngine:
        """Create a Presidio ``AnalyzerEngine`` over the configured spaCy models.

        Presidio is imported locally, so it is only required once the analyzer
        is actually built.
        """
        from presidio_analyzer import AnalyzerEngine
        from presidio_analyzer.nlp_engine import NlpEngineProvider

        configuration = {
            "nlp_engine_name": "spacy",
            "models": [
                {"lang_code": "fr", "model_name": self._fr_model},
                {"lang_code": "en", "model_name": self._en_model},
            ],
        }
        nlp_engine = NlpEngineProvider(nlp_configuration=configuration).create_engine()
        return AnalyzerEngine(
            nlp_engine=nlp_engine,
            supported_languages=list(self._LANGUAGES),
        )