Files
felhom.eu/hub/internal/store/store.go
T
admin 41e313bf36 hub v0.1.7: Infrastructure backup endpoints for disaster recovery
Add infra-backup push/pull API for controller DR:
- POST /api/v1/infra-backup — controller pushes infrastructure snapshot
- GET /api/v1/infra-backup/{customer_id} — fresh controller pulls backup
- infra_backups SQLite table with per-customer snapshots
- Customer detail page shows infra backup status card
- README.md with full API docs and DR flow

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 13:17:12 +01:00

517 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package store
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
_ "modernc.org/sqlite"
)
// Store handles SQLite persistence for customer reports, per-customer
// notification preferences, the notification log and infrastructure backups.
// Create one with New and release it with Close.
type Store struct {
// db is the database handle opened in New; every query goes through it.
db *sql.DB
// logger is the logger handed to New.
logger *log.Logger
}
// CustomerSummary holds the latest status for a customer (for dashboard).
// Most fields mirror a column of the reports table. CustomerName and
// DiskSummary are derived from ReportJSON by GetCustomers and GetCustomer;
// GetCustomerHistory leaves them empty.
type CustomerSummary struct {
CustomerID string
// CustomerName is the "customer_name" value found in ReportJSON.
CustomerName string
ControllerVersion string
// ReceivedAt is the received_at column parsed by parseSQLiteTime.
ReceivedAt time.Time
HealthStatus string
CPUPercent float64
MemoryPercent float64
ContainerTotal int
ContainerRunning int
// BackupLastSnapshot is nil when the column is NULL or does not parse as RFC 3339.
BackupLastSnapshot *time.Time
// ReportJSON is the report payload exactly as it was stored.
ReportJSON string
// Computed fields (not stored)
// TimeSinceReport is time.Since(ReceivedAt), evaluated when the row is loaded.
TimeSinceReport time.Duration
// DiskSummary lists the usage percentage of each storage entry, "/"-separated (e.g. "42%/80%").
DiskSummary string
}
// New opens the SQLite database at dbPath, initializes the schema and returns
// a Store backed by it. logger is kept on the Store as given.
//
// The connection is configured for WAL journaling and a 5 second busy
// timeout. The "sqlite" driver registered by modernc.org/sqlite takes pragmas
// as _pragma=name(value) query parameters; the _journal_mode/_busy_timeout
// spelling belongs to a different driver and is not among its documented
// parameters, so it would leave both settings at their defaults.
//
// An error is returned if the database cannot be opened or the schema cannot
// be created; in the latter case the handle is closed before returning.
func New(dbPath string, logger *log.Logger) (*Store, error) {
	dsn := dbPath + "?_pragma=busy_timeout(5000)&_pragma=journal_mode(WAL)"
	db, err := sql.Open("sqlite", dsn)
	if err != nil {
		return nil, fmt.Errorf("opening database: %w", err)
	}

	s := &Store{db: db, logger: logger}
	if err := s.migrate(); err != nil {
		// The migration error is the one worth reporting; a failure to close
		// the half-initialized handle adds nothing for the caller.
		db.Close()
		return nil, fmt.Errorf("migrating database: %w", err)
	}
	return s, nil
}
// migrate creates the tables and indexes the store uses. Every statement is
// guarded by IF NOT EXISTS, so objects that already exist are left as they are.
func (s *Store) migrate() error {
	const schema = `
CREATE TABLE IF NOT EXISTS reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id TEXT NOT NULL,
received_at DATETIME NOT NULL DEFAULT (datetime('now')),
report_json TEXT NOT NULL,
health_status TEXT,
cpu_percent REAL,
memory_percent REAL,
container_total INTEGER,
container_running INTEGER,
backup_last_snapshot DATETIME,
controller_version TEXT
);
CREATE INDEX IF NOT EXISTS idx_reports_customer
ON reports(customer_id, received_at DESC);
CREATE TABLE IF NOT EXISTS customer_notifications (
customer_id TEXT PRIMARY KEY,
email TEXT NOT NULL DEFAULT '',
enabled_events TEXT NOT NULL DEFAULT '[]',
created_at DATETIME DEFAULT (datetime('now')),
updated_at DATETIME DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS notification_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id TEXT NOT NULL,
event_type TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
error_message TEXT,
created_at DATETIME DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_notification_log_customer
ON notification_log(customer_id, created_at DESC);
CREATE TABLE IF NOT EXISTS infra_backups (
customer_id TEXT PRIMARY KEY,
backup_json TEXT NOT NULL,
updated_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
`
	if _, err := s.db.Exec(schema); err != nil {
		return err
	}
	return nil
}
// NotificationPrefs holds per-customer notification preferences, as kept in
// the customer_notifications table.
type NotificationPrefs struct {
CustomerID string
// Email is the value of the email column.
Email string
// EnabledEvents is decoded from the JSON array stored in the enabled_events column.
EnabledEvents []string
}
// GetNotificationPrefs returns notification preferences for a customer.
//
// It returns (nil, nil) when the customer has no preferences row. An error is
// returned when the query fails or when the stored enabled_events value is
// not valid JSON: reporting that explicitly beats handing back an empty event
// list that would look like "nothing enabled". An empty stored value is
// treated as no enabled events.
func (s *Store) GetNotificationPrefs(customerID string) (*NotificationPrefs, error) {
	var email, eventsJSON string
	err := s.db.QueryRow(
		"SELECT email, enabled_events FROM customer_notifications WHERE customer_id = ?",
		customerID,
	).Scan(&email, &eventsJSON)
	if err == sql.ErrNoRows {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	var events []string
	if eventsJSON != "" {
		if err := json.Unmarshal([]byte(eventsJSON), &events); err != nil {
			return nil, fmt.Errorf("decoding enabled events for customer %q: %w", customerID, err)
		}
	}

	return &NotificationPrefs{
		CustomerID:    customerID,
		Email:         email,
		EnabledEvents: events,
	}, nil
}
// SaveNotificationPrefs creates or updates notification preferences for a customer.
//
// enabledEvents is stored as a JSON array in enabled_events. A nil slice is
// written as "[]" rather than "null", which matches the column's default
// value. updated_at is refreshed on every call. The returned error comes from
// encoding the events or from the database write.
func (s *Store) SaveNotificationPrefs(customerID, email string, enabledEvents []string) error {
	if enabledEvents == nil {
		// json.Marshal would encode a nil slice as null.
		enabledEvents = []string{}
	}
	eventsJSON, err := json.Marshal(enabledEvents)
	if err != nil {
		return fmt.Errorf("encoding enabled events: %w", err)
	}

	_, err = s.db.Exec(`
		INSERT INTO customer_notifications (customer_id, email, enabled_events, updated_at)
		VALUES (?, ?, ?, datetime('now'))
		ON CONFLICT(customer_id) DO UPDATE SET
			email = excluded.email,
			enabled_events = excluded.enabled_events,
			updated_at = datetime('now')`,
		customerID, email, string(eventsJSON),
	)
	return err
}
// LogNotification appends one row to notification_log describing a
// notification attempt for customerID. All arguments are stored as given;
// created_at is filled in by the table's default.
func (s *Store) LogNotification(customerID, eventType, severity, message, status, errorMsg string) error {
	const insertSQL = `
INSERT INTO notification_log (customer_id, event_type, severity, message, status, error_message)
VALUES (?, ?, ?, ?, ?, ?)`

	if _, err := s.db.Exec(insertSQL, customerID, eventType, severity, message, status, errorMsg); err != nil {
		return err
	}
	return nil
}
// NotificationLogEntry represents a single notification log record, as read
// from the notification_log table by GetRecentNotifications.
type NotificationLogEntry struct {
EventType string
Severity string
Message string
Status string // "sent", "skipped", "failed"
// ErrorMessage is the error_message column; a NULL column is read as "".
ErrorMessage string
// CreatedAt is the created_at column parsed by parseSQLiteTime.
CreatedAt time.Time
}
// GetRecentNotifications returns the most recent notification log entries for
// a customer, newest first. limit is passed to the query's LIMIT clause.
//
// created_at only has one-second resolution, so entries logged within the
// same second are additionally ordered by id (newest insert first) to keep
// the result order stable. A NULL error_message is returned as "". The
// returned slice is nil when there are no entries.
func (s *Store) GetRecentNotifications(customerID string, limit int) ([]NotificationLogEntry, error) {
	rows, err := s.db.Query(`
		SELECT event_type, severity, message, status, COALESCE(error_message, ''), created_at
		FROM notification_log
		WHERE customer_id = ?
		ORDER BY created_at DESC, id DESC
		LIMIT ?`, customerID, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var entries []NotificationLogEntry
	for rows.Next() {
		var e NotificationLogEntry
		var createdAt string
		if err := rows.Scan(&e.EventType, &e.Severity, &e.Message, &e.Status, &e.ErrorMessage, &createdAt); err != nil {
			return nil, err
		}
		e.CreatedAt = parseSQLiteTime(createdAt)
		entries = append(entries, e)
	}
	return entries, rows.Err()
}
// SaveReport stores a new report for customerID. reportJSON should be the raw
// JSON payload; it is kept verbatim in report_json, and a few of its fields
// are also copied into dedicated columns.
//
// Extracting those fields is best effort: a payload that cannot be fully
// decoded is still stored, with zero values for whatever was not decoded.
// The decode error is written to the store's logger (when one was provided)
// instead of being dropped silently. The returned error is the database
// error from the insert, if any.
func (s *Store) SaveReport(customerID string, reportJSON []byte) error {
	// Parse denormalized fields from the JSON
	var parsed struct {
		ControllerVersion string `json:"controller_version"`
		System            struct {
			CPUPercent    float64 `json:"cpu_percent"`
			MemoryPercent float64 `json:"memory_percent"`
		} `json:"system"`
		Containers struct {
			Total   int `json:"total"`
			Running int `json:"running"`
		} `json:"containers"`
		Backup struct {
			LastSnapshot *time.Time `json:"last_snapshot"`
		} `json:"backup"`
		Health struct {
			Status string `json:"status"`
		} `json:"health"`
	}
	if err := json.Unmarshal(reportJSON, &parsed); err != nil && s.logger != nil {
		s.logger.Printf("[WARN] Could not fully parse report from customer %q: %v", customerID, err)
	}

	// A zero time is not a real snapshot timestamp, so it is stored as NULL
	// just like a missing one.
	var backupSnapshot *string
	if ts := parsed.Backup.LastSnapshot; ts != nil && !ts.IsZero() {
		formatted := ts.Format(time.RFC3339)
		backupSnapshot = &formatted
	}

	_, err := s.db.Exec(`
		INSERT INTO reports (customer_id, report_json, health_status, cpu_percent,
			memory_percent, container_total, container_running,
			backup_last_snapshot, controller_version)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
		customerID, string(reportJSON),
		parsed.Health.Status, parsed.System.CPUPercent,
		parsed.System.MemoryPercent, parsed.Containers.Total,
		parsed.Containers.Running, backupSnapshot,
		parsed.ControllerVersion,
	)
	return err
}
// GetCustomers returns the latest report summary for each customer, ordered
// by customer ID.
//
// "Latest" is the report with the greatest received_at. Because received_at
// only has one-second resolution, ties are broken by the highest id, so each
// customer yields exactly one row even when two reports arrive in the same
// second. CustomerName and DiskSummary are derived from the stored report
// JSON on a best-effort basis: if it cannot be decoded they stay empty.
func (s *Store) GetCustomers() ([]CustomerSummary, error) {
	rows, err := s.db.Query(`
		SELECT r.customer_id, r.received_at, r.report_json,
			r.health_status, r.cpu_percent, r.memory_percent,
			r.container_total, r.container_running,
			r.backup_last_snapshot, r.controller_version
		FROM reports r
		WHERE r.id = (
			SELECT latest.id
			FROM reports latest
			WHERE latest.customer_id = r.customer_id
			ORDER BY latest.received_at DESC, latest.id DESC
			LIMIT 1
		)
		ORDER BY r.customer_id`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var customers []CustomerSummary
	for rows.Next() {
		var c CustomerSummary
		var receivedAt string
		var backupSnapshot sql.NullString
		if err := rows.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON,
			&c.HealthStatus, &c.CPUPercent, &c.MemoryPercent,
			&c.ContainerTotal, &c.ContainerRunning,
			&backupSnapshot, &c.ControllerVersion); err != nil {
			return nil, err
		}

		c.ReceivedAt = parseSQLiteTime(receivedAt)
		c.TimeSinceReport = time.Since(c.ReceivedAt)

		// An unparsable snapshot timestamp is treated like a missing one.
		if backupSnapshot.Valid {
			if t, err := time.Parse(time.RFC3339, backupSnapshot.String); err == nil {
				c.BackupLastSnapshot = &t
			}
		}

		// Parse customer_name from JSON. The error is ignored on purpose: an
		// undecodable payload only costs the display name.
		var report struct {
			CustomerName string `json:"customer_name"`
		}
		_ = json.Unmarshal([]byte(c.ReportJSON), &report)
		c.CustomerName = report.CustomerName

		// Parse disk summary
		c.DiskSummary = parseDiskSummary(c.ReportJSON)

		customers = append(customers, c)
	}
	return customers, rows.Err()
}
// GetCustomer returns the latest report for a specific customer, or
// (nil, nil) when the customer has no reports.
//
// "Latest" is the report with the greatest received_at. received_at only has
// one-second resolution, so ties are broken by the highest id; without that,
// which of two same-second reports is returned would be unspecified.
// CustomerName and DiskSummary are derived from the stored report JSON on a
// best-effort basis: if it cannot be decoded they stay empty.
func (s *Store) GetCustomer(customerID string) (*CustomerSummary, error) {
	row := s.db.QueryRow(`
		SELECT customer_id, received_at, report_json,
			health_status, cpu_percent, memory_percent,
			container_total, container_running,
			backup_last_snapshot, controller_version
		FROM reports
		WHERE customer_id = ?
		ORDER BY received_at DESC, id DESC
		LIMIT 1`, customerID)

	var c CustomerSummary
	var receivedAt string
	var backupSnapshot sql.NullString
	err := row.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON,
		&c.HealthStatus, &c.CPUPercent, &c.MemoryPercent,
		&c.ContainerTotal, &c.ContainerRunning,
		&backupSnapshot, &c.ControllerVersion)
	if err == sql.ErrNoRows {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	c.ReceivedAt = parseSQLiteTime(receivedAt)
	c.TimeSinceReport = time.Since(c.ReceivedAt)

	// An unparsable snapshot timestamp is treated like a missing one.
	if backupSnapshot.Valid {
		if t, err := time.Parse(time.RFC3339, backupSnapshot.String); err == nil {
			c.BackupLastSnapshot = &t
		}
	}

	// The decode error is ignored on purpose: an undecodable payload only
	// costs the display name.
	var report struct {
		CustomerName string `json:"customer_name"`
	}
	_ = json.Unmarshal([]byte(c.ReportJSON), &report)
	c.CustomerName = report.CustomerName
	c.DiskSummary = parseDiskSummary(c.ReportJSON)

	return &c, nil
}
// GetCustomerHistory returns the reports received from a customer within the
// last `since`, newest first (same-second reports ordered by id, newest
// insert first).
//
// received_at is filled in by SQLite's datetime('now'), which yields UTC in
// "YYYY-MM-DD HH:MM:SS" form, and the query compares it with the cutoff. The
// cutoff is therefore rendered in UTC in that same layout; formatting the
// local time would shift the window by the host's UTC offset.
//
// CustomerName and DiskSummary are not populated for history entries. The
// returned slice is nil when no report falls inside the window.
func (s *Store) GetCustomerHistory(customerID string, since time.Duration) ([]CustomerSummary, error) {
	cutoff := time.Now().UTC().Add(-since).Format("2006-01-02 15:04:05")
	rows, err := s.db.Query(`
		SELECT customer_id, received_at, report_json,
			health_status, cpu_percent, memory_percent,
			container_total, container_running,
			backup_last_snapshot, controller_version
		FROM reports
		WHERE customer_id = ? AND received_at >= ?
		ORDER BY received_at DESC, id DESC`, customerID, cutoff)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var history []CustomerSummary
	for rows.Next() {
		var c CustomerSummary
		var receivedAt string
		var backupSnapshot sql.NullString
		if err := rows.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON,
			&c.HealthStatus, &c.CPUPercent, &c.MemoryPercent,
			&c.ContainerTotal, &c.ContainerRunning,
			&backupSnapshot, &c.ControllerVersion); err != nil {
			return nil, err
		}

		c.ReceivedAt = parseSQLiteTime(receivedAt)
		c.TimeSinceReport = time.Since(c.ReceivedAt)

		// An unparsable snapshot timestamp is treated like a missing one.
		if backupSnapshot.Valid {
			if t, err := time.Parse(time.RFC3339, backupSnapshot.String); err == nil {
				c.BackupLastSnapshot = &t
			}
		}

		history = append(history, c)
	}
	return history, rows.Err()
}
// SaveInfraBackup writes the infrastructure backup for customerID, inserting
// the row if the customer has none yet and overwriting backup_json otherwise.
// updated_at is set to the current database time in both cases.
func (s *Store) SaveInfraBackup(customerID string, backupJSON []byte) error {
	const upsertSQL = `
INSERT INTO infra_backups (customer_id, backup_json, updated_at)
VALUES (?, ?, datetime('now'))
ON CONFLICT(customer_id) DO UPDATE SET
backup_json = excluded.backup_json,
updated_at = datetime('now')`

	payload := string(backupJSON)
	if _, err := s.db.Exec(upsertSQL, customerID, payload); err != nil {
		return err
	}
	return nil
}
// GetInfraBackup looks up the stored infra backup for customerID and returns
// its JSON exactly as stored. A customer without a backup row yields
// (nil, nil); any other database error is returned unchanged.
func (s *Store) GetInfraBackup(customerID string) ([]byte, error) {
	const query = "SELECT backup_json FROM infra_backups WHERE customer_id = ?"

	var payload string
	switch err := s.db.QueryRow(query, customerID).Scan(&payload); {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return []byte(payload), nil
}
// InfraBackupMeta holds summary info for the dashboard (avoids parsing full JSON).
// It is produced by GetInfraBackupMeta.
type InfraBackupMeta struct {
// UpdatedAt is the updated_at column of the customer's infra_backups row.
UpdatedAt time.Time
// StackCount is the number of entries in the backup's "deployed_stacks" array.
StackCount int
// DiskCount is the number of entries in the "mounts" array under "disk_layout".
DiskCount int
}
// GetInfraBackupMeta summarizes the infra backup stored for customerID: when
// it was last updated and how many deployed stacks and disk mounts it lists.
// A customer without a backup row yields (nil, nil). If the stored JSON
// cannot be decoded, both counts are left at zero and no error is reported.
func (s *Store) GetInfraBackupMeta(customerID string) (*InfraBackupMeta, error) {
	const query = "SELECT backup_json, updated_at FROM infra_backups WHERE customer_id = ?"

	var rawBackup, rawUpdatedAt string
	if err := s.db.QueryRow(query, customerID).Scan(&rawBackup, &rawUpdatedAt); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, err
	}

	summary := InfraBackupMeta{UpdatedAt: parseSQLiteTime(rawUpdatedAt)}

	// Only element counts are needed, so array elements stay undecoded.
	var shape struct {
		DeployedStacks []json.RawMessage `json:"deployed_stacks"`
		DiskLayout     struct {
			Mounts []json.RawMessage `json:"mounts"`
		} `json:"disk_layout"`
	}
	if err := json.Unmarshal([]byte(rawBackup), &shape); err == nil {
		summary.StackCount = len(shape.DeployedStacks)
		summary.DiskCount = len(shape.DiskLayout.Mounts)
	}
	return &summary, nil
}
// Prune deletes reports older than the given number of days and returns the
// number of rows removed.
//
// received_at is filled in by SQLite's datetime('now'), which yields UTC in
// "YYYY-MM-DD HH:MM:SS" form, and the DELETE compares it with the cutoff. The
// cutoff is therefore computed and rendered in UTC in that same layout;
// formatting the local time would move the retention boundary by the host's
// UTC offset.
//
// Note that a maxDays of zero or less places the cutoff at or after the
// current time, so the DELETE then matches reports of any age.
func (s *Store) Prune(maxDays int) (int64, error) {
	cutoff := time.Now().UTC().AddDate(0, 0, -maxDays).Format("2006-01-02 15:04:05")
	res, err := s.db.Exec("DELETE FROM reports WHERE received_at < ?", cutoff)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}
// Close releases the underlying database handle and reports any error the
// handle returns while closing.
func (s *Store) Close() error {
	err := s.db.Close()
	return err
}
// parseSQLiteTime converts a timestamp string read from the database into a
// time.Time. The layouts below are tried in order and the first one that
// parses wins. When none matches, the zero time is returned, and a warning is
// logged unless the input was empty.
func parseSQLiteTime(s string) time.Time {
	layouts := [...]string{
		"2006-01-02 15:04:05",           // SQLite datetime('now')
		"2006-01-02T15:04:05Z",          // RFC3339 without fractional
		time.RFC3339,                    // 2006-01-02T15:04:05Z07:00
		time.RFC3339Nano,                // with fractional seconds
		"2006-01-02 15:04:05+00:00",     // with explicit UTC offset
		"2006-01-02 15:04:05.999999999", // with fractional, no TZ
	}

	for i := range layouts {
		parsed, err := time.Parse(layouts[i], s)
		if err != nil {
			continue
		}
		return parsed
	}

	// Nothing matched. An empty string is an expected "no value"; anything
	// else is logged so the unexpected format can be identified.
	if s == "" {
		return time.Time{}
	}
	log.Printf("[WARN] Could not parse timestamp: %q", s)
	return time.Time{}
}
// parseDiskSummary builds a compact disk usage string from a report payload:
// the usage percentage of every entry in the "storage" array, rounded to a
// whole number and joined with "/" (for example "42%/80%"). It returns ""
// when the payload has no storage entries. Decode errors are ignored;
// whatever was decoded is used.
func parseDiskSummary(reportJSON string) string {
	var payload struct {
		Storage []struct {
			Mount   string  `json:"mount"`
			Percent float64 `json:"percent"`
		} `json:"storage"`
	}
	_ = json.Unmarshal([]byte(reportJSON), &payload)

	summary := ""
	for i, volume := range payload.Storage {
		if i > 0 {
			summary += "/"
		}
		summary += fmt.Sprintf("%.0f%%", volume.Percent)
	}
	return summary
}