91 lines
2.9 KiB
Python
91 lines
2.9 KiB
Python
"""PII detection pipeline — orchestrates regex and NER layers.
|
|
|
|
Detection order: regex (always) → NER (if enabled).
|
|
Overlapping entities are deduplicated: longest span wins;
|
|
on equal length, regex takes precedence over NER.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
import config
|
|
from layers.ner_layer import NERLayer
|
|
from layers.regex_layer import DetectedEntity, RegexLayer
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _spans_overlap(a: DetectedEntity, b: DetectedEntity) -> bool:
    """Return True if entities *a* and *b* have overlapping character spans.

    Both comparisons are strict, so spans that merely touch (one's ``end``
    equals the other's ``start``) are not considered overlapping.
    """
    return a.start < b.end and b.start < a.end


def _deduplicate(entities: list[DetectedEntity]) -> list[DetectedEntity]:
    """Drop overlapping entities so the returned spans are pairwise disjoint.

    Entities are visited from highest to lowest priority and kept only if
    they overlap nothing that has already been kept.

    Priority: longer span > shorter span; on tie, regex > ner. Any remaining
    tie goes to the earlier start, then to the earlier position in *entities*
    (the sort is stable).

    An entity is discarded only when it overlaps an entity that is actually
    present in the result, so a span is never lost to a competitor that is
    itself discarded later.

    Args:
        entities: Detected entities, in any order. The list is not modified.

    Returns:
        A new list with the surviving entities, sorted by start position.
    """
    # ``False`` sorts before ``True``, so regex entities come first among
    # spans of equal length.
    by_priority = sorted(
        entities,
        key=lambda e: (-(e.end - e.start), e.detection_layer != "regex", e.start),
    )

    kept: list[DetectedEntity] = []
    for candidate in by_priority:
        # Everything already in ``kept`` outranks ``candidate``, so any
        # overlap means the candidate loses.
        if not any(_spans_overlap(candidate, winner) for winner in kept):
            kept.append(candidate)

    return sorted(kept, key=lambda e: e.start)
|
|
|
|
|
|
class Pipeline:
    """Orchestrates PII detection across all configured layers.

    The regex layer always runs. The NER layer runs only when both the
    caller and ``config.NER_ENABLED`` allow it.
    """

    def __init__(self) -> None:
        """Create the regex layer and the NER layer configured from ``config``."""
        self._regex = RegexLayer()
        self._ner = NERLayer(
            fr_model=config.SPACY_FR_MODEL,
            en_model=config.SPACY_EN_MODEL,
        )

    def detect(
        self,
        text: str,
        enable_ner: bool = True,
        confidence_threshold: float = 0.85,
    ) -> list[DetectedEntity]:
        """Return deduplicated PII entities found in *text*.

        The result always goes through ``_deduplicate``, whether or not the
        NER layer contributed, so every code path returns entities in the
        same shape.

        Args:
            text: Text to scan.
            enable_ner: Per-call switch for the NER layer. NER runs only if
                this and ``config.NER_ENABLED`` are both truthy.
            confidence_threshold: Passed through to the NER layer's
                ``detect`` call.

        Returns:
            The deduplicated regex entities, merged with the NER entities
            when the NER layer ran without raising.
        """
        regex_entities = self._regex.detect(text)

        if enable_ner and config.NER_ENABLED:
            # NER is best-effort: any failure is logged and detection falls
            # back to the regex results below instead of propagating.
            try:
                ner_entities = self._ner.detect(text, confidence_threshold)
                return _deduplicate(regex_entities + ner_entities)
            except Exception:
                logger.exception("NER layer failed — using regex results only")

        # Regex-only results (NER disabled or failed) get the same
        # deduplication as the combined path.
        return _deduplicate(regex_entities)

    def warm_up(self) -> None:
        """Pre-load the NER model to avoid cold-start on first request.

        Does nothing when ``config.NER_ENABLED`` is falsy.
        """
        if config.NER_ENABLED:
            self._ner.warm_up()

    @property
    def ner_layer(self) -> NERLayer:
        """Return the NER layer instance owned by this pipeline."""
        return self._ner
|