veylant/internal/admin/handler.go
2026-03-13 12:43:20 +01:00

579 lines
18 KiB
Go

// Package admin provides HTTP handlers for the routing rules management API.
// All endpoints require an authenticated JWT; tenantID is always derived from
// the token claims — it is never accepted from the request body.
package admin
import (
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strconv"
	"time"

	"github.com/go-chi/chi/v5"
	"go.uber.org/zap"

	"github.com/veylant/ia-gateway/internal/apierror"
	"github.com/veylant/ia-gateway/internal/auditlog"
	"github.com/veylant/ia-gateway/internal/circuitbreaker"
	"github.com/veylant/ia-gateway/internal/crypto"
	"github.com/veylant/ia-gateway/internal/flags"
	"github.com/veylant/ia-gateway/internal/middleware"
	"github.com/veylant/ia-gateway/internal/ratelimit"
	"github.com/veylant/ia-gateway/internal/routing"
)
// ProviderRouter is the subset of router.Router used by the admin handler.
// Defined as an interface to avoid an import cycle.
//
// WithRouter additionally checks whether the supplied value implements
// ProviderAdapterRouter and, if so, keeps it for provider hot-reload.
type ProviderRouter interface {
// ProviderStatuses returns the circuit breaker statuses known to the router.
// The meaning of each element is defined by the circuitbreaker package.
ProviderStatuses() []circuitbreaker.Status
}
// Handler provides CRUD endpoints for routing rules, template seeding,
// read-only access to audit logs and cost aggregations, user management,
// provider circuit breaker status, rate limit configuration, and feature flags.
//
// Only store, cache and logger are set by the constructors; every other
// dependency is optional and attached through a With* builder method. The
// trailing field comments state what happens while a dependency is nil.
type Handler struct {
// store persists routing rules; all policy endpoints go through it.
store routing.RuleStore
// cache is invalidated for the affected tenant after each successful
// policy create, update, delete or template seed.
cache *routing.RuleCache
auditLogger auditlog.Logger // nil = logs/costs endpoints return 501
db *sql.DB // nil = users endpoints return 501
router ProviderRouter // nil = providers/status returns 501
adapterRouter ProviderAdapterRouter // nil = provider CRUD hot-reload disabled
encryptor *crypto.Encryptor // nil = API keys stored in plaintext (dev only)
rateLimiter *ratelimit.Limiter // nil = rate-limits endpoints return 501
// rlStore is built from db by the builder methods; the rate-limits
// endpoints also return 501 while it is nil.
rlStore *ratelimit.Store // nil if db is nil
flagStore flags.FlagStore // nil = flags endpoints return 501
// logger receives an Info entry for each successful mutation.
logger *zap.Logger
}
// New builds a Handler with only the mandatory dependencies wired in.
//   - store: rule persistence backend (PgStore in production, MemStore in tests).
//   - cache: engine cache that gets invalidated after each rule mutation.
//   - logger: structured logger used for mutation events.
//
// Optional features are enabled afterwards with the With* methods.
func New(store routing.RuleStore, cache *routing.RuleCache, logger *zap.Logger) *Handler {
	h := new(Handler)
	h.store = store
	h.cache = cache
	h.logger = logger
	return h
}
// NewWithAudit builds a Handler like New and additionally attaches al, which
// enables the audit log and cost query endpoints.
func NewWithAudit(store routing.RuleStore, cache *routing.RuleCache, al auditlog.Logger, logger *zap.Logger) *Handler {
	h := New(store, cache, logger)
	h.auditLogger = al
	return h
}
// WithDB attaches the database handle used for user management and returns
// the receiver so calls can be chained.
//
// If a rate limiter has already been attached but its persistent store could
// not be created yet (because no database was available at that time), the
// store is created here. This makes WithDB and WithRateLimiter independent of
// the order in which they are called; previously calling WithRateLimiter
// first left the rate-limit endpoints permanently answering 501.
func (h *Handler) WithDB(db *sql.DB) *Handler {
	h.db = db
	if db != nil && h.rateLimiter != nil && h.rlStore == nil {
		h.rlStore = ratelimit.NewStore(db, h.logger)
	}
	return h
}
// WithRouter attaches the provider router used to report circuit breaker
// status and returns the receiver for chaining.
//
// When r also satisfies ProviderAdapterRouter it is stored a second time as
// the adapter router, which enables hot-reload for provider config changes.
func (h *Handler) WithRouter(r ProviderRouter) *Handler {
	h.router = r

	adapter, supportsReload := r.(ProviderAdapterRouter)
	if supportsReload {
		h.adapterRouter = adapter
	}
	return h
}
// WithEncryptor attaches the AES-256-GCM encryptor applied to provider API
// keys before they are stored, and returns the receiver for chaining.
// Without an encryptor, keys are stored in plaintext (development only).
func (h *Handler) WithEncryptor(enc *crypto.Encryptor) *Handler {
	h.encryptor = enc
	return h
}
// WithRateLimiter attaches the in-process rate limiter so the admin API can
// manage per-tenant limits at runtime, and returns the receiver for chaining.
//
// The PostgreSQL-backed store is created here only when a database handle is
// already present on the handler at the time of the call; otherwise the store
// is left untouched.
func (h *Handler) WithRateLimiter(rl *ratelimit.Limiter) *Handler {
	h.rateLimiter = rl
	if h.db == nil {
		return h
	}
	h.rlStore = ratelimit.NewStore(h.db, h.logger)
	return h
}
// WithFlagStore attaches the feature flag store backing the per-tenant flag
// management endpoints (E11-07) and returns the receiver for chaining.
func (h *Handler) WithFlagStore(fs flags.FlagStore) *Handler {
	h.flagStore = fs
	return h
}
// Routes registers every admin endpoint on r.
//
// The handler does not install authentication itself: callers must mount r
// under a prefix that is already protected by the authentication middleware.
// Endpoints are declared in a single table and registered in table order.
func (h *Handler) Routes(r chi.Router) {
	endpoints := []struct {
		method  string
		pattern string
		handle  http.HandlerFunc
	}{
		// Routing policies and template seeding.
		{http.MethodGet, "/policies", h.listPolicies},
		{http.MethodPost, "/policies", h.createPolicy},
		{http.MethodGet, "/policies/{id}", h.getPolicy},
		{http.MethodPut, "/policies/{id}", h.updatePolicy},
		{http.MethodDelete, "/policies/{id}", h.deletePolicy},
		{http.MethodPost, "/policies/seed/{template}", h.seedTemplate},

		// Audit logs and cost aggregations.
		{http.MethodGet, "/logs", h.getLogs},
		{http.MethodGet, "/costs", h.getCosts},

		// User management (E3-08).
		{http.MethodGet, "/users", h.listUsers},
		{http.MethodPost, "/users", h.createUser},
		{http.MethodGet, "/users/{id}", h.getUser},
		{http.MethodPut, "/users/{id}", h.updateUser},
		{http.MethodDelete, "/users/{id}", h.deleteUser},

		// Provider circuit breaker status (E2-09 / E2-10).
		{http.MethodGet, "/providers/status", h.getProviderStatus},

		// Provider config CRUD — stored in database, API keys encrypted at rest.
		{http.MethodGet, "/providers", h.listProviderConfigs},
		{http.MethodPost, "/providers", h.createProviderConfig},
		{http.MethodPut, "/providers/{id}", h.updateProviderConfig},
		{http.MethodDelete, "/providers/{id}", h.deleteProviderConfig},
		{http.MethodPost, "/providers/{id}/test", h.testProviderConfig},

		// Rate limit configuration (E10-09).
		{http.MethodGet, "/rate-limits", h.listRateLimits},
		{http.MethodGet, "/rate-limits/{tenant_id}", h.getRateLimit},
		{http.MethodPut, "/rate-limits/{tenant_id}", h.upsertRateLimit},
		{http.MethodDelete, "/rate-limits/{tenant_id}", h.deleteRateLimit},

		// Feature flags management (E11-07).
		{http.MethodGet, "/flags", h.listFlags},
		{http.MethodPut, "/flags/{name}", h.upsertFlag},
		{http.MethodDelete, "/flags/{name}", h.deleteFlag},
	}

	for _, ep := range endpoints {
		r.MethodFunc(ep.method, ep.pattern, ep.handle)
	}
}
// ─── List ─────────────────────────────────────────────────────────────────────

