'use strict';

const crypto = require('crypto');
// NOTE(review): Transform is not referenced anywhere in this file — confirm before removing.
const { Transform } = require('stream');

const express = require('express');

const app = express();
const PORT = process.env.PORT || 3200;
const LOKI_URL = process.env.LOKI_URL || 'http://loki:3100';
const API_KEY = process.env.LOG_EXPORTER_API_KEY;

/** Hard upper bound on the number of log lines requested from Loki. */
const MAX_LIMIT = 5000;
/** Rate limiter settings: max requests per client IP per window. */
const RATE_LIMIT_MAX_REQUESTS = 60;
const RATE_LIMIT_WINDOW_MS = 60000;

const NS_PER_MS = 1000000n;
const NS_PER_SECOND = 1000000000n;
/** Largest absolute millisecond value a JavaScript Date can represent. */
const MAX_DATE_MS = 8640000000000000n;

// ─── Helpers ─────────────────────────────────────────────────────────────────

/**
 * Error type for invalid client input; the export route maps it to HTTP 400.
 */
class BadRequestError extends Error {
  constructor(message) {
    super(message);
    this.name = 'BadRequestError';
  }
}

/**
 * Compare two strings without leaking, through timing, how many leading
 * characters match. Both sides are hashed first so the buffers handed to
 * `timingSafeEqual` always have the same length.
 *
 * @param {string} a
 * @param {string} b
 * @returns {boolean} true when both strings are identical
 */
function constantTimeEquals(a, b) {
  const digestA = crypto.createHash('sha256').update(a).digest();
  const digestB = crypto.createHash('sha256').update(b).digest();
  return crypto.timingSafeEqual(digestA, digestB);
}

/**
 * Simple API key middleware (optional, enabled when LOG_EXPORTER_API_KEY is set).
 * The key is read from the `x-api-key` header or the `apiKey` query parameter.
 * Responds 401 when the key is missing, not a string, or does not match.
 */
function authMiddleware(req, res, next) {
  if (!API_KEY) return next();

  const provided = req.headers['x-api-key'] || req.query.apiKey;
  if (typeof provided !== 'string' || !constantTimeEquals(provided, API_KEY)) {
    return res.status(401).json({ error: 'Unauthorized' });
  }
  return next();
}

/**
 * Normalize a query parameter into a list of trimmed, non-empty strings.
 * Accepts a comma-separated string or an array of such strings (Express yields
 * an array when a parameter is repeated). Any other shape yields an empty list.
 *
 * @param {unknown} value
 * @returns {string[]}
 */
function toList(value) {
  const parts = Array.isArray(value) ? value : [value];
  return parts
    .filter((part) => typeof part === 'string')
    .flatMap((part) => part.split(','))
    .map((part) => part.trim())
    .filter(Boolean);
}

/** Escape regex metacharacters so a value is matched literally inside `=~`. */
function escapeRegex(value) {
  return value.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

/**
 * Render a value as a double-quoted LogQL string literal. JSON.stringify
 * escapes `"`, `\` and control characters, so user input cannot terminate the
 * literal early.
 */
function quoteLogQL(value) {
  return JSON.stringify(value);
}

/**
 * Build one label matcher (`label="v"` or `label=~"a|b"`), or null when the
 * parameter is absent, empty, or the single value `all`.
 *
 * @param {string} label - Loki label name (trusted, supplied by this file)
 * @param {unknown} rawValue - untrusted request parameter
 * @returns {string|null}
 */
function buildLabelMatcher(label, rawValue) {
  const values = toList(rawValue);
  if (values.length === 0) return null;
  if (values.length === 1) {
    return values[0] === 'all' ? null : `${label}=${quoteLogQL(values[0])}`;
  }
  // Multiple values become a regex alternation of literal (escaped) names.
  return `${label}=~${quoteLogQL(values.map(escapeRegex).join('|'))}`;
}

/**
 * Build a Loki LogQL query from request params.
 * Supports: service, level, search (free text filter).
 *
 * All three inputs are untrusted and are escaped before being embedded in the
 * query. When no label filter applies, the selector `{service=~".+"}` is used.
 *
 * @param {{ service?: unknown, level?: unknown, search?: unknown }} params
 * @returns {string} LogQL query: stream selector followed by optional `|=` line filters
 */
function buildLogQLQuery({ service, level, search }) {
  const labelFilters = [
    buildLabelMatcher('service', service),
    buildLabelMatcher('level', level),
  ].filter(Boolean);

  const streamSelector = labelFilters.length > 0
    ? `{${labelFilters.join(', ')}}`
    : '{service=~".+"}';

  // A repeated `search` parameter yields several filters, which LogQL ANDs together.
  const searchTerms = (Array.isArray(search) ? search : [search])
    .filter((term) => typeof term === 'string' && term !== '');
  const lineFilters = searchTerms.map((term) => ` |= ${quoteLogQL(term)}`).join('');

  return `${streamSelector}${lineFilters}`;
}

/**
 * Clamp a requested line count to the range 1..MAX_LIMIT.
 * Non-finite or non-positive values fall back to MAX_LIMIT.
 *
 * @param {number} value
 * @returns {number}
 */
function clampLimit(value) {
  if (!Number.isFinite(value) || value < 1) return MAX_LIMIT;
  return Math.min(Math.floor(value), MAX_LIMIT);
}

/**
 * Parse a raw log line as a JSON object. Lines that are not JSON, or that are
 * JSON but not a plain object (null, number, string, array), are wrapped as
 * `{ msg: line }` so callers can always read properties safely.
 *
 * @param {string} line
 * @returns {object}
 */
function parseLogLine(line) {
  try {
    const value = JSON.parse(line);
    if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
      return value;
    }
  } catch {
    // Not JSON: fall through and treat the whole line as the message.
  }
  return { msg: line };
}

