// Package auditlog_test exercises the MemLogger query paths and the
// BatchWriter flushing/shutdown behavior.
package auditlog_test
import (
	"context"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"

	"github.com/veylant/ia-gateway/internal/auditlog"
)
|
|
|
|
// ─── MemLogger tests ──────────────────────────────────────────────────────────
|
|
|
|
func TestMemLogger_Log_And_Entries(t *testing.T) {
|
|
ml := auditlog.NewMemLogger()
|
|
ml.Log(auditlog.AuditEntry{RequestID: "req-1", TenantID: "t1"})
|
|
ml.Log(auditlog.AuditEntry{RequestID: "req-2", TenantID: "t1"})
|
|
|
|
entries := ml.Entries()
|
|
assert.Len(t, entries, 2)
|
|
assert.Equal(t, "req-1", entries[0].RequestID)
|
|
}
|
|
|
|
func TestMemLogger_Query_FiltersByTenant(t *testing.T) {
|
|
ml := auditlog.NewMemLogger()
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t1", RequestID: "a", SensitivityLevel: "low"})
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t2", RequestID: "b", SensitivityLevel: "high"})
|
|
|
|
result, err := ml.Query(context.Background(), auditlog.AuditQuery{TenantID: "t1", Limit: 10})
|
|
require.NoError(t, err)
|
|
assert.Len(t, result.Data, 1)
|
|
assert.Equal(t, "a", result.Data[0].RequestID)
|
|
}
|
|
|
|
func TestMemLogger_Query_FiltersByMinSensitivity(t *testing.T) {
|
|
ml := auditlog.NewMemLogger()
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t1", RequestID: "none", SensitivityLevel: "none"})
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t1", RequestID: "low", SensitivityLevel: "low"})
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t1", RequestID: "high", SensitivityLevel: "high"})
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t1", RequestID: "critical", SensitivityLevel: "critical"})
|
|
|
|
result, err := ml.Query(context.Background(), auditlog.AuditQuery{
|
|
TenantID: "t1", MinSensitivity: "high", Limit: 10,
|
|
})
|
|
require.NoError(t, err)
|
|
assert.Len(t, result.Data, 2)
|
|
}
|
|
|
|
func TestMemLogger_Query_Pagination(t *testing.T) {
|
|
ml := auditlog.NewMemLogger()
|
|
for i := 0; i < 10; i++ {
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t1"})
|
|
}
|
|
|
|
result, err := ml.Query(context.Background(), auditlog.AuditQuery{
|
|
TenantID: "t1", Limit: 3, Offset: 5,
|
|
})
|
|
require.NoError(t, err)
|
|
assert.Len(t, result.Data, 3)
|
|
assert.Equal(t, 10, result.Total)
|
|
}
|
|
|
|
func TestMemLogger_QueryCosts_GroupByProvider(t *testing.T) {
|
|
ml := auditlog.NewMemLogger()
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t1", Provider: "openai", TokenTotal: 1000, CostUSD: 0.005})
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t1", Provider: "openai", TokenTotal: 500, CostUSD: 0.0025})
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t1", Provider: "ollama", TokenTotal: 2000, CostUSD: 0})
|
|
ml.Log(auditlog.AuditEntry{TenantID: "t2", Provider: "openai", TokenTotal: 1000, CostUSD: 0.005})
|
|
|
|
result, err := ml.QueryCosts(context.Background(), auditlog.CostQuery{
|
|
TenantID: "t1", GroupBy: "provider",
|
|
})
|
|
require.NoError(t, err)
|
|
assert.Len(t, result.Data, 2)
|
|
|
|
// Find openai summary
|
|
var openaiSummary auditlog.CostSummary
|
|
for _, s := range result.Data {
|
|
if s.Key == "openai" {
|
|
openaiSummary = s
|
|
}
|
|
}
|
|
assert.Equal(t, 1500, openaiSummary.TotalTokens)
|
|
assert.InDelta(t, 0.0075, openaiSummary.TotalCostUSD, 1e-9)
|
|
assert.Equal(t, 2, openaiSummary.RequestCount)
|
|
}
|
|
|
|
// ─── BatchWriter tests ────────────────────────────────────────────────────────
|
|
|
|
// mockFlusher records received batches for assertions.
|
|
type mockFlusher struct {
|
|
mu sync.Mutex
|
|
batches [][]auditlog.AuditEntry
|
|
total int
|
|
}
|
|
|
|
func (f *mockFlusher) InsertBatch(_ context.Context, entries []auditlog.AuditEntry) error {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
cp := make([]auditlog.AuditEntry, len(entries))
|
|
copy(cp, entries)
|
|
f.batches = append(f.batches, cp)
|
|
f.total += len(entries)
|
|
return nil
|
|
}
|
|
|
|
func (f *mockFlusher) Total() int {
|
|
f.mu.Lock()
|
|
defer f.mu.Unlock()
|
|
return f.total
|
|
}
|
|
|
|
func TestBatchWriter_FlushOnSize(t *testing.T) {
|
|
flusher := &mockFlusher{}
|
|
bw := auditlog.NewBatchWriterForTest(flusher, 5, 10*time.Second, zap.NewNop())
|
|
bw.Start()
|
|
defer bw.Stop()
|
|
|
|
for i := 0; i < 5; i++ {
|
|
bw.Log(auditlog.AuditEntry{RequestID: "r"})
|
|
}
|
|
|
|
// Wait for flush to happen (should be almost immediate on batch size).
|
|
require.Eventually(t, func() bool { return flusher.Total() == 5 },
|
|
2*time.Second, 10*time.Millisecond, "expected 5 entries flushed")
|
|
}
|
|
|
|
func TestBatchWriter_FlushOnTick(t *testing.T) {
|
|
flusher := &mockFlusher{}
|
|
bw := auditlog.NewBatchWriterForTest(flusher, 100, 50*time.Millisecond, zap.NewNop())
|
|
bw.Start()
|
|
defer bw.Stop()
|
|
|
|
// Send only 3 entries (below batch size).
|
|
for i := 0; i < 3; i++ {
|
|
bw.Log(auditlog.AuditEntry{RequestID: "r"})
|
|
}
|
|
|
|
require.Eventually(t, func() bool { return flusher.Total() == 3 },
|
|
500*time.Millisecond, 10*time.Millisecond, "expected tick flush")
|
|
}
|
|
|
|
func TestBatchWriter_Stop_DrainsPending(t *testing.T) {
|
|
flusher := &mockFlusher{}
|
|
bw := auditlog.NewBatchWriterForTest(flusher, 1000, 10*time.Second, zap.NewNop())
|
|
bw.Start()
|
|
|
|
for i := 0; i < 7; i++ {
|
|
bw.Log(auditlog.AuditEntry{RequestID: "r"})
|
|
}
|
|
bw.Stop()
|
|
|
|
assert.Equal(t, 7, flusher.Total(), "Stop should drain remaining entries")
|
|
}
|
|
|
|
func TestBatchWriter_OverflowDrops(t *testing.T) {
|
|
// Flusher that blocks forever to force channel fill.
|
|
var called atomic.Bool
|
|
blockFlusher := &blockingFlusher{called: &called}
|
|
|
|
// Very small channel to trigger overflow quickly.
|
|
bw := auditlog.NewBatchWriterForTest(blockFlusher, 1, 10*time.Millisecond, zap.NewNop())
|
|
bw.Start()
|
|
defer bw.Stop()
|
|
|
|
// First entry triggers flush (which blocks); additional entries should fill channel.
|
|
// With cap=10_000 we can't easily fill it in a unit test, so we just verify
|
|
// that Log() returns immediately (non-blocking) even when the flusher is slow.
|
|
start := time.Now()
|
|
for i := 0; i < 20; i++ {
|
|
bw.Log(auditlog.AuditEntry{RequestID: "r"})
|
|
}
|
|
assert.Less(t, time.Since(start), 200*time.Millisecond, "Log should be non-blocking")
|
|
}
|
|
|
|
// blockingFlusher blocks for 5 seconds to simulate a slow ClickHouse.
|
|
type blockingFlusher struct {
|
|
called *atomic.Bool
|
|
}
|
|
|
|
func (b *blockingFlusher) InsertBatch(ctx context.Context, _ []auditlog.AuditEntry) error {
|
|
b.called.Store(true)
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func TestBatchWriter_ConcurrentLog(t *testing.T) {
|
|
flusher := &mockFlusher{}
|
|
bw := auditlog.NewBatchWriterForTest(flusher, 50, 20*time.Millisecond, zap.NewNop())
|
|
bw.Start()
|
|
defer bw.Stop()
|
|
|
|
var wg sync.WaitGroup
|
|
for g := 0; g < 10; g++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for i := 0; i < 10; i++ {
|
|
bw.Log(auditlog.AuditEntry{RequestID: "r"})
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
require.Eventually(t, func() bool { return flusher.Total() == 100 },
|
|
2*time.Second, 10*time.Millisecond)
|
|
}
|