// listPolicies responds with the active routing rules of the authenticated
// tenant, wrapped in a {"data": [...]} envelope. A store failure is reported
// as an upstream error.
func (h *Handler) listPolicies(w http.ResponseWriter, r *http.Request) {
	tenantID, authed := tenantFromCtx(w, r)
	if !authed {
		return
	}

	rules, err := h.store.ListActive(r.Context(), tenantID)
	if err != nil {
		msg := "failed to list policies: " + err.Error()
		apierror.WriteError(w, apierror.NewUpstreamError(msg))
		return
	}

	envelope := map[string]interface{}{"data": rules}
	writeJSON(w, http.StatusOK, envelope)
}
// ─── Create ───────────────────────────────────────────────────────────────────

// createPolicyRequest is the JSON body accepted by both createPolicy and
// updatePolicy. It deliberately has no tenant field: the tenant always comes
// from the JWT claims.
type createPolicyRequest struct {
// Name is required (checked by validatePolicy).
Name string `json:"name"`
Description string `json:"description"`
// Priority of 0 (or omitted) is replaced by 100 on create; on update the
// value is passed to the store unchanged.
Priority int `json:"priority"`
// IsEnabled is false when the field is omitted from the body.
IsEnabled bool `json:"is_enabled"`
// Conditions are validated with routing.ValidateConditions.
Conditions []routing.Condition `json:"conditions"`
// Action must carry a non-empty Provider (checked by validatePolicy).
Action routing.Action `json:"action"`
}
// createPolicy decodes and validates a createPolicyRequest, stores it as a new
// routing rule for the authenticated tenant and responds 201 with the stored
// rule. Malformed JSON or a failed validation yields a bad-request error; a
// store failure yields an upstream error. On success the tenant's rule cache
// is invalidated.
func (h *Handler) createPolicy(w http.ResponseWriter, r *http.Request) {
	tenantID, authed := tenantFromCtx(w, r)
	if !authed {
		return
	}

	var body createPolicyRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		apierror.WriteError(w, apierror.NewBadRequestError("invalid JSON: "+decodeErr.Error()))
		return
	}
	if invalid := validatePolicy(body.Name, body.Action, body.Conditions); invalid != nil {
		apierror.WriteError(w, apierror.NewBadRequestError(invalid.Error()))
		return
	}

	// A zero priority means "not specified" and falls back to 100.
	priority := body.Priority
	if priority == 0 {
		priority = 100
	}

	created, err := h.store.Create(r.Context(), routing.RoutingRule{
		TenantID:    tenantID,
		Name:        body.Name,
		Description: body.Description,
		Priority:    priority,
		IsEnabled:   body.IsEnabled,
		Conditions:  body.Conditions,
		Action:      body.Action,
	})
	if err != nil {
		apierror.WriteError(w, apierror.NewUpstreamError("failed to create policy: "+err.Error()))
		return
	}

	h.cache.Invalidate(tenantID)
	h.logger.Info(
		"routing policy created",
		zap.String("id", created.ID),
		zap.String("tenant_id", tenantID),
	)
	writeJSON(w, http.StatusCreated, created)
}
// ─── Get ──────────────────────────────────────────────────────────────────────