/**
 * Query Loki's query_range endpoint and return flattened log entries.
 *
 * @param {object} options
 * @param {string} options.query - LogQL query
 * @param {string|number|bigint} options.start - range start, sent to Loki as-is (stringified)
 * @param {string|number|bigint} options.end - range end, sent to Loki as-is (stringified)
 * @param {number} [options.limit=5000] - max lines, clamped to 1..5000
 * @returns {Promise<object[]>} entries sorted by timestamp ascending
 * @throws {Error} when Loki answers with a non-2xx status or a non-"success" payload,
 *   or when the request times out (30 s)
 */
async function queryLoki({ query, start, end, limit = 5000 }) {
  const params = new URLSearchParams({
    query,
    start: String(start),
    end: String(end),
    limit: String(clampLimit(Number(limit))),
    direction: 'BACKWARD',
  });

  const url = `${LOKI_URL}/loki/api/v1/query_range?${params}`;
  const response = await fetch(url, {
    headers: { Accept: 'application/json' },
    signal: AbortSignal.timeout(30000),
  });

  if (!response.ok) {
    const body = await response.text();
    throw new Error(`Loki query failed (${response.status}): ${body}`);
  }

  const data = await response.json();
  if (data.status !== 'success') {
    throw new Error(`Loki returned status: ${data.status}`);
  }

  // Flatten streams → individual log entries, keeping the nanosecond timestamp
  // alongside each entry so sorting does not lose sub-millisecond ordering.
  const keyed = [];
  for (const stream of data.data?.result ?? []) {
    const labels = stream.stream || {};
    for (const [tsNano, line] of stream.values || []) {
      const parsed = parseLogLine(line);
      const ns = BigInt(tsNano);
      keyed.push({
        ns,
        entry: {
          timestamp: new Date(Number(ns / NS_PER_MS)).toISOString(),
          service: labels.service || labels.container || 'unknown',
          level: labels.level || parsed.level || 'info',
          context: labels.context || parsed.context || '',
          message: parsed.msg || parsed.message || line,
          reqId: parsed.reqId || '',
          req_method: parsed.req?.method || '',
          req_url: parsed.req?.url || '',
          res_status: parsed.res?.statusCode || '',
          // `??` rather than `||`: a response time of 0 is a real value.
          response_time_ms: parsed.responseTime ?? '',
          error: parsed.err?.message || '',
          raw: line,
        },
      });
    }
  }

  // Sort by timestamp ascending (BigInt comparison, nanosecond resolution).
  keyed.sort((a, b) => {
    if (a.ns < b.ns) return -1;
    if (a.ns > b.ns) return 1;
    return 0;
  });
  return keyed.map(({ entry }) => entry);
}

/**
 * Convert array of objects to CSV string.
 *
 * Only the fixed column set below is exported (the `raw` field is omitted).
 * Returns an empty string — no header row — when there are no entries.
 *
 * NOTE(review): cells are not neutralized against spreadsheet formula
 * injection (values starting with `=`, `+`, `-`, `@`); decide whether the
 * consumers of this export need that.
 *
 * @param {object[]} entries
 * @returns {string}
 */
