veylant/internal/routing/pg_store.go
2026-02-23 13:35:04 +01:00

160 lines
4.6 KiB
Go

package routing
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"time"
"go.uber.org/zap"
)
// PgStore implements RuleStore using PostgreSQL (jackc/pgx/v5 via database/sql).
//
// Every query issued by its methods is scoped by tenant_id, so a rule is only
// visible to, and modifiable by, the tenant whose ID is supplied.
type PgStore struct {
// db is the database handle every query in this file is executed on.
db *sql.DB
// logger is stored by NewPgStore; none of the methods visible in this file
// reference it.
logger *zap.Logger
}
// NewPgStore returns a PgStore that runs its queries on db and keeps logger
// for later use. Neither argument is validated or copied.
func NewPgStore(db *sql.DB, logger *zap.Logger) *PgStore {
	store := new(PgStore)
	store.db = db
	store.logger = logger
	return store
}
// ListActive returns the enabled routing rules belonging to tenantID, ordered
// by ascending priority.
//
// The returned slice is nil when the tenant has no enabled rules. On any
// failure (running the query, decoding a row, or iterating the result set) it
// returns a nil slice together with a wrapped error; rules collected before
// the failure are never returned alongside an error.
func (p *PgStore) ListActive(ctx context.Context, tenantID string) ([]RoutingRule, error) {
	const q = `
SELECT id, tenant_id, name, COALESCE(description,''), conditions, action, priority, is_enabled, created_at, updated_at
FROM routing_rules
WHERE tenant_id = $1 AND is_enabled = TRUE
ORDER BY priority ASC`

	rows, err := p.db.QueryContext(ctx, q, tenantID)
	if err != nil {
		return nil, fmt.Errorf("routing_rules list: %w", err)
	}
	defer rows.Close() //nolint:errcheck

	var rules []RoutingRule
	for rows.Next() {
		r, err := scanRule(rows)
		if err != nil {
			return nil, err
		}
		rules = append(rules, r)
	}

	// rows.Next also returns false when iteration aborts (for example on a
	// cancelled context or a dropped connection), so the error must be checked
	// before the collected rules can be trusted as complete.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("routing_rules list: %w", err)
	}
	return rules, nil
}
// Get loads the routing rule identified by id, provided it belongs to
// tenantID.
//
// It returns ErrNotFound when no row matches both the id and the tenant. Any
// other failure is returned as produced by scanRule, together with the zero
// RoutingRule.
func (p *PgStore) Get(ctx context.Context, id, tenantID string) (RoutingRule, error) {
	const q = `
SELECT id, tenant_id, name, COALESCE(description,''), conditions, action, priority, is_enabled, created_at, updated_at
FROM routing_rules
WHERE id = $1 AND tenant_id = $2`

	rule, err := scanRule(p.db.QueryRowContext(ctx, q, id, tenantID))
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// scanRule wraps the driver error with %w, so the sentinel is still
		// detectable here.
		return RoutingRule{}, ErrNotFound
	default:
		return rule, err
	}
}
// Create inserts rule into routing_rules and returns the row as stored,
// decoded from the statement's RETURNING clause (including id, created_at and
// updated_at, which the INSERT itself does not supply).
//
// rule.Conditions and rule.Action are serialized to JSON before insertion; a
// serialization failure is returned before the database is touched.
func (p *PgStore) Create(ctx context.Context, rule RoutingRule) (RoutingRule, error) {
	var zero RoutingRule

	conditions, err := json.Marshal(rule.Conditions)
	if err != nil {
		return zero, fmt.Errorf("marshal conditions: %w", err)
	}

	action, err := json.Marshal(rule.Action)
	if err != nil {
		return zero, fmt.Errorf("marshal action: %w", err)
	}

	const insertRule = `
INSERT INTO routing_rules (tenant_id, name, description, conditions, action, priority, is_enabled)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id, tenant_id, name, COALESCE(description,''), conditions, action, priority, is_enabled, created_at, updated_at`

	// Argument order follows the $1..$7 placeholders above.
	inserted := p.db.QueryRowContext(
		ctx, insertRule,
		rule.TenantID,
		rule.Name,
		rule.Description,
		conditions,
		action,
		rule.Priority,
		rule.IsEnabled,
	)
	return scanRule(inserted)
}
// Update overwrites the mutable columns (name, description, conditions,
// action, priority, is_enabled) of the rule matching rule.ID and
// rule.TenantID, and returns the row as stored afterwards.
//
// It returns ErrNotFound when no row matches both the id and the tenant.
// NOTE(review): the statement does not assign updated_at; presumably the
// database maintains it (trigger or similar) — confirm against the schema.
func (p *PgStore) Update(ctx context.Context, rule RoutingRule) (RoutingRule, error) {
	var zero RoutingRule

	conditions, err := json.Marshal(rule.Conditions)
	if err != nil {
		return zero, fmt.Errorf("marshal conditions: %w", err)
	}

	action, err := json.Marshal(rule.Action)
	if err != nil {
		return zero, fmt.Errorf("marshal action: %w", err)
	}

	const updateRule = `
UPDATE routing_rules
SET name=$3, description=$4, conditions=$5, action=$6, priority=$7, is_enabled=$8
WHERE id=$1 AND tenant_id=$2
RETURNING id, tenant_id, name, COALESCE(description,''), conditions, action, priority, is_enabled, created_at, updated_at`

	// Argument order follows the $1..$8 placeholders: the two key columns
	// first, then the columns being assigned.
	updated, err := scanRule(p.db.QueryRowContext(
		ctx, updateRule,
		rule.ID,
		rule.TenantID,
		rule.Name,
		rule.Description,
		conditions,
		action,
		rule.Priority,
		rule.IsEnabled,
	))
	if err != nil && errors.Is(err, sql.ErrNoRows) {
		// No row was returned, so nothing matched id + tenant.
		return zero, ErrNotFound
	}
	return updated, err
}
// Delete removes the routing rule identified by id, provided it belongs to
// tenantID.
//
// It returns ErrNotFound when the statement affected no rows, i.e. no rule
// matches both the id and the tenant. A failure to execute the statement, or
// to obtain the affected-row count, is returned as a wrapped error rather
// than being reported as ErrNotFound.
func (p *PgStore) Delete(ctx context.Context, id, tenantID string) error {
	const q = `DELETE FROM routing_rules WHERE id = $1 AND tenant_id = $2`

	res, err := p.db.ExecContext(ctx, q, id, tenantID)
	if err != nil {
		return fmt.Errorf("routing_rules delete: %w", err)
	}

	// Do not discard this error: with it ignored, n would stay 0 and a driver
	// failure would be indistinguishable from "rule does not exist".
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("routing_rules delete: rows affected: %w", err)
	}
	if n == 0 {
		return ErrNotFound
	}
	return nil
}
// ─── scanner ─────────────────────────────────────────────────────────────────
// scanner is satisfied by both *sql.Row and *sql.Rows.
//
// scanRule takes a scanner so the same decoding logic serves the single-row
// queries (QueryRowContext) and the multi-row query (QueryContext) in this
// file.
type scanner interface {
// Scan copies the columns of the current row into dest, in column order.
Scan(dest ...interface{}) error
}
// scanRule decodes one routing_rules row from s into a RoutingRule.
//
// The destination order must match the column list shared by every query in
// this file: id, tenant_id, name, description, conditions, action, priority,
// is_enabled, created_at, updated_at. The conditions and action columns are
// read as raw bytes and then parsed as JSON into the rule's Conditions and
// Action fields; an empty value (such as a SQL NULL) fails that parse.
//
// On any failure the zero RoutingRule is returned with a wrapped error, so
// callers can still match sql.ErrNoRows with errors.Is.
func scanRule(s scanner) (RoutingRule, error) {
	var (
		rule      RoutingRule
		rawCond   []byte
		rawAction []byte
		created   time.Time
		updated   time.Time
	)

	dest := []interface{}{
		&rule.ID, &rule.TenantID, &rule.Name, &rule.Description,
		&rawCond, &rawAction,
		&rule.Priority, &rule.IsEnabled,
		&created, &updated,
	}
	if err := s.Scan(dest...); err != nil {
		return RoutingRule{}, fmt.Errorf("scanning routing_rule row: %w", err)
	}
	rule.CreatedAt, rule.UpdatedAt = created, updated

	// Conditions are parsed before the action, so a row with both columns
	// malformed reports the conditions error.
	if err := json.Unmarshal(rawCond, &rule.Conditions); err != nil {
		return RoutingRule{}, fmt.Errorf("parsing conditions JSON: %w", err)
	}
	if err := json.Unmarshal(rawAction, &rule.Action); err != nil {
		return RoutingRule{}, fmt.Errorf("parsing action JSON: %w", err)
	}
	return rule, nil
}