// getPolicy responds with the routing rule identified by the {id} URL
// parameter, looked up within the authenticated tenant. Store errors are
// mapped to HTTP responses by writeStoreError.
func (h *Handler) getPolicy(w http.ResponseWriter, r *http.Request) {
	tenantID, authed := tenantFromCtx(w, r)
	if !authed {
		return
	}

	ruleID := chi.URLParam(r, "id")
	rule, err := h.store.Get(r.Context(), ruleID, tenantID)
	if err == nil {
		writeJSON(w, http.StatusOK, rule)
		return
	}
	writeStoreError(w, err)
}
// ─── Update ───────────────────────────────────────────────────────────────────

// updatePolicy replaces the routing rule identified by the {id} URL parameter
// with the contents of the request body and responds 200 with the stored rule.
// The body uses the same shape and validation as policy creation, but the
// priority is passed to the store as given (no default is substituted).
// Store errors are mapped by writeStoreError; on success the tenant's rule
// cache is invalidated.
func (h *Handler) updatePolicy(w http.ResponseWriter, r *http.Request) {
	tenantID, authed := tenantFromCtx(w, r)
	if !authed {
		return
	}
	ruleID := chi.URLParam(r, "id")

	var body createPolicyRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		apierror.WriteError(w, apierror.NewBadRequestError("invalid JSON: "+decodeErr.Error()))
		return
	}
	if invalid := validatePolicy(body.Name, body.Action, body.Conditions); invalid != nil {
		apierror.WriteError(w, apierror.NewBadRequestError(invalid.Error()))
		return
	}

	updated, err := h.store.Update(r.Context(), routing.RoutingRule{
		ID:          ruleID,
		TenantID:    tenantID,
		Name:        body.Name,
		Description: body.Description,
		Priority:    body.Priority,
		IsEnabled:   body.IsEnabled,
		Conditions:  body.Conditions,
		Action:      body.Action,
	})
	if err != nil {
		writeStoreError(w, err)
		return
	}

	h.cache.Invalidate(tenantID)
	h.logger.Info(
		"routing policy updated",
		zap.String("id", ruleID),
		zap.String("tenant_id", tenantID),
	)
	writeJSON(w, http.StatusOK, updated)
}
// ─── Delete ───────────────────────────────────────────────────────────────────