function toCSV(entries) {
  if (entries.length === 0) return '';

  const headers = [
    'timestamp',
    'service',
    'level',
    'context',
    'message',
    'reqId',
    'req_method',
    'req_url',
    'res_status',
    'response_time_ms',
    'error',
  ];

  const escapeCell = (value) => {
    if (value === null || value === undefined) return '';
    // Structured values (e.g. an object-valued `msg`) are serialized as JSON
    // instead of the useless "[object Object]".
    const text = typeof value === 'object' ? JSON.stringify(value) : String(value);
    // Quote when the cell contains a delimiter, a quote, or any line break (CR or LF).
    return /[",\r\n]/.test(text) ? `"${text.replaceAll('"', '""')}"` : text;
  };

  const rows = [headers.join(',')];
  for (const entry of entries) {
    rows.push(headers.map((header) => escapeCell(entry[header])).join(','));
  }
  return rows.join('\n');
}

/**
 * Parse a `start` / `end` request parameter into Unix nanoseconds.
 *
 * Accepted forms:
 *   - 18+ digits            → already nanoseconds
 *   - number below 1e12     → seconds
 *   - number below 1e15     → milliseconds
 *   - any other number      → nanoseconds
 *   - anything else         → parsed with Date.parse (e.g. ISO 8601)
 *
 * @param {unknown} value - raw query parameter
 * @param {bigint} defaultNs - returned when the parameter is missing or blank
 * @returns {bigint}
 * @throws {BadRequestError} when the value cannot be interpreted as a time
 *   or falls outside the range a Date can represent
 */
function parseTimeToNs(value, defaultNs) {
  if (value === undefined || value === null || value === '') return defaultNs;
  if (typeof value !== 'string') {
    throw new BadRequestError(`Invalid time value: ${value}`);
  }

  const text = value.trim();
  if (text === '') return defaultNs;

  let ns;
  if (/^\d+$/.test(text)) {
    // Pure integers are converted with BigInt arithmetic to stay exact.
    const n = Number(text);
    if (text.length >= 18 || n >= 1e15) ns = BigInt(text);
    else if (n < 1e12) ns = BigInt(text) * NS_PER_SECOND;
    else ns = BigInt(text) * NS_PER_MS;
  } else {
    const n = Number(text);
    if (!Number.isNaN(n)) {
      if (!Number.isFinite(n)) {
        throw new BadRequestError(`Invalid time value: ${value}`);
      }
      // Math.floor guarantees an integer, which BigInt() requires.
      if (n < 1e12) ns = BigInt(Math.floor(n * 1e9));
      else if (n < 1e15) ns = BigInt(Math.floor(n * 1e6));
      else ns = BigInt(Math.floor(n));
    } else {
      const ms = Date.parse(text);
      if (Number.isNaN(ms)) {
        throw new BadRequestError(`Invalid time value: ${value}`);
      }
      ns = BigInt(ms) * NS_PER_MS;
    }
  }

  // Reject values that could not be rendered back as an ISO date later on.
  const ms = ns / NS_PER_MS;
  if (ms > MAX_DATE_MS || ms < -MAX_DATE_MS) {
    throw new BadRequestError(`Invalid time value: ${value}`);
  }
  return ns;
}

/** Render Unix nanoseconds as an ISO 8601 string (millisecond precision). */
function nsToIso(ns) {
  return new Date(Number(ns / NS_PER_MS)).toISOString();
}

/**
 * GET a Loki metadata endpoint and return the `data` array of its payload.
 *
 * @param {string} path - path below LOKI_URL, e.g. `/loki/api/v1/labels`
 * @returns {Promise<unknown[]>} `data` from the response, or [] when absent
 * @throws {Error} on a non-2xx answer or when the request times out (5 s)
 */
async function fetchLokiData(path) {
  const response = await fetch(`${LOKI_URL}${path}`, {
    signal: AbortSignal.timeout(5000),
  });
  if (!response.ok) throw new Error(`Loki error: ${response.status}`);
  const payload = await response.json();
  return payload.data || [];
}

// ─── Routes ──────────────────────────────────────────────────────────────────

app.use(express.json());

// CORS for Grafana / frontend. Registered before the rate limiter so that 429
// responses also carry CORS headers (otherwise browsers hide them behind a
// generic CORS failure) and preflight requests do not consume the quota.
app.use((req, res, next) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Headers', 'Content-Type, x-api-key');
  res.setHeader('Access-Control-Allow-Methods', 'GET, OPTIONS');
  if (req.method === 'OPTIONS') return res.sendStatus(204);
  return next();
});

// Rate limiting (basic — 60 requests/min per IP). Counters are kept in memory
// and all reset together at the end of each window.
const requestCounts = new Map();
setInterval(() => requestCounts.clear(), RATE_LIMIT_WINDOW_MS);

