579 lines
18 KiB
Go
579 lines
18 KiB
Go
// Package admin provides HTTP handlers for the routing rules management API.
|
|
// All endpoints require an authenticated JWT; tenantID is always derived from
|
|
// the token claims — it is never accepted from the request body.
|
|
package admin
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/veylant/ia-gateway/internal/apierror"
|
|
"github.com/veylant/ia-gateway/internal/auditlog"
|
|
"github.com/veylant/ia-gateway/internal/circuitbreaker"
|
|
"github.com/veylant/ia-gateway/internal/crypto"
|
|
"github.com/veylant/ia-gateway/internal/flags"
|
|
"github.com/veylant/ia-gateway/internal/middleware"
|
|
"github.com/veylant/ia-gateway/internal/ratelimit"
|
|
"github.com/veylant/ia-gateway/internal/routing"
|
|
)
|
|
|
|
// ProviderRouter is the subset of router.Router used by the admin handler.
|
|
// Defined as an interface to avoid an import cycle.
|
|
type ProviderRouter interface {
|
|
ProviderStatuses() []circuitbreaker.Status
|
|
}
|
|
|
|
// Handler provides CRUD endpoints for routing rules, template seeding,
|
|
// read-only access to audit logs and cost aggregations, user management,
|
|
// provider circuit breaker status, rate limit configuration, and feature flags.
|
|
type Handler struct {
|
|
store routing.RuleStore
|
|
cache *routing.RuleCache
|
|
auditLogger auditlog.Logger // nil = logs/costs endpoints return 501
|
|
db *sql.DB // nil = users endpoints return 501
|
|
router ProviderRouter // nil = providers/status returns 501
|
|
adapterRouter ProviderAdapterRouter // nil = provider CRUD hot-reload disabled
|
|
encryptor *crypto.Encryptor // nil = API keys stored in plaintext (dev only)
|
|
rateLimiter *ratelimit.Limiter // nil = rate-limits endpoints return 501
|
|
rlStore *ratelimit.Store // nil if db is nil
|
|
flagStore flags.FlagStore // nil = flags endpoints return 501
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// New creates a Handler.
|
|
// - store: underlying rule persistence (PgStore or MemStore for tests).
|
|
// - cache: engine cache to invalidate after mutations.
|
|
func New(store routing.RuleStore, cache *routing.RuleCache, logger *zap.Logger) *Handler {
|
|
return &Handler{store: store, cache: cache, logger: logger}
|
|
}
|
|
|
|
// NewWithAudit creates a Handler with audit log query support.
|
|
func NewWithAudit(store routing.RuleStore, cache *routing.RuleCache, al auditlog.Logger, logger *zap.Logger) *Handler {
|
|
return &Handler{store: store, cache: cache, auditLogger: al, logger: logger}
|
|
}
|
|
|
|
// WithDB adds database support for user management.
|
|
func (h *Handler) WithDB(db *sql.DB) *Handler {
|
|
h.db = db
|
|
return h
|
|
}
|
|
|
|
// WithRouter adds provider router for circuit breaker status.
|
|
func (h *Handler) WithRouter(r ProviderRouter) *Handler {
|
|
h.router = r
|
|
// If the router also supports adapter hot-reload, wire it up.
|
|
if ar, ok := r.(ProviderAdapterRouter); ok {
|
|
h.adapterRouter = ar
|
|
}
|
|
return h
|
|
}
|
|
|
|
// WithEncryptor adds an AES-256-GCM encryptor for provider API key storage.
|
|
func (h *Handler) WithEncryptor(enc *crypto.Encryptor) *Handler {
|
|
h.encryptor = enc
|
|
return h
|
|
}
|
|
|
|
// WithRateLimiter adds the in-process rate limiter and its PostgreSQL store
|
|
// so the admin API can manage per-tenant limits at runtime.
|
|
func (h *Handler) WithRateLimiter(rl *ratelimit.Limiter) *Handler {
|
|
h.rateLimiter = rl
|
|
if h.db != nil {
|
|
h.rlStore = ratelimit.NewStore(h.db, h.logger)
|
|
}
|
|
return h
|
|
}
|
|
|
|
// WithFlagStore adds a feature flag store so the admin API can manage
|
|
// feature flags per tenant (E11-07).
|
|
func (h *Handler) WithFlagStore(fs flags.FlagStore) *Handler {
|
|
h.flagStore = fs
|
|
return h
|
|
}
|
|
|
|
// Routes registers all admin endpoints on r.
|
|
// Callers are responsible for mounting r under an authenticated prefix.
|
|
func (h *Handler) Routes(r chi.Router) {
|
|
r.Get("/policies", h.listPolicies)
|
|
r.Post("/policies", h.createPolicy)
|
|
r.Get("/policies/{id}", h.getPolicy)
|
|
r.Put("/policies/{id}", h.updatePolicy)
|
|
r.Delete("/policies/{id}", h.deletePolicy)
|
|
r.Post("/policies/seed/{template}", h.seedTemplate)
|
|
|
|
r.Get("/logs", h.getLogs)
|
|
r.Get("/costs", h.getCosts)
|
|
|
|
// User management (E3-08).
|
|
r.Get("/users", h.listUsers)
|
|
r.Post("/users", h.createUser)
|
|
r.Get("/users/{id}", h.getUser)
|
|
r.Put("/users/{id}", h.updateUser)
|
|
r.Delete("/users/{id}", h.deleteUser)
|
|
|
|
// Provider circuit breaker status (E2-09 / E2-10).
|
|
r.Get("/providers/status", h.getProviderStatus)
|
|
|
|
// Provider config CRUD — stored in database, API keys encrypted at rest.
|
|
r.Get("/providers", h.listProviderConfigs)
|
|
r.Post("/providers", h.createProviderConfig)
|
|
r.Put("/providers/{id}", h.updateProviderConfig)
|
|
r.Delete("/providers/{id}", h.deleteProviderConfig)
|
|
r.Post("/providers/{id}/test", h.testProviderConfig)
|
|
|
|
// Rate limit configuration (E10-09).
|
|
r.Get("/rate-limits", h.listRateLimits)
|
|
r.Get("/rate-limits/{tenant_id}", h.getRateLimit)
|
|
r.Put("/rate-limits/{tenant_id}", h.upsertRateLimit)
|
|
r.Delete("/rate-limits/{tenant_id}", h.deleteRateLimit)
|
|
|
|
// Feature flags management (E11-07).
|
|
r.Get("/flags", h.listFlags)
|
|
r.Put("/flags/{name}", h.upsertFlag)
|
|
r.Delete("/flags/{name}", h.deleteFlag)
|
|
}
|
|
|
|
// ─── List ─────────────────────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) listPolicies(w http.ResponseWriter, r *http.Request) {
|
|
tenantID, ok := tenantFromCtx(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
rules, err := h.store.ListActive(r.Context(), tenantID)
|
|
if err != nil {
|
|
apierror.WriteError(w, apierror.NewUpstreamError("failed to list policies: "+err.Error()))
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]interface{}{"data": rules})
|
|
}
|
|
|
|
// ─── Create ───────────────────────────────────────────────────────────────────
|
|
|
|
type createPolicyRequest struct {
|
|
Name string `json:"name"`
|
|
Description string `json:"description"`
|
|
Priority int `json:"priority"`
|
|
IsEnabled bool `json:"is_enabled"`
|
|
Conditions []routing.Condition `json:"conditions"`
|
|
Action routing.Action `json:"action"`
|
|
}
|
|
|
|
func (h *Handler) createPolicy(w http.ResponseWriter, r *http.Request) {
|
|
tenantID, ok := tenantFromCtx(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
var req createPolicyRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
apierror.WriteError(w, apierror.NewBadRequestError("invalid JSON: "+err.Error()))
|
|
return
|
|
}
|
|
if err := validatePolicy(req.Name, req.Action, req.Conditions); err != nil {
|
|
apierror.WriteError(w, apierror.NewBadRequestError(err.Error()))
|
|
return
|
|
}
|
|
|
|
rule := routing.RoutingRule{
|
|
TenantID: tenantID,
|
|
Name: req.Name,
|
|
Description: req.Description,
|
|
Priority: req.Priority,
|
|
IsEnabled: req.IsEnabled,
|
|
Conditions: req.Conditions,
|
|
Action: req.Action,
|
|
}
|
|
if rule.Priority == 0 {
|
|
rule.Priority = 100
|
|
}
|
|
|
|
created, err := h.store.Create(r.Context(), rule)
|
|
if err != nil {
|
|
apierror.WriteError(w, apierror.NewUpstreamError("failed to create policy: "+err.Error()))
|
|
return
|
|
}
|
|
|
|
h.cache.Invalidate(tenantID)
|
|
h.logger.Info("routing policy created",
|
|
zap.String("id", created.ID),
|
|
zap.String("tenant_id", tenantID),
|
|
)
|
|
writeJSON(w, http.StatusCreated, created)
|
|
}
|
|
|
|
// ─── Get ──────────────────────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) getPolicy(w http.ResponseWriter, r *http.Request) {
|
|
tenantID, ok := tenantFromCtx(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
id := chi.URLParam(r, "id")
|
|
rule, err := h.store.Get(r.Context(), id, tenantID)
|
|
if err != nil {
|
|
writeStoreError(w, err)
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, rule)
|
|
}
|
|
|
|
// ─── Update ───────────────────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) updatePolicy(w http.ResponseWriter, r *http.Request) {
|
|
tenantID, ok := tenantFromCtx(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
id := chi.URLParam(r, "id")
|
|
|
|
var req createPolicyRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
apierror.WriteError(w, apierror.NewBadRequestError("invalid JSON: "+err.Error()))
|
|
return
|
|
}
|
|
if err := validatePolicy(req.Name, req.Action, req.Conditions); err != nil {
|
|
apierror.WriteError(w, apierror.NewBadRequestError(err.Error()))
|
|
return
|
|
}
|
|
|
|
rule := routing.RoutingRule{
|
|
ID: id,
|
|
TenantID: tenantID,
|
|
Name: req.Name,
|
|
Description: req.Description,
|
|
Priority: req.Priority,
|
|
IsEnabled: req.IsEnabled,
|
|
Conditions: req.Conditions,
|
|
Action: req.Action,
|
|
}
|
|
updated, err := h.store.Update(r.Context(), rule)
|
|
if err != nil {
|
|
writeStoreError(w, err)
|
|
return
|
|
}
|
|
|
|
h.cache.Invalidate(tenantID)
|
|
h.logger.Info("routing policy updated",
|
|
zap.String("id", id),
|
|
zap.String("tenant_id", tenantID),
|
|
)
|
|
writeJSON(w, http.StatusOK, updated)
|
|
}
|
|
|
|
// ─── Delete ───────────────────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) deletePolicy(w http.ResponseWriter, r *http.Request) {
|
|
tenantID, ok := tenantFromCtx(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
id := chi.URLParam(r, "id")
|
|
if err := h.store.Delete(r.Context(), id, tenantID); err != nil {
|
|
writeStoreError(w, err)
|
|
return
|
|
}
|
|
|
|
h.cache.Invalidate(tenantID)
|
|
h.logger.Info("routing policy deleted",
|
|
zap.String("id", id),
|
|
zap.String("tenant_id", tenantID),
|
|
)
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
// ─── Seed template ────────────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) seedTemplate(w http.ResponseWriter, r *http.Request) {
|
|
tenantID, ok := tenantFromCtx(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
name := chi.URLParam(r, "template")
|
|
factory, exists := routing.Templates[name]
|
|
if !exists {
|
|
apierror.WriteError(w, apierror.NewBadRequestError(
|
|
"unknown template "+strQuote(name)+"; valid templates: hr, finance, engineering, catchall",
|
|
))
|
|
return
|
|
}
|
|
|
|
rule := factory(tenantID)
|
|
created, err := h.store.Create(r.Context(), rule)
|
|
if err != nil {
|
|
apierror.WriteError(w, apierror.NewUpstreamError("failed to seed template: "+err.Error()))
|
|
return
|
|
}
|
|
|
|
h.cache.Invalidate(tenantID)
|
|
h.logger.Info("routing template seeded",
|
|
zap.String("template", name),
|
|
zap.String("tenant_id", tenantID),
|
|
zap.String("rule_id", created.ID),
|
|
)
|
|
writeJSON(w, http.StatusCreated, created)
|
|
}
|
|
|
|
// ─── Audit logs (E7-06) ───────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) getLogs(w http.ResponseWriter, r *http.Request) {
|
|
if h.auditLogger == nil {
|
|
apierror.WriteError(w, &apierror.APIError{
|
|
Type: "not_implemented", Message: "audit logging not enabled", HTTPStatus: http.StatusNotImplemented,
|
|
})
|
|
return
|
|
}
|
|
tenantID, ok := tenantFromCtx(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
q := auditlog.AuditQuery{
|
|
TenantID: tenantID,
|
|
Provider: r.URL.Query().Get("provider"),
|
|
MinSensitivity: r.URL.Query().Get("min_sensitivity"),
|
|
Limit: parseIntParam(r, "limit", 50),
|
|
Offset: parseIntParam(r, "offset", 0),
|
|
}
|
|
// Accept both RFC3339Nano (JavaScript toISOString: "2026-03-10T11:30:00.000Z")
|
|
// and RFC3339 (API clients without sub-second precision).
|
|
parseTime := func(s string) time.Time {
|
|
for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
|
|
if t, err := time.Parse(layout, s); err == nil {
|
|
return t
|
|
}
|
|
}
|
|
return time.Time{}
|
|
}
|
|
if s := r.URL.Query().Get("start"); s != "" {
|
|
if t := parseTime(s); !t.IsZero() {
|
|
q.StartTime = t
|
|
}
|
|
}
|
|
if s := r.URL.Query().Get("end"); s != "" {
|
|
if t := parseTime(s); !t.IsZero() {
|
|
q.EndTime = t
|
|
}
|
|
}
|
|
|
|
result, err := h.auditLogger.Query(r.Context(), q)
|
|
if err != nil {
|
|
apierror.WriteError(w, apierror.NewUpstreamError("failed to query logs: "+err.Error()))
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, result)
|
|
}
|
|
|
|
// ─── Costs (E7-07) ───────────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) getCosts(w http.ResponseWriter, r *http.Request) {
|
|
if h.auditLogger == nil {
|
|
apierror.WriteError(w, &apierror.APIError{
|
|
Type: "not_implemented", Message: "audit logging not enabled", HTTPStatus: http.StatusNotImplemented,
|
|
})
|
|
return
|
|
}
|
|
tenantID, ok := tenantFromCtx(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
q := auditlog.CostQuery{
|
|
TenantID: tenantID,
|
|
GroupBy: r.URL.Query().Get("group_by"),
|
|
}
|
|
parseTime := func(s string) time.Time {
|
|
for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
|
|
if t, err := time.Parse(layout, s); err == nil {
|
|
return t
|
|
}
|
|
}
|
|
return time.Time{}
|
|
}
|
|
if s := r.URL.Query().Get("start"); s != "" {
|
|
if t := parseTime(s); !t.IsZero() {
|
|
q.StartTime = t
|
|
}
|
|
}
|
|
if s := r.URL.Query().Get("end"); s != "" {
|
|
if t := parseTime(s); !t.IsZero() {
|
|
q.EndTime = t
|
|
}
|
|
}
|
|
|
|
result, err := h.auditLogger.QueryCosts(r.Context(), q)
|
|
if err != nil {
|
|
apierror.WriteError(w, apierror.NewUpstreamError("failed to query costs: "+err.Error()))
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, result)
|
|
}
|
|
|
|
// ─── Rate limits (E10-09) ─────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) rateLimitNotEnabled(w http.ResponseWriter) bool {
|
|
if h.rateLimiter == nil || h.rlStore == nil {
|
|
apierror.WriteError(w, &apierror.APIError{
|
|
Type: "not_implemented",
|
|
Message: "rate limiting not enabled",
|
|
HTTPStatus: http.StatusNotImplemented,
|
|
})
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (h *Handler) listRateLimits(w http.ResponseWriter, r *http.Request) {
|
|
if h.rateLimitNotEnabled(w) {
|
|
return
|
|
}
|
|
cfgs := h.rateLimiter.ListConfigs()
|
|
writeJSON(w, http.StatusOK, map[string]interface{}{"data": cfgs})
|
|
}
|
|
|
|
func (h *Handler) getRateLimit(w http.ResponseWriter, r *http.Request) {
|
|
if h.rateLimitNotEnabled(w) {
|
|
return
|
|
}
|
|
tenantID := chi.URLParam(r, "tenant_id")
|
|
cfg, err := h.rlStore.Get(r.Context(), tenantID)
|
|
if err == ratelimit.ErrNotFound {
|
|
// Return effective config (which may be the default).
|
|
cfg = h.rateLimiter.GetConfig(tenantID)
|
|
writeJSON(w, http.StatusOK, cfg)
|
|
return
|
|
}
|
|
if err != nil {
|
|
apierror.WriteError(w, apierror.NewUpstreamError("failed to get rate limit: "+err.Error()))
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, cfg)
|
|
}
|
|
|
|
type rateLimitRequest struct {
|
|
RequestsPerMin int `json:"requests_per_min"`
|
|
BurstSize int `json:"burst_size"`
|
|
UserRPM int `json:"user_rpm"`
|
|
UserBurst int `json:"user_burst"`
|
|
IsEnabled bool `json:"is_enabled"`
|
|
}
|
|
|
|
func (h *Handler) upsertRateLimit(w http.ResponseWriter, r *http.Request) {
|
|
if h.rateLimitNotEnabled(w) {
|
|
return
|
|
}
|
|
tenantID := chi.URLParam(r, "tenant_id")
|
|
|
|
var req rateLimitRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
apierror.WriteError(w, apierror.NewBadRequestError("invalid JSON: "+err.Error()))
|
|
return
|
|
}
|
|
|
|
cfg := ratelimit.RateLimitConfig{
|
|
TenantID: tenantID,
|
|
RequestsPerMin: req.RequestsPerMin,
|
|
BurstSize: req.BurstSize,
|
|
UserRPM: req.UserRPM,
|
|
UserBurst: req.UserBurst,
|
|
IsEnabled: req.IsEnabled,
|
|
}
|
|
saved, err := h.rlStore.Upsert(r.Context(), cfg)
|
|
if err != nil {
|
|
apierror.WriteError(w, apierror.NewUpstreamError("failed to upsert rate limit: "+err.Error()))
|
|
return
|
|
}
|
|
// Apply immediately to the in-process limiter without restart.
|
|
h.rateLimiter.SetConfig(saved)
|
|
h.logger.Info("rate limit config updated",
|
|
zap.String("tenant_id", tenantID),
|
|
zap.Int("rpm", saved.RequestsPerMin),
|
|
)
|
|
writeJSON(w, http.StatusOK, saved)
|
|
}
|
|
|
|
func (h *Handler) deleteRateLimit(w http.ResponseWriter, r *http.Request) {
|
|
if h.rateLimitNotEnabled(w) {
|
|
return
|
|
}
|
|
tenantID := chi.URLParam(r, "tenant_id")
|
|
if err := h.rlStore.Delete(r.Context(), tenantID); err == ratelimit.ErrNotFound {
|
|
apierror.WriteError(w, &apierror.APIError{
|
|
Type: "not_found_error",
|
|
Message: "rate limit config not found",
|
|
HTTPStatus: http.StatusNotFound,
|
|
})
|
|
return
|
|
} else if err != nil {
|
|
apierror.WriteError(w, apierror.NewUpstreamError("failed to delete rate limit: "+err.Error()))
|
|
return
|
|
}
|
|
h.rateLimiter.DeleteConfig(tenantID)
|
|
h.logger.Info("rate limit config deleted", zap.String("tenant_id", tenantID))
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
// ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
// tenantFromCtx extracts the tenantID from JWT claims in the context.
|
|
// It writes a 401 and returns false if no claims are present.
|
|
func tenantFromCtx(w http.ResponseWriter, r *http.Request) (string, bool) {
|
|
claims, ok := middleware.ClaimsFromContext(r.Context())
|
|
if !ok || claims.TenantID == "" {
|
|
apierror.WriteError(w, apierror.NewAuthError("missing authentication"))
|
|
return "", false
|
|
}
|
|
return claims.TenantID, true
|
|
}
|
|
|
|
// validatePolicy performs basic validation on name, action provider, and conditions.
|
|
func validatePolicy(name string, action routing.Action, conditions []routing.Condition) error {
|
|
if name == "" {
|
|
return fmt.Errorf("name is required")
|
|
}
|
|
if action.Provider == "" {
|
|
return fmt.Errorf("action.provider is required")
|
|
}
|
|
return routing.ValidateConditions(conditions)
|
|
}
|
|
|
|
// writeStoreError maps routing.ErrNotFound to 404, other errors to 502.
|
|
func writeStoreError(w http.ResponseWriter, err error) {
|
|
if err == routing.ErrNotFound {
|
|
apierror.WriteError(w, &apierror.APIError{
|
|
Type: "not_found_error",
|
|
Message: "policy not found",
|
|
HTTPStatus: http.StatusNotFound,
|
|
})
|
|
return
|
|
}
|
|
apierror.WriteError(w, apierror.NewUpstreamError(err.Error()))
|
|
}
|
|
|
|
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(status)
|
|
_ = json.NewEncoder(w).Encode(v)
|
|
}
|
|
|
|
func strQuote(s string) string { return `"` + s + `"` }
|
|
|
|
func parseIntParam(r *http.Request, key string, defaultVal int) int {
|
|
s := r.URL.Query().Get(key)
|
|
if s == "" {
|
|
return defaultVal
|
|
}
|
|
v, err := strconv.Atoi(s)
|
|
if err != nil || v < 0 {
|
|
return defaultVal
|
|
}
|
|
return v
|
|
}
|