// deletePolicy removes the routing rule identified by the {id} URL parameter
// from the authenticated tenant and responds 204 with no body. Store errors
// are mapped by writeStoreError; on success the tenant's rule cache is
// invalidated.
func (h *Handler) deletePolicy(w http.ResponseWriter, r *http.Request) {
	tenantID, authed := tenantFromCtx(w, r)
	if !authed {
		return
	}

	ruleID := chi.URLParam(r, "id")
	err := h.store.Delete(r.Context(), ruleID, tenantID)
	if err != nil {
		writeStoreError(w, err)
		return
	}

	h.cache.Invalidate(tenantID)
	h.logger.Info(
		"routing policy deleted",
		zap.String("id", ruleID),
		zap.String("tenant_id", tenantID),
	)
	w.WriteHeader(http.StatusNoContent)
}
// ─── Seed template ────────────────────────────────────────────────────────────

// seedTemplate creates a routing rule from the predefined template named by
// the {template} URL parameter and responds 201 with the stored rule.
// A name that is not a key of routing.Templates yields a bad-request error;
// a store failure yields an upstream error. On success the tenant's rule
// cache is invalidated.
//
// NOTE(review): the list of valid templates in the error message is
// hard-coded and is not derived from routing.Templates — keep them in sync.
func (h *Handler) seedTemplate(w http.ResponseWriter, r *http.Request) {
	tenantID, authed := tenantFromCtx(w, r)
	if !authed {
		return
	}

	templateName := chi.URLParam(r, "template")
	build, known := routing.Templates[templateName]
	if !known {
		msg := "unknown template " + strQuote(templateName) + "; valid templates: hr, finance, engineering, catchall"
		apierror.WriteError(w, apierror.NewBadRequestError(msg))
		return
	}

	created, err := h.store.Create(r.Context(), build(tenantID))
	if err != nil {
		apierror.WriteError(w, apierror.NewUpstreamError("failed to seed template: "+err.Error()))
		return
	}

	h.cache.Invalidate(tenantID)
	h.logger.Info(
		"routing template seeded",
		zap.String("template", templateName),
		zap.String("tenant_id", tenantID),
		zap.String("rule_id", created.ID),
	)
	writeJSON(w, http.StatusCreated, created)
}
// ─── Audit logs (E7-06) ───────────────────────────────────────────────────────

