package metrics

import (
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "modernc.org/sqlite"
)

// MetricsStore manages SQLite storage for system and container metrics.
type MetricsStore struct {
	db     *sql.DB
	logger *log.Logger
}

// NewMetricsStore opens (or creates) a SQLite database at dbPath and initializes the schema.
//
// It applies the connection pragmas (WAL journal, synchronous=NORMAL, 5s busy
// timeout) and creates the system_metrics and container_metrics tables with
// their indexes if they do not exist yet. logger is stored on the returned
// store. On any failure the database handle is closed and a wrapped error is
// returned.
func NewMetricsStore(dbPath string, logger *log.Logger) (*MetricsStore, error) {
	db, err := sql.Open("sqlite", dbPath)
	if err != nil {
		return nil, fmt.Errorf("open sqlite: %w", err)
	}

	// synchronous and busy_timeout are per-connection settings, while *sql.DB
	// is a pool that opens additional connections on demand. Executing the
	// pragmas once would leave every later connection with SQLite defaults
	// (notably no busy timeout). Pinning the pool to one connection guarantees
	// the pragmas below apply to all statements, and also keeps a ":memory:"
	// database coherent, since each new connection would otherwise see its own
	// empty database.
	// NOTE(review): this serializes database access; every method in this file
	// drains and closes its rows before returning, so it cannot self-deadlock,
	// but confirm no other file in the package nests queries on s.db.
	db.SetMaxOpenConns(1)

	// Set pragmas for performance and concurrency.
	pragmas := []string{
		"PRAGMA journal_mode=WAL",
		"PRAGMA synchronous=NORMAL",
		"PRAGMA busy_timeout=5000",
	}
	for _, p := range pragmas {
		if _, err := db.Exec(p); err != nil {
			// Best-effort cleanup; the pragma error is the one worth reporting.
			_ = db.Close()
			return nil, fmt.Errorf("pragma %q: %w", p, err)
		}
	}

	// Create tables and indexes. Every statement is idempotent
	// (IF NOT EXISTS), so reopening an existing database is safe.
	schema := []string{
		`CREATE TABLE IF NOT EXISTS system_metrics (
			ts INTEGER NOT NULL,
			cpu_percent REAL NOT NULL,
			mem_used_mb INTEGER NOT NULL,
			mem_total_mb INTEGER NOT NULL,
			temp_celsius REAL,
			load_avg_1 REAL,
			load_avg_5 REAL,
			load_avg_15 REAL,
			disk_used_gb REAL,
			disk_total_gb REAL,
			hdd_used_gb REAL,
			hdd_total_gb REAL
		)`,
		`CREATE INDEX IF NOT EXISTS idx_system_ts ON system_metrics(ts)`,
		`CREATE TABLE IF NOT EXISTS container_metrics (
			ts INTEGER NOT NULL,
			container_name TEXT NOT NULL,
			cpu_percent REAL NOT NULL,
			mem_usage_mb REAL NOT NULL,
			mem_limit_mb REAL,
			net_rx_bytes INTEGER,
			net_tx_bytes INTEGER,
			block_read_bytes INTEGER,
			block_write_bytes INTEGER
		)`,
		`CREATE INDEX IF NOT EXISTS idx_container_ts ON container_metrics(ts)`,
		`CREATE INDEX IF NOT EXISTS idx_container_name ON container_metrics(container_name, ts)`,
	}
	for i, stmt := range schema {
		if _, err := db.Exec(stmt); err != nil {
			// Best-effort cleanup; the schema error is the one worth reporting.
			_ = db.Close()
			return nil, fmt.Errorf("schema statement %d: %w", i, err)
		}
	}

	return &MetricsStore{db: db, logger: logger}, nil
}

// Close closes the underlying database connection.
func (s *MetricsStore) Close() error {
	return s.db.Close()
}

