120 lines
3.0 KiB
Go
120 lines
3.0 KiB
Go
package auditlog
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Flusher is implemented by storage backends (e.g. ClickHouseLogger).
|
|
type Flusher interface {
|
|
InsertBatch(ctx context.Context, entries []AuditEntry) error
|
|
}
|
|
|
|
// BatchWriter wraps a Flusher with an async buffered channel.
|
|
// It flushes when batchSize entries are accumulated OR flushInterval elapses,
|
|
// whichever comes first. On channel overflow it drops the entry and logs a warning.
|
|
type BatchWriter struct {
|
|
ch chan AuditEntry
|
|
batchSize int
|
|
flushInterval time.Duration
|
|
flusher Flusher
|
|
logger *zap.Logger
|
|
done chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// NewBatchWriter creates a production BatchWriter (cap=10 000, size=100, interval=1s).
|
|
func NewBatchWriter(flusher Flusher, logger *zap.Logger) *BatchWriter {
|
|
return NewBatchWriterForTest(flusher, 100, time.Second, logger)
|
|
}
|
|
|
|
// NewBatchWriterForTest creates a BatchWriter with configurable parameters for unit tests.
|
|
func NewBatchWriterForTest(flusher Flusher, batchSize int, flushInterval time.Duration, logger *zap.Logger) *BatchWriter {
|
|
return &BatchWriter{
|
|
ch: make(chan AuditEntry, 10_000),
|
|
batchSize: batchSize,
|
|
flushInterval: flushInterval,
|
|
flusher: flusher,
|
|
logger: logger,
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Log enqueues an entry. Non-blocking: drops the entry if the channel is full.
|
|
func (bw *BatchWriter) Log(entry AuditEntry) {
|
|
select {
|
|
case bw.ch <- entry:
|
|
default:
|
|
bw.logger.Warn("audit log channel full — entry dropped",
|
|
zap.String("request_id", entry.RequestID))
|
|
}
|
|
}
|
|
|
|
// Start launches the background flush goroutine.
|
|
func (bw *BatchWriter) Start() {
|
|
bw.wg.Add(1)
|
|
go bw.run()
|
|
}
|
|
|
|
// Stop signals the flush goroutine to drain remaining entries and exit.
|
|
func (bw *BatchWriter) Stop() {
|
|
close(bw.done)
|
|
bw.wg.Wait()
|
|
}
|
|
|
|
func (bw *BatchWriter) run() {
|
|
defer bw.wg.Done()
|
|
ticker := time.NewTicker(bw.flushInterval)
|
|
defer ticker.Stop()
|
|
|
|
batch := make([]AuditEntry, 0, bw.batchSize)
|
|
|
|
flush := func() {
|
|
if len(batch) == 0 {
|
|
return
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := bw.flusher.InsertBatch(ctx, batch); err != nil {
|
|
bw.logger.Error("audit log batch insert failed", zap.Error(err), zap.Int("count", len(batch)))
|
|
}
|
|
batch = batch[:0]
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case entry := <-bw.ch:
|
|
batch = append(batch, entry)
|
|
if len(batch) >= bw.batchSize {
|
|
flush()
|
|
}
|
|
case <-ticker.C:
|
|
flush()
|
|
case <-bw.done:
|
|
// Drain remaining entries from channel.
|
|
for {
|
|
select {
|
|
case entry := <-bw.ch:
|
|
batch = append(batch, entry)
|
|
default:
|
|
flush()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Query is not supported on BatchWriter; use the underlying Logger (e.g. ClickHouseLogger).
|
|
func (bw *BatchWriter) Query(_ context.Context, _ AuditQuery) (*AuditResult, error) {
|
|
return &AuditResult{}, nil
|
|
}
|
|
|
|
// QueryCosts is not supported on BatchWriter.
|
|
func (bw *BatchWriter) QueryCosts(_ context.Context, _ CostQuery) (*CostResult, error) {
|
|
return &CostResult{}, nil
|
|
}
|