// getLogs queries the audit log for the authenticated tenant and responds 200
// with the query result. It answers 501 when no audit logger is configured.
//
// Query parameters:
//   - provider, min_sensitivity: passed through to the query as given.
//   - limit (default 50), offset (default 0): parsed by parseIntParam.
//   - start, end: timestamps in RFC3339Nano (JavaScript toISOString, e.g.
//     "2026-03-10T11:30:00.000Z") or RFC3339 form.
//
// NOTE(review): a start/end value that cannot be parsed (or parses to the
// zero time) is silently ignored and the bound is left unset, so a malformed
// filter widens the result instead of producing a 400.
func (h *Handler) getLogs(w http.ResponseWriter, r *http.Request) {
	if h.auditLogger == nil {
		notEnabled := &apierror.APIError{
			Type:       "not_implemented",
			Message:    "audit logging not enabled",
			HTTPStatus: http.StatusNotImplemented,
		}
		apierror.WriteError(w, notEnabled)
		return
	}

	tenantID, authed := tenantFromCtx(w, r)
	if !authed {
		return
	}

	params := r.URL.Query()
	q := auditlog.AuditQuery{
		TenantID:       tenantID,
		Provider:       params.Get("provider"),
		MinSensitivity: params.Get("min_sensitivity"),
		Limit:          parseIntParam(r, "limit", 50),
		Offset:         parseIntParam(r, "offset", 0),
	}

	// parseBound tries each accepted layout in turn; the boolean reports
	// whether a usable (non-zero) timestamp was obtained.
	parseBound := func(raw string) (time.Time, bool) {
		for _, layout := range [...]string{time.RFC3339Nano, time.RFC3339} {
			parsed, err := time.Parse(layout, raw)
			if err == nil {
				return parsed, !parsed.IsZero()
			}
		}
		return time.Time{}, false
	}

	if raw := params.Get("start"); raw != "" {
		if ts, usable := parseBound(raw); usable {
			q.StartTime = ts
		}
	}
	if raw := params.Get("end"); raw != "" {
		if ts, usable := parseBound(raw); usable {
			q.EndTime = ts
		}
	}

	result, err := h.auditLogger.Query(r.Context(), q)
	if err != nil {
		apierror.WriteError(w, apierror.NewUpstreamError("failed to query logs: "+err.Error()))
		return
	}
	writeJSON(w, http.StatusOK, result)
}
// ─── Costs (E7-07) ───────────────────────────────────────────────────────────

// getCosts queries cost aggregations for the authenticated tenant and
// responds 200 with the query result. It answers 501 when no audit logger is
// configured.
//
// Query parameters:
//   - group_by: passed through to the query as given.
//   - start, end: timestamps in RFC3339Nano or RFC3339 form.
//
// NOTE(review): a start/end value that cannot be parsed (or parses to the
// zero time) is silently ignored and the bound is left unset.
func (h *Handler) getCosts(w http.ResponseWriter, r *http.Request) {
	if h.auditLogger == nil {
		notEnabled := &apierror.APIError{
			Type:       "not_implemented",
			Message:    "audit logging not enabled",
			HTTPStatus: http.StatusNotImplemented,
		}
		apierror.WriteError(w, notEnabled)
		return
	}

	tenantID, authed := tenantFromCtx(w, r)
	if !authed {
		return
	}

	params := r.URL.Query()
	q := auditlog.CostQuery{
		TenantID: tenantID,
		GroupBy:  params.Get("group_by"),
	}

	// parseBound tries each accepted layout in turn; the boolean reports
	// whether a usable (non-zero) timestamp was obtained.
	parseBound := func(raw string) (time.Time, bool) {
		for _, layout := range [...]string{time.RFC3339Nano, time.RFC3339} {
			parsed, err := time.Parse(layout, raw)
			if err == nil {
				return parsed, !parsed.IsZero()
			}
		}
		return time.Time{}, false
	}

	if raw := params.Get("start"); raw != "" {
		if ts, usable := parseBound(raw); usable {
			q.StartTime = ts
		}
	}
	if raw := params.Get("end"); raw != "" {
		if ts, usable := parseBound(raw); usable {
			q.EndTime = ts
		}
	}

	result, err := h.auditLogger.QueryCosts(r.Context(), q)
	if err != nil {
		apierror.WriteError(w, apierror.NewUpstreamError("failed to query costs: "+err.Error()))
		return
	}
	writeJSON(w, http.StatusOK, result)
}
// ─── Rate limits (E10-09) ─────────────────────────────────────────────────────

