package compliance import ( "context" "database/sql" "encoding/json" "errors" "fmt" "time" "go.uber.org/zap" ) // PgStore implements ComplianceStore using PostgreSQL. type PgStore struct { db *sql.DB logger *zap.Logger } // NewPgStore creates a PgStore backed by the given database connection. func NewPgStore(db *sql.DB, logger *zap.Logger) *PgStore { return &PgStore{db: db, logger: logger} } func (p *PgStore) List(ctx context.Context, tenantID string) ([]ProcessingEntry, error) { const q = ` SELECT id, tenant_id, use_case_name, legal_basis, purpose, data_categories, recipients, processors, retention_period, COALESCE(security_measures,''), COALESCE(controller_name,''), COALESCE(risk_level,''), ai_act_answers, is_active, created_at, updated_at FROM processing_registry WHERE tenant_id = $1 AND is_active = TRUE ORDER BY created_at DESC` rows, err := p.db.QueryContext(ctx, q, tenantID) if err != nil { return nil, fmt.Errorf("processing_registry list: %w", err) } defer rows.Close() //nolint:errcheck var entries []ProcessingEntry for rows.Next() { e, err := scanEntry(rows) if err != nil { return nil, err } entries = append(entries, e) } return entries, rows.Err() } func (p *PgStore) Get(ctx context.Context, id, tenantID string) (ProcessingEntry, error) { const q = ` SELECT id, tenant_id, use_case_name, legal_basis, purpose, data_categories, recipients, processors, retention_period, COALESCE(security_measures,''), COALESCE(controller_name,''), COALESCE(risk_level,''), ai_act_answers, is_active, created_at, updated_at FROM processing_registry WHERE id = $1 AND tenant_id = $2` row := p.db.QueryRowContext(ctx, q, id, tenantID) e, err := scanEntry(row) if errors.Is(err, sql.ErrNoRows) { return ProcessingEntry{}, ErrNotFound } return e, err } func (p *PgStore) Create(ctx context.Context, entry ProcessingEntry) (ProcessingEntry, error) { catJSON, err := json.Marshal(entry.DataCategories) if err != nil { return ProcessingEntry{}, fmt.Errorf("marshal data_categories: %w", err) } recJSON, err := json.Marshal(entry.Recipients) if err != nil { return ProcessingEntry{}, fmt.Errorf("marshal recipients: %w", err) } procJSON, err := json.Marshal(entry.Processors) if err != nil { return ProcessingEntry{}, fmt.Errorf("marshal processors: %w", err) } var answersJSON []byte if entry.AiActAnswers != nil { answersJSON, err = json.Marshal(entry.AiActAnswers) if err != nil { return ProcessingEntry{}, fmt.Errorf("marshal ai_act_answers: %w", err) } } const q = ` INSERT INTO processing_registry (tenant_id, use_case_name, legal_basis, purpose, data_categories, recipients, processors, retention_period, security_measures, controller_name, risk_level, ai_act_answers) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12) RETURNING id, tenant_id, use_case_name, legal_basis, purpose, data_categories, recipients, processors, retention_period, COALESCE(security_measures,''), COALESCE(controller_name,''), COALESCE(risk_level,''), ai_act_answers, is_active, created_at, updated_at` nilIfEmpty := func(s string) interface{} { if s == "" { return nil } return s } row := p.db.QueryRowContext(ctx, q, entry.TenantID, entry.UseCaseName, entry.LegalBasis, entry.Purpose, catJSON, recJSON, procJSON, entry.RetentionPeriod, nilIfEmpty(entry.SecurityMeasures), nilIfEmpty(entry.ControllerName), nilIfEmpty(entry.RiskLevel), answersJSON, ) return scanEntry(row) } func (p *PgStore) Update(ctx context.Context, entry ProcessingEntry) (ProcessingEntry, error) { catJSON, err := json.Marshal(entry.DataCategories) if err != nil { return ProcessingEntry{}, fmt.Errorf("marshal data_categories: %w", err) } recJSON, err := json.Marshal(entry.Recipients) if err != nil { return ProcessingEntry{}, fmt.Errorf("marshal recipients: %w", err) } procJSON, err := json.Marshal(entry.Processors) if err != nil { return ProcessingEntry{}, fmt.Errorf("marshal processors: %w", err) } var answersJSON []byte if entry.AiActAnswers != nil { answersJSON, err = json.Marshal(entry.AiActAnswers) if err != nil { return ProcessingEntry{}, fmt.Errorf("marshal ai_act_answers: %w", err) } } nilIfEmpty := func(s string) interface{} { if s == "" { return nil } return s } const q = ` UPDATE processing_registry SET use_case_name=$3, legal_basis=$4, purpose=$5, data_categories=$6, recipients=$7, processors=$8, retention_period=$9, security_measures=$10, controller_name=$11, risk_level=$12, ai_act_answers=$13, updated_at=NOW() WHERE id=$1 AND tenant_id=$2 RETURNING id, tenant_id, use_case_name, legal_basis, purpose, data_categories, recipients, processors, retention_period, COALESCE(security_measures,''), COALESCE(controller_name,''), COALESCE(risk_level,''), ai_act_answers, is_active, created_at, updated_at` row := p.db.QueryRowContext(ctx, q, entry.ID, entry.TenantID, entry.UseCaseName, entry.LegalBasis, entry.Purpose, catJSON, recJSON, procJSON, entry.RetentionPeriod, nilIfEmpty(entry.SecurityMeasures), nilIfEmpty(entry.ControllerName), nilIfEmpty(entry.RiskLevel), answersJSON, ) e, err := scanEntry(row) if errors.Is(err, sql.ErrNoRows) { return ProcessingEntry{}, ErrNotFound } return e, err } func (p *PgStore) Delete(ctx context.Context, id, tenantID string) error { const q = `UPDATE processing_registry SET is_active=FALSE, updated_at=NOW() WHERE id=$1 AND tenant_id=$2` res, err := p.db.ExecContext(ctx, q, id, tenantID) if err != nil { return fmt.Errorf("processing_registry delete: %w", err) } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } return nil } // ─── scanner ───────────────────────────────────────────────────────────────── type scanner interface { Scan(dest ...interface{}) error } func scanEntry(s scanner) (ProcessingEntry, error) { var ( e ProcessingEntry catJSON []byte recJSON []byte procJSON []byte answersJSON []byte createdAt time.Time updatedAt time.Time ) err := s.Scan( &e.ID, &e.TenantID, &e.UseCaseName, &e.LegalBasis, &e.Purpose, &catJSON, &recJSON, &procJSON, &e.RetentionPeriod, &e.SecurityMeasures, &e.ControllerName, &e.RiskLevel, &answersJSON, &e.IsActive, &createdAt, &updatedAt, ) if err != nil { return ProcessingEntry{}, fmt.Errorf("scanning processing_registry row: %w", err) } e.CreatedAt = createdAt e.UpdatedAt = updatedAt if err := json.Unmarshal(catJSON, &e.DataCategories); err != nil { return ProcessingEntry{}, fmt.Errorf("parsing data_categories JSON: %w", err) } if err := json.Unmarshal(recJSON, &e.Recipients); err != nil { return ProcessingEntry{}, fmt.Errorf("parsing recipients JSON: %w", err) } if err := json.Unmarshal(procJSON, &e.Processors); err != nil { return ProcessingEntry{}, fmt.Errorf("parsing processors JSON: %w", err) } if len(answersJSON) > 0 && string(answersJSON) != "null" { if err := json.Unmarshal(answersJSON, &e.AiActAnswers); err != nil { return ProcessingEntry{}, fmt.Errorf("parsing ai_act_answers JSON: %w", err) } } return e, nil }