242 lines
7.3 KiB
Go
242 lines
7.3 KiB
Go
package compliance
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// PgStore implements ComplianceStore using PostgreSQL.
|
|
type PgStore struct {
|
|
db *sql.DB
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewPgStore creates a PgStore backed by the given database connection.
|
|
func NewPgStore(db *sql.DB, logger *zap.Logger) *PgStore {
|
|
return &PgStore{db: db, logger: logger}
|
|
}
|
|
|
|
func (p *PgStore) List(ctx context.Context, tenantID string) ([]ProcessingEntry, error) {
|
|
const q = `
|
|
SELECT id, tenant_id, use_case_name, legal_basis, purpose,
|
|
data_categories, recipients, processors,
|
|
retention_period,
|
|
COALESCE(security_measures,''), COALESCE(controller_name,''),
|
|
COALESCE(risk_level,''), ai_act_answers,
|
|
is_active, created_at, updated_at
|
|
FROM processing_registry
|
|
WHERE tenant_id = $1 AND is_active = TRUE
|
|
ORDER BY created_at DESC`
|
|
|
|
rows, err := p.db.QueryContext(ctx, q, tenantID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("processing_registry list: %w", err)
|
|
}
|
|
defer rows.Close() //nolint:errcheck
|
|
|
|
var entries []ProcessingEntry
|
|
for rows.Next() {
|
|
e, err := scanEntry(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
entries = append(entries, e)
|
|
}
|
|
return entries, rows.Err()
|
|
}
|
|
|
|
func (p *PgStore) Get(ctx context.Context, id, tenantID string) (ProcessingEntry, error) {
|
|
const q = `
|
|
SELECT id, tenant_id, use_case_name, legal_basis, purpose,
|
|
data_categories, recipients, processors,
|
|
retention_period,
|
|
COALESCE(security_measures,''), COALESCE(controller_name,''),
|
|
COALESCE(risk_level,''), ai_act_answers,
|
|
is_active, created_at, updated_at
|
|
FROM processing_registry
|
|
WHERE id = $1 AND tenant_id = $2`
|
|
|
|
row := p.db.QueryRowContext(ctx, q, id, tenantID)
|
|
e, err := scanEntry(row)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return ProcessingEntry{}, ErrNotFound
|
|
}
|
|
return e, err
|
|
}
|
|
|
|
func (p *PgStore) Create(ctx context.Context, entry ProcessingEntry) (ProcessingEntry, error) {
|
|
catJSON, err := json.Marshal(entry.DataCategories)
|
|
if err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("marshal data_categories: %w", err)
|
|
}
|
|
recJSON, err := json.Marshal(entry.Recipients)
|
|
if err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("marshal recipients: %w", err)
|
|
}
|
|
procJSON, err := json.Marshal(entry.Processors)
|
|
if err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("marshal processors: %w", err)
|
|
}
|
|
|
|
var answersJSON []byte
|
|
if entry.AiActAnswers != nil {
|
|
answersJSON, err = json.Marshal(entry.AiActAnswers)
|
|
if err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("marshal ai_act_answers: %w", err)
|
|
}
|
|
}
|
|
|
|
const q = `
|
|
INSERT INTO processing_registry
|
|
(tenant_id, use_case_name, legal_basis, purpose,
|
|
data_categories, recipients, processors,
|
|
retention_period, security_measures, controller_name,
|
|
risk_level, ai_act_answers)
|
|
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)
|
|
RETURNING id, tenant_id, use_case_name, legal_basis, purpose,
|
|
data_categories, recipients, processors,
|
|
retention_period,
|
|
COALESCE(security_measures,''), COALESCE(controller_name,''),
|
|
COALESCE(risk_level,''), ai_act_answers,
|
|
is_active, created_at, updated_at`
|
|
|
|
nilIfEmpty := func(s string) interface{} {
|
|
if s == "" {
|
|
return nil
|
|
}
|
|
return s
|
|
}
|
|
|
|
row := p.db.QueryRowContext(ctx, q,
|
|
entry.TenantID, entry.UseCaseName, entry.LegalBasis, entry.Purpose,
|
|
catJSON, recJSON, procJSON,
|
|
entry.RetentionPeriod,
|
|
nilIfEmpty(entry.SecurityMeasures), nilIfEmpty(entry.ControllerName),
|
|
nilIfEmpty(entry.RiskLevel), answersJSON,
|
|
)
|
|
return scanEntry(row)
|
|
}
|
|
|
|
func (p *PgStore) Update(ctx context.Context, entry ProcessingEntry) (ProcessingEntry, error) {
|
|
catJSON, err := json.Marshal(entry.DataCategories)
|
|
if err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("marshal data_categories: %w", err)
|
|
}
|
|
recJSON, err := json.Marshal(entry.Recipients)
|
|
if err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("marshal recipients: %w", err)
|
|
}
|
|
procJSON, err := json.Marshal(entry.Processors)
|
|
if err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("marshal processors: %w", err)
|
|
}
|
|
|
|
var answersJSON []byte
|
|
if entry.AiActAnswers != nil {
|
|
answersJSON, err = json.Marshal(entry.AiActAnswers)
|
|
if err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("marshal ai_act_answers: %w", err)
|
|
}
|
|
}
|
|
|
|
nilIfEmpty := func(s string) interface{} {
|
|
if s == "" {
|
|
return nil
|
|
}
|
|
return s
|
|
}
|
|
|
|
const q = `
|
|
UPDATE processing_registry
|
|
SET use_case_name=$3, legal_basis=$4, purpose=$5,
|
|
data_categories=$6, recipients=$7, processors=$8,
|
|
retention_period=$9, security_measures=$10, controller_name=$11,
|
|
risk_level=$12, ai_act_answers=$13, updated_at=NOW()
|
|
WHERE id=$1 AND tenant_id=$2
|
|
RETURNING id, tenant_id, use_case_name, legal_basis, purpose,
|
|
data_categories, recipients, processors,
|
|
retention_period,
|
|
COALESCE(security_measures,''), COALESCE(controller_name,''),
|
|
COALESCE(risk_level,''), ai_act_answers,
|
|
is_active, created_at, updated_at`
|
|
|
|
row := p.db.QueryRowContext(ctx, q,
|
|
entry.ID, entry.TenantID,
|
|
entry.UseCaseName, entry.LegalBasis, entry.Purpose,
|
|
catJSON, recJSON, procJSON,
|
|
entry.RetentionPeriod,
|
|
nilIfEmpty(entry.SecurityMeasures), nilIfEmpty(entry.ControllerName),
|
|
nilIfEmpty(entry.RiskLevel), answersJSON,
|
|
)
|
|
e, err := scanEntry(row)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return ProcessingEntry{}, ErrNotFound
|
|
}
|
|
return e, err
|
|
}
|
|
|
|
func (p *PgStore) Delete(ctx context.Context, id, tenantID string) error {
|
|
const q = `UPDATE processing_registry SET is_active=FALSE, updated_at=NOW() WHERE id=$1 AND tenant_id=$2`
|
|
res, err := p.db.ExecContext(ctx, q, id, tenantID)
|
|
if err != nil {
|
|
return fmt.Errorf("processing_registry delete: %w", err)
|
|
}
|
|
n, _ := res.RowsAffected()
|
|
if n == 0 {
|
|
return ErrNotFound
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ─── scanner ─────────────────────────────────────────────────────────────────
|
|
|
|
// scanner is the one-method view of a result row that scanEntry depends on.
// It lets the same decoding code accept both the single-row value returned by
// QueryRowContext and the row cursor returned by QueryContext.
type scanner interface {
	// Scan copies the columns of the current row into dest, in order.
	Scan(dest ...interface{}) error
}
|
|
|
|
func scanEntry(s scanner) (ProcessingEntry, error) {
|
|
var (
|
|
e ProcessingEntry
|
|
catJSON []byte
|
|
recJSON []byte
|
|
procJSON []byte
|
|
answersJSON []byte
|
|
createdAt time.Time
|
|
updatedAt time.Time
|
|
)
|
|
err := s.Scan(
|
|
&e.ID, &e.TenantID, &e.UseCaseName, &e.LegalBasis, &e.Purpose,
|
|
&catJSON, &recJSON, &procJSON,
|
|
&e.RetentionPeriod, &e.SecurityMeasures, &e.ControllerName,
|
|
&e.RiskLevel, &answersJSON,
|
|
&e.IsActive, &createdAt, &updatedAt,
|
|
)
|
|
if err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("scanning processing_registry row: %w", err)
|
|
}
|
|
e.CreatedAt = createdAt
|
|
e.UpdatedAt = updatedAt
|
|
|
|
if err := json.Unmarshal(catJSON, &e.DataCategories); err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("parsing data_categories JSON: %w", err)
|
|
}
|
|
if err := json.Unmarshal(recJSON, &e.Recipients); err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("parsing recipients JSON: %w", err)
|
|
}
|
|
if err := json.Unmarshal(procJSON, &e.Processors); err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("parsing processors JSON: %w", err)
|
|
}
|
|
if len(answersJSON) > 0 && string(answersJSON) != "null" {
|
|
if err := json.Unmarshal(answersJSON, &e.AiActAnswers); err != nil {
|
|
return ProcessingEntry{}, fmt.Errorf("parsing ai_act_answers JSON: %w", err)
|
|
}
|
|
}
|
|
return e, nil
|
|
}
|