// rateLimitNotEnabled is the guard shared by all rate-limit endpoints.
// It reports true — after writing a 501 response — when either the limiter or
// its store is missing; it reports false and writes nothing when both are set.
func (h *Handler) rateLimitNotEnabled(w http.ResponseWriter) bool {
	if h.rateLimiter != nil && h.rlStore != nil {
		return false
	}

	apierror.WriteError(w, &apierror.APIError{
		Type:       "not_implemented",
		Message:    "rate limiting not enabled",
		HTTPStatus: http.StatusNotImplemented,
	})
	return true
}
// listRateLimits responds with the configurations currently held by the
// in-process limiter, wrapped in a {"data": [...]} envelope.
//
// NOTE(review): the list is not filtered by the caller's tenant — presumably
// access is restricted by upstream middleware; confirm.
func (h *Handler) listRateLimits(w http.ResponseWriter, r *http.Request) {
	if h.rateLimitNotEnabled(w) {
		return
	}

	envelope := map[string]interface{}{"data": h.rateLimiter.ListConfigs()}
	writeJSON(w, http.StatusOK, envelope)
}
// getRateLimit responds with the rate limit configuration for the tenant
// named by the {tenant_id} URL parameter.
//
// When the store has no row for the tenant, the limiter's effective
// configuration (which may be the default) is returned instead, still with
// status 200. Any other store failure yields an upstream error.
//
// The not-found sentinel is matched with errors.Is so that a store which
// wraps ratelimit.ErrNotFound with extra context still takes the fallback
// path instead of being reported as an upstream failure.
func (h *Handler) getRateLimit(w http.ResponseWriter, r *http.Request) {
	if h.rateLimitNotEnabled(w) {
		return
	}

	tenantID := chi.URLParam(r, "tenant_id")
	cfg, err := h.rlStore.Get(r.Context(), tenantID)
	switch {
	case errors.Is(err, ratelimit.ErrNotFound):
		// Return effective config (which may be the default).
		cfg = h.rateLimiter.GetConfig(tenantID)
	case err != nil:
		apierror.WriteError(w, apierror.NewUpstreamError("failed to get rate limit: "+err.Error()))
		return
	}
	writeJSON(w, http.StatusOK, cfg)
}
// rateLimitRequest is the JSON body accepted by upsertRateLimit. Its fields
// are copied one-to-one into a ratelimit.RateLimitConfig; the tenant ID is
// taken from the URL, not from the body. Omitted fields decode to their zero
// value, and no range validation is performed by the handler.
type rateLimitRequest struct {
RequestsPerMin int `json:"requests_per_min"`
BurstSize int `json:"burst_size"`
UserRPM int `json:"user_rpm"`
UserBurst int `json:"user_burst"`
IsEnabled bool `json:"is_enabled"`
}
// upsertRateLimit stores the rate limit configuration from the request body
// for the tenant named by the {tenant_id} URL parameter, applies the stored
// configuration to the in-process limiter, and responds 200 with it.
// Malformed JSON yields a bad-request error; a store failure yields an
// upstream error and leaves the limiter untouched.
//
// NOTE(review): the tenant ID comes from the URL path rather than from the
// JWT claims — presumably this endpoint is restricted to privileged callers
// by upstream middleware; confirm.
func (h *Handler) upsertRateLimit(w http.ResponseWriter, r *http.Request) {
	if h.rateLimitNotEnabled(w) {
		return
	}
	tenantID := chi.URLParam(r, "tenant_id")

	var body rateLimitRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		apierror.WriteError(w, apierror.NewBadRequestError("invalid JSON: "+decodeErr.Error()))
		return
	}

	saved, err := h.rlStore.Upsert(r.Context(), ratelimit.RateLimitConfig{
		TenantID:       tenantID,
		RequestsPerMin: body.RequestsPerMin,
		BurstSize:      body.BurstSize,
		UserRPM:        body.UserRPM,
		UserBurst:      body.UserBurst,
		IsEnabled:      body.IsEnabled,
	})
	if err != nil {
		apierror.WriteError(w, apierror.NewUpstreamError("failed to upsert rate limit: "+err.Error()))
		return
	}

	// Push the persisted values into the running limiter so the change takes
	// effect without a restart.
	h.rateLimiter.SetConfig(saved)
	h.logger.Info(
		"rate limit config updated",
		zap.String("tenant_id", tenantID),
		zap.Int("rpm", saved.RequestsPerMin),
	)
	writeJSON(w, http.StatusOK, saved)
}
// deleteRateLimit removes the stored rate limit configuration for the tenant
// named by the {tenant_id} URL parameter, drops it from the in-process
// limiter, and responds 204 with no body.
//
// A missing configuration yields 404; any other store failure yields an
// upstream error. In both cases the limiter is left untouched.
//
// The not-found sentinel is matched with errors.Is so that a store which
// wraps ratelimit.ErrNotFound with extra context is still reported as 404
// rather than as an upstream failure.
func (h *Handler) deleteRateLimit(w http.ResponseWriter, r *http.Request) {
	if h.rateLimitNotEnabled(w) {
		return
	}

	tenantID := chi.URLParam(r, "tenant_id")
	err := h.rlStore.Delete(r.Context(), tenantID)
	switch {
	case errors.Is(err, ratelimit.ErrNotFound):
		apierror.WriteError(w, &apierror.APIError{
			Type:       "not_found_error",
			Message:    "rate limit config not found",
			HTTPStatus: http.StatusNotFound,
		})
		return
	case err != nil:
		apierror.WriteError(w, apierror.NewUpstreamError("failed to delete rate limit: "+err.Error()))
		return
	}

	h.rateLimiter.DeleteConfig(tenantID)
	h.logger.Info("rate limit config deleted", zap.String("tenant_id", tenantID))
	w.WriteHeader(http.StatusNoContent)
}
// ─── Helpers ──────────────────────────────────────────────────────────────────