// InsertSystemMetrics inserts a single system metrics sample.
func (s *MetricsStore) InsertSystemMetrics(m SystemSample) error { _, err := s.db.Exec( `INSERT INTO system_metrics (ts, cpu_percent, mem_used_mb, mem_total_mb, temp_celsius, load_avg_1, load_avg_5, load_avg_15, disk_used_gb, disk_total_gb, hdd_used_gb, hdd_total_gb) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, m.Timestamp, m.CPUPercent, m.MemUsedMB, m.MemTotalMB, m.TempCelsius, m.LoadAvg1, m.LoadAvg5, m.LoadAvg15, m.DiskUsedGB, m.DiskTotalGB, m.HDDUsedGB, m.HDDTotalGB, ) return err } // InsertContainerMetrics inserts a batch of container metrics samples. func (s *MetricsStore) InsertContainerMetrics(samples []ContainerSample) error { if len(samples) == 0 { return nil } tx, err := s.db.Begin() if err != nil { return err } defer tx.Rollback() stmt, err := tx.Prepare( `INSERT INTO container_metrics (ts, container_name, cpu_percent, mem_usage_mb, mem_limit_mb, net_rx_bytes, net_tx_bytes, block_read_bytes, block_write_bytes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, ) if err != nil { return err } defer stmt.Close() for _, c := range samples { if _, err := stmt.Exec(c.Timestamp, c.ContainerName, c.CPUPercent, c.MemUsageMB, c.MemLimitMB, c.NetRxBytes, c.NetTxBytes, c.BlockReadBytes, c.BlockWriteBytes); err != nil { return err } } return tx.Commit() } // QuerySystemMetrics returns downsampled system metrics between from and to. // resolution controls the approximate number of data points returned. func (s *MetricsStore) QuerySystemMetrics(from, to time.Time, resolution int) ([]SystemSample, error) { fromTS := from.Unix() toTS := to.Unix() if resolution <= 0 { resolution = 200 } rangeSeconds := toTS - fromTS if rangeSeconds <= 0 { return nil, nil } bucketSeconds := rangeSeconds / int64(resolution) if bucketSeconds < 1 { bucketSeconds = 1 } rows, err := s.db.Query(` SELECT (ts / ?) * ? 
AS bucket_ts, AVG(cpu_percent), AVG(mem_used_mb), AVG(mem_total_mb), AVG(temp_celsius), AVG(load_avg_1), AVG(load_avg_5), AVG(load_avg_15), AVG(disk_used_gb), AVG(disk_total_gb), AVG(hdd_used_gb), AVG(hdd_total_gb) FROM system_metrics WHERE ts >= ? AND ts <= ? GROUP BY ts / ? ORDER BY bucket_ts ASC`, bucketSeconds, bucketSeconds, fromTS, toTS, bucketSeconds, ) if err != nil { return nil, err } defer rows.Close() var result []SystemSample for rows.Next() { var m SystemSample var tempC, load1, load5, load15, diskUsed, diskTotal, hddUsed, hddTotal sql.NullFloat64 if err := rows.Scan(&m.Timestamp, &m.CPUPercent, &m.MemUsedMB, &m.MemTotalMB, &tempC, &load1, &load5, &load15, &diskUsed, &diskTotal, &hddUsed, &hddTotal); err != nil { return nil, err } if tempC.Valid { m.TempCelsius = tempC.Float64 } if load1.Valid { m.LoadAvg1 = load1.Float64 } if load5.Valid { m.LoadAvg5 = load5.Float64 } if load15.Valid { m.LoadAvg15 = load15.Float64 } if diskUsed.Valid { m.DiskUsedGB = diskUsed.Float64 } if diskTotal.Valid { m.DiskTotalGB = diskTotal.Float64 } if hddUsed.Valid { m.HDDUsedGB = hddUsed.Float64 } if hddTotal.Valid { m.HDDTotalGB = hddTotal.Float64 } result = append(result, m) } return result, rows.Err() } // QueryContainerMetrics returns downsampled metrics for a specific container. func (s *MetricsStore) QueryContainerMetrics(name string, from, to time.Time, resolution int) ([]ContainerSample, error) { fromTS := from.Unix() toTS := to.Unix() if resolution <= 0 { resolution = 200 } rangeSeconds := toTS - fromTS if rangeSeconds <= 0 { return nil, nil } bucketSeconds := rangeSeconds / int64(resolution) if bucketSeconds < 1 { bucketSeconds = 1 } rows, err := s.db.Query(` SELECT (ts / ?) * ? AS bucket_ts, container_name, AVG(cpu_percent), AVG(mem_usage_mb), AVG(mem_limit_mb), AVG(net_rx_bytes), AVG(net_tx_bytes), AVG(block_read_bytes), AVG(block_write_bytes) FROM container_metrics WHERE container_name = ? AND ts >= ? AND ts <= ? GROUP BY ts / ? 
ORDER BY bucket_ts ASC`, bucketSeconds, bucketSeconds, name, fromTS, toTS, bucketSeconds, ) if err != nil { return nil, err } defer rows.Close() var result []ContainerSample for rows.Next() { var c ContainerSample var memLimit, netRx, netTx, blkRead, blkWrite sql.NullFloat64 if err := rows.Scan(&c.Timestamp, &c.ContainerName, &c.CPUPercent, &c.MemUsageMB, &memLimit, &netRx, &netTx, &blkRead, &blkWrite); err != nil { return nil, err } if memLimit.Valid { c.MemLimitMB = memLimit.Float64 } if netRx.Valid { c.NetRxBytes = int64(netRx.Float64) } if netTx.Valid { c.NetTxBytes = int64(netTx.Float64) } if blkRead.Valid { c.BlockReadBytes = int64(blkRead.Float64) } if blkWrite.Valid { c.BlockWriteBytes = int64(blkWrite.Float64) } result = append(result, c) } return result, rows.Err() } // QueryContainerSummary returns the latest metrics for all containers. func (s *MetricsStore) QueryContainerSummary() ([]ContainerCurrentStats, error) { rows, err := s.db.Query(` SELECT container_name, cpu_percent, mem_usage_mb, COALESCE(mem_limit_mb, 0) FROM container_metrics WHERE ts = (SELECT MAX(ts) FROM container_metrics) ORDER BY cpu_percent DESC`) if err != nil { return nil, err } defer rows.Close() var result []ContainerCurrentStats for rows.Next() { var c ContainerCurrentStats if err := rows.Scan(&c.ContainerName, &c.CPUPercent, &c.MemUsageMB, &c.MemLimitMB); err != nil { return nil, err } result = append(result, c) } return result, rows.Err() } // Prune deletes rows older than the given duration. Returns the number of deleted rows. func (s *MetricsStore) Prune(olderThan time.Duration) (int64, error) { cutoff := time.Now().Add(-olderThan).Unix() var total int64 res, err := s.db.Exec("DELETE FROM system_metrics WHERE ts < ?", cutoff) if err != nil { return 0, err } n, _ := res.RowsAffected() total += n res, err = s.db.Exec("DELETE FROM container_metrics WHERE ts < ?", cutoff) if err != nil { return total, err } n, _ = res.RowsAffected() total += n return total, nil }