199 lines
6.3 KiB
Python
199 lines
6.3 KiB
Python
"""Layer 1 — Regex-based PII detection.

Sub-millisecond detection for structured PII: IBAN, email, phone, SSN, credit cards.
All patterns are pre-compiled at import time.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
|
|
|
|
@dataclass
class DetectedEntity:
    """One PII hit located in a piece of scanned text.

    Plain dataclass: instances are mutable and compare equal field by field.
    """

    # Category label of the hit, e.g. "IBAN", "EMAIL" or "PHONE_FR".
    entity_type: str
    # The matched substring, exactly as it appears in the scanned text.
    original_value: str
    # Offset of the first matched character.
    start: int
    # Offset one past the last matched character (exclusive bound).
    end: int
    # Detection confidence; 1.0 unless the producer says otherwise.
    confidence: float = field(default=1.0)
    # Name of the layer that produced the hit; "regex" by default.
    detection_layer: str = field(default="regex")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Luhn algorithm (credit card validation)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _luhn_valid(number: str) -> bool:
|
|
"""Return True if *number* (digits only) passes the Luhn check."""
|
|
digits = [int(d) for d in number]
|
|
odd_sum = sum(digits[-1::-2])
|
|
even_sum = sum(sum(divmod(d * 2, 10)) for d in digits[-2::-2])
|
|
return (odd_sum + even_sum) % 10 == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# IBAN MOD-97 checksum validation (ISO 13616)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_IBAN_LETTER_MAP = {c: str(ord(c) - ord("A") + 10) for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"}
|
|
|
|
|
|
def _iban_valid(raw: str) -> bool:
|
|
"""Return True if *raw* (spaces allowed) is a valid IBAN checksum."""
|
|
iban = raw.replace(" ", "").upper()
|
|
if len(iban) < 5:
|
|
return False
|
|
rearranged = iban[4:] + iban[:4]
|
|
numeric = "".join(_IBAN_LETTER_MAP.get(c, c) for c in rearranged)
|
|
try:
|
|
return int(numeric) % 97 == 1
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pre-compiled patterns
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# IBAN candidate: 2 uppercase letters (country code), 2 check digits, then
# 2-7 groups of 4 characters from [0-9A-Z] followed by a final group of 1-4,
# i.e. 9-32 characters after the check digits. Each group may be preceded by
# one whitespace character. Group 1 captures the whole candidate. No
# IGNORECASE flag is set, so lowercase letters do not match.
_RE_IBAN = re.compile(
    r"\b([A-Z]{2}\d{2}(?:\s?[0-9A-Z]{4}){2,7}\s?[0-9A-Z]{1,4})\b"
)
|
|
|
|
# Email (RFC 5321 simplified): a local part made of ASCII letters, digits and
# . _ % + -, an "@", a domain of ASCII letters, digits, dots and hyphens, and
# a final dot followed by at least two ASCII letters. Word boundaries anchor
# both ends of the match.
_RE_EMAIL = re.compile(
    r"\b[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}\b"
)
|
|
|
|
# French mobile: 06/07, with optional +33 or 0033 prefix, various separators
|
|
_RE_PHONE_FR = re.compile(
|
|
r"(?:(?:\+|00)33\s?|0)[67](?:[\s.\-]?\d{2}){4}"
|
|
)
|
|
|
|
# International phone: "+", then 1-3 digits not starting with 0 (country
# code), one optional separator (whitespace, "." or "-"), then 4-14 digits.
# The lookarounds reject a digit immediately before the "+" and immediately
# after the last matched digit.
_RE_PHONE_INTL = re.compile(
    r"(?<!\d)\+[1-9]\d{0,2}[\s.\-]?\d{4,14}(?!\d)"
)
|
|
|
|
# French SSN (NIR): 13 digits + 2-digit key, with optional separators
|
|
# Format: [12] YY MM DEP COM NNN CC
|
|
_RE_FR_SSN = re.compile(
|
|
r"\b([12]\d{2}(?:0[1-9]|1[0-2]|20)\d{2}\d{3}\d{3}\d{2})\b"
|
|
)
|
|
|
|
# Credit card: 16 digits in groups of 4 (Visa, Mastercard, etc.)
|
|
# Also handles Amex: 4+6+5 (not validated by Luhn here — Luhn handles it)
|
|
_RE_CREDIT_CARD = re.compile(
|
|
r"\b(\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4})\b"
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class RegexLayer:
    """Detect structured PII using pre-compiled regular expressions.

    The layer keeps no instance state: every method only reads its ``text``
    argument and the module-level compiled patterns.
    """

    # Separators tolerated inside a card number; stripped before the Luhn
    # check. Compiled once instead of going through ``re.sub`` on every match.
    _CARD_SEPARATORS = re.compile(r"[\s\-]")

    def detect(self, text: str) -> list[DetectedEntity]:
        """Return all PII entities found in *text*, in document order.

        Runs the IBAN, email, phone, SSN and credit-card detectors in that
        order and merges their hits sorted by ascending ``start``. The sort is
        stable, so hits sharing a start offset keep detector order. Overlaps
        between *different* detectors are not resolved here.
        """
        entities: list[DetectedEntity] = [
            *self._find_ibans(text),
            *self._find_emails(text),
            *self._find_phones(text),
            *self._find_ssns(text),
            *self._find_credit_cards(text),
        ]
        entities.sort(key=lambda e: e.start)
        return entities

    # -----------------------------------------------------------------------
    # Private detection helpers
    # -----------------------------------------------------------------------

    @staticmethod
    def _entity(entity_type: str, match: re.Match[str], group: int = 0) -> DetectedEntity:
        """Build a DetectedEntity from capture *group* of *match*."""
        return DetectedEntity(
            entity_type=entity_type,
            original_value=match.group(group),
            start=match.start(group),
            end=match.end(group),
        )

    def _find_ibans(self, text: str) -> list[DetectedEntity]:
        """Return IBAN candidates in *text* whose checksum validates."""
        return [
            self._entity("IBAN", m, 1)
            for m in _RE_IBAN.finditer(text)
            if _iban_valid(m.group(1))
        ]

    def _find_emails(self, text: str) -> list[DetectedEntity]:
        """Return every email address matched in *text*."""
        return [self._entity("EMAIL", m) for m in _RE_EMAIL.finditer(text)]

    def _find_phones(self, text: str) -> list[DetectedEntity]:
        """Return phone numbers in *text*: French mobiles first, then others.

        A number written with a +33 prefix can be matched by both patterns,
        either with the same span or with the international pattern grabbing
        only a leading fragment (e.g. "+33612" out of "+33612 34 56 78").
        Any international hit that overlaps a French hit is dropped so each
        number is reported once, with the more specific ``PHONE_FR`` type.
        """
        french = [self._entity("PHONE_FR", m) for m in _RE_PHONE_FR.finditer(text)]
        results = list(french)

        for m in _RE_PHONE_INTL.finditer(text):
            start, end = m.span()
            # Half-open interval overlap test against every French hit.
            if any(start < fr.end and fr.start < end for fr in french):
                continue
            results.append(self._entity("PHONE_INTL", m))

        return results

    def _find_ssns(self, text: str) -> list[DetectedEntity]:
        """Return every French SSN (NIR) candidate matched in *text*."""
        return [self._entity("FR_SSN", m, 1) for m in _RE_FR_SSN.finditer(text)]

    def _find_credit_cards(self, text: str) -> list[DetectedEntity]:
        """Return card-number candidates in *text* that pass the Luhn check."""
        results = []
        for m in _RE_CREDIT_CARD.finditer(text):
            # Luhn works on bare digits; the reported value keeps separators.
            digits_only = self._CARD_SEPARATORS.sub("", m.group(1))
            if _luhn_valid(digits_only):
                results.append(self._entity("CREDIT_CARD", m, 1))
        return results
|