// tenantFromCtx returns the tenant ID carried by the JWT claims stored in the
// request context. When no claims are present, or the tenant ID is empty, it
// writes an authentication error to w and returns ("", false); the caller
// must then stop processing the request.
func tenantFromCtx(w http.ResponseWriter, r *http.Request) (string, bool) {
	claims, found := middleware.ClaimsFromContext(r.Context())
	if found && claims.TenantID != "" {
		return claims.TenantID, true
	}

	apierror.WriteError(w, apierror.NewAuthError("missing authentication"))
	return "", false
}
// validatePolicy checks the parts of a policy request the handler itself is
// responsible for: name and action.Provider must be non-empty. The conditions
// are then delegated to routing.ValidateConditions, whose result is returned
// as is. Checks run in that order and the first failure wins.
func validatePolicy(name string, action routing.Action, conditions []routing.Condition) error {
	switch {
	case name == "":
		return fmt.Errorf("name is required")
	case action.Provider == "":
		return fmt.Errorf("action.provider is required")
	}
	return routing.ValidateConditions(conditions)
}
// writeStoreError translates a rule store error into an HTTP error response:
// routing.ErrNotFound becomes a 404 "policy not found", and every other error
// is reported as an upstream error carrying the error text.
//
// The sentinel is matched with errors.Is, so a store implementation that
// wraps routing.ErrNotFound with additional context is still answered with
// 404 instead of being misreported as an upstream failure.
func writeStoreError(w http.ResponseWriter, err error) {
	if errors.Is(err, routing.ErrNotFound) {
		apierror.WriteError(w, &apierror.APIError{
			Type:       "not_found_error",
			Message:    "policy not found",
			HTTPStatus: http.StatusNotFound,
		})
		return
	}
	apierror.WriteError(w, apierror.NewUpstreamError(err.Error()))
}
// writeJSON sends v as a JSON response: it sets the Content-Type header to
// application/json, writes the given status code, then streams the encoded
// value (followed by a newline) to w.
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
	headers := w.Header()
	headers.Set("Content-Type", "application/json")
	w.WriteHeader(status)

	// The status line is already on the wire at this point, so an encoding
	// failure cannot be turned into an error response; it is deliberately
	// dropped.
	encoder := json.NewEncoder(w)
	_ = encoder.Encode(v)
}
// strQuote wraps s in double quotes. The content is inserted verbatim: no
// escaping is applied to quotes or other characters inside s.
func strQuote(s string) string {
	const quote = `"`
	return quote + s + quote
}
// parseIntParam reads the query parameter key from r as a non-negative
// integer. It returns defaultVal when the parameter is absent or empty, is not
// a valid integer, or is negative. No upper bound is enforced.
func parseIntParam(r *http.Request, key string, defaultVal int) int {
	raw := r.URL.Query().Get(key)

	// strconv.Atoi rejects the empty string, so a missing parameter takes the
	// same fallback path as a malformed one.
	if n, err := strconv.Atoi(raw); err == nil && n >= 0 {
		return n
	}
	return defaultVal
}