app.use((req, res, next) => {
  const count = (requestCounts.get(req.ip) || 0) + 1;
  requestCounts.set(req.ip, count);
  if (count > RATE_LIMIT_MAX_REQUESTS) {
    return res.status(429).json({ error: 'Too Many Requests' });
  }
  return next();
});

/**
 * GET /health
 * Liveness probe; also reports the configured Loki URL.
 */
app.get('/health', (req, res) => {
  res.json({ status: 'ok', loki: LOKI_URL });
});

/**
 * GET /api/logs/services
 * Returns the values of the `service` label known to Loki.
 */
app.get('/api/logs/services', authMiddleware, async (req, res) => {
  try {
    res.json({ services: await fetchLokiData('/loki/api/v1/label/service/values') });
  } catch (err) {
    console.error('[log-exporter] Services error:', err.message);
    res.status(500).json({ error: err.message });
  }
});

/**
 * GET /api/logs/labels
 * Returns all available label names.
 */
app.get('/api/logs/labels', authMiddleware, async (req, res) => {
  try {
    res.json({ labels: await fetchLokiData('/loki/api/v1/labels') });
  } catch (err) {
    console.error('[log-exporter] Labels error:', err.message);
    res.status(500).json({ error: err.message });
  }
});

/**
 * GET /api/logs/export
 *
 * Query params:
 *   - start    : ISO date or Unix timestamp in s / ms / ns (default: 1h ago)
 *   - end      : ISO date or Unix timestamp in s / ms / ns (default: now)
 *   - service  : comma-separated service names (default: all)
 *   - level    : comma-separated levels: error,warn,info,debug (default: all)
 *   - search   : free-text search string
 *   - limit    : max number of log lines (default: 5000, max: 5000)
 *   - format   : "json" | "csv" (default: json)
 *
 * Responses:
 *   - 200 with a JSON document or a CSV attachment
 *   - 400 for an invalid time value, an empty/inverted range, or an unknown format
 *   - 500 when the Loki query fails
 *
 * Examples:
 *   GET /api/logs/export?service=backend&level=error&format=csv
 *   GET /api/logs/export?start=2024-01-01T00:00:00Z&end=2024-01-02T00:00:00Z&format=json
 */
app.get('/api/logs/export', authMiddleware, async (req, res) => {
  try {
    const nowNs = BigInt(Date.now()) * NS_PER_MS;
    const oneHourAgoNs = nowNs - 3600n * NS_PER_SECOND;

    // Parse time range
    const startNs = parseTimeToNs(req.query.start, oneHourAgoNs);
    const endNs = parseTimeToNs(req.query.end, nowNs);

    if (endNs <= startNs) {
      return res.status(400).json({ error: '"end" must be after "start"' });
    }

    // String() keeps a repeated `format` parameter (an array) from crashing.
    const format = String(req.query.format || 'json').toLowerCase();
    if (!['json', 'csv'].includes(format)) {
      return res.status(400).json({ error: 'format must be "json" or "csv"' });
    }

    const limit = clampLimit(Number.parseInt(req.query.limit, 10));

    const query = buildLogQLQuery({
      service: req.query.service,
      level: req.query.level,
      search: req.query.search,
    });

    const entries = await queryLoki({
      query,
      start: startNs.toString(),
      end: endNs.toString(),
      limit,
    });

    if (format === 'csv') {
      const csv = toCSV(entries);
      const filename = `xpeditis-logs-${new Date().toISOString().slice(0, 10)}.csv`;
      res.setHeader('Content-Type', 'text/csv; charset=utf-8');
      res.setHeader('Content-Disposition', `attachment; filename="${filename}"`);
      return res.send(csv);
    }

    // JSON response
    return res.json({
      total: entries.length,
      query,
      range: {
        from: nsToIso(startNs),
        to: nsToIso(endNs),
      },
      logs: entries,
    });
  } catch (err) {
    // Invalid client input is a 400, not a server failure.
    if (err instanceof BadRequestError) {
      return res.status(400).json({ error: err.message });
    }
    console.error('[log-exporter] Export error:', err.message);
    return res.status(500).json({ error: err.message });
  }
});

// ─── Start ────────────────────────────────────────────────────────────────────

app.listen(PORT, () => {
  console.log(`[log-exporter] Listening on port ${PORT}`);
  console.log(`[log-exporter] Loki URL: ${LOKI_URL}`);
  console.log(`[log-exporter] API key protection: ${API_KEY ? 'enabled' : 'disabled'}`);
});