package store import ( "database/sql" "encoding/json" "fmt" "log" "time" _ "modernc.org/sqlite" ) // Store handles SQLite persistence for customer reports. type Store struct { db *sql.DB logger *log.Logger } // CustomerSummary holds the latest status for a customer (for dashboard). type CustomerSummary struct { CustomerID string CustomerName string ControllerVersion string ReceivedAt time.Time HealthStatus string CPUPercent float64 MemoryPercent float64 ContainerTotal int ContainerRunning int BackupLastSnapshot *time.Time ReportJSON string ControllerURL string // Computed fields (not stored) TimeSinceReport time.Duration DiskSummary string } // New creates a new store and initializes the schema. func New(dbPath string, logger *log.Logger) (*Store, error) { db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000") if err != nil { return nil, fmt.Errorf("opening database: %w", err) } s := &Store{db: db, logger: logger} if err := s.migrate(); err != nil { db.Close() return nil, fmt.Errorf("migrating database: %w", err) } return s, nil } func (s *Store) migrate() error { _, err := s.db.Exec(` CREATE TABLE IF NOT EXISTS reports ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer_id TEXT NOT NULL, received_at DATETIME NOT NULL DEFAULT (datetime('now')), report_json TEXT NOT NULL, health_status TEXT, cpu_percent REAL, memory_percent REAL, container_total INTEGER, container_running INTEGER, backup_last_snapshot DATETIME, controller_version TEXT ); CREATE INDEX IF NOT EXISTS idx_reports_customer ON reports(customer_id, received_at DESC); CREATE TABLE IF NOT EXISTS customer_notifications ( customer_id TEXT PRIMARY KEY, email TEXT NOT NULL DEFAULT '', enabled_events TEXT NOT NULL DEFAULT '[]', created_at DATETIME DEFAULT (datetime('now')), updated_at DATETIME DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS notification_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer_id TEXT NOT NULL, event_type TEXT NOT NULL, severity TEXT NOT NULL, message TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', error_message TEXT, created_at DATETIME DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_notification_log_customer ON notification_log(customer_id, created_at DESC); CREATE TABLE IF NOT EXISTS infra_backups ( customer_id TEXT PRIMARY KEY, backup_json TEXT NOT NULL, updated_at DATETIME NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS customer_configs ( customer_id TEXT PRIMARY KEY, customer_name TEXT NOT NULL DEFAULT '', domain TEXT NOT NULL DEFAULT '', email TEXT NOT NULL DEFAULT '', retrieval_password TEXT NOT NULL, api_key TEXT NOT NULL, config_json TEXT NOT NULL DEFAULT '{}', created_at DATETIME NOT NULL DEFAULT (datetime('now')), updated_at DATETIME NOT NULL DEFAULT (datetime('now')) ); `) if err != nil { return err } // v0.1.8: add controller_url column (idempotent — ignore error if already exists) s.db.Exec("ALTER TABLE reports ADD COLUMN controller_url TEXT") // v0.2.1: add status column to customer_configs (idempotent) s.db.Exec("ALTER TABLE customer_configs ADD COLUMN status TEXT NOT NULL DEFAULT 'active'") // v0.3.0: events table for hub-native monitoring _, err = s.db.Exec(` CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer_id TEXT NOT NULL, event_type TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'info', message TEXT NOT NULL DEFAULT '', details_json TEXT NOT NULL DEFAULT '{}', source TEXT NOT NULL DEFAULT 'controller', created_at DATETIME NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_events_customer_created ON events(customer_id, created_at DESC); CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type, created_at DESC); `) if err != nil { return err } // v0.3.0: add cooldown_hours to customer_notifications (idempotent) s.db.Exec("ALTER TABLE customer_notifications ADD COLUMN cooldown_hours INTEGER DEFAULT 6") // v0.3.0: add channel column to notification_log (idempotent) s.db.Exec("ALTER TABLE notification_log ADD COLUMN channel TEXT NOT NULL DEFAULT 'customer'") // v0.4.0: app telemetry tables _, err = s.db.Exec(` CREATE TABLE IF NOT EXISTS app_telemetry ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer_id TEXT NOT NULL, app_name TEXT NOT NULL, display_name TEXT NOT NULL DEFAULT '', reported_at DATETIME NOT NULL, memory_current_mb REAL DEFAULT 0, memory_avg_mb REAL DEFAULT 0, memory_peak_mb REAL DEFAULT 0, cpu_avg_percent REAL DEFAULT 0, catalog_estimate TEXT DEFAULT '', catalog_limit TEXT DEFAULT '', log_errors INTEGER DEFAULT 0, log_warnings INTEGER DEFAULT 0, containers_json TEXT DEFAULT '[]', issues_json TEXT DEFAULT '[]' ); CREATE INDEX IF NOT EXISTS idx_app_telemetry_lookup ON app_telemetry(app_name, reported_at); CREATE INDEX IF NOT EXISTS idx_app_telemetry_customer ON app_telemetry(customer_id, app_name, reported_at); CREATE INDEX IF NOT EXISTS idx_app_telemetry_prune ON app_telemetry(reported_at); CREATE TABLE IF NOT EXISTS app_log_issues ( id INTEGER PRIMARY KEY AUTOINCREMENT, app_name TEXT NOT NULL, fingerprint TEXT NOT NULL, severity TEXT NOT NULL, message TEXT NOT NULL, first_seen DATETIME NOT NULL, last_seen DATETIME NOT NULL, occurrence_count INTEGER DEFAULT 1, affected_customers TEXT DEFAULT '[]', UNIQUE(app_name, fingerprint) ); CREATE INDEX IF NOT EXISTS idx_app_log_issues_app ON app_log_issues(app_name, last_seen DESC); `) if err != nil { return err } // v0.7.0: versioned infra backups with GFS retention _, err = s.db.Exec(` CREATE TABLE IF NOT EXISTS infra_backup_versions ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer_id TEXT NOT NULL, backup_json TEXT NOT NULL, created_at DATETIME NOT NULL DEFAULT (datetime('now')) ); CREATE INDEX IF NOT EXISTS idx_ibv_customer_time ON infra_backup_versions(customer_id, created_at DESC); `) if err != nil { return err } // One-time migration: copy existing single-row backups to versioned table s.db.Exec(`INSERT INTO infra_backup_versions (customer_id, backup_json, created_at) SELECT customer_id, backup_json, updated_at FROM infra_backups WHERE NOT EXISTS (SELECT 1 FROM infra_backup_versions WHERE infra_backup_versions.customer_id = infra_backups.customer_id)`) return nil } // NotificationPrefs holds per-customer notification preferences. type NotificationPrefs struct { CustomerID string Email string EnabledEvents []string CooldownHours int } // GetNotificationPrefs returns notification preferences for a customer. func (s *Store) GetNotificationPrefs(customerID string) (*NotificationPrefs, error) { var email, eventsJSON string var cooldownHours int err := s.db.QueryRow( "SELECT email, enabled_events, COALESCE(cooldown_hours, 6) FROM customer_notifications WHERE customer_id = ?", customerID, ).Scan(&email, &eventsJSON, &cooldownHours) if err != nil { if err == sql.ErrNoRows { return nil, nil } return nil, err } var events []string if err := json.Unmarshal([]byte(eventsJSON), &events); err != nil { s.logger.Printf("[WARN] Corrupt enabled_events JSON for %s: %v", customerID, err) } if cooldownHours <= 0 { cooldownHours = 6 } return &NotificationPrefs{ CustomerID: customerID, Email: email, EnabledEvents: events, CooldownHours: cooldownHours, }, nil } // SaveNotificationPrefs creates or updates notification preferences for a customer. func (s *Store) SaveNotificationPrefs(customerID, email string, enabledEvents []string, cooldownHours int) error { eventsJSON, _ := json.Marshal(enabledEvents) if cooldownHours <= 0 { cooldownHours = 6 } _, err := s.db.Exec(` INSERT INTO customer_notifications (customer_id, email, enabled_events, cooldown_hours, updated_at) VALUES (?, ?, ?, ?, datetime('now')) ON CONFLICT(customer_id) DO UPDATE SET email = excluded.email, enabled_events = excluded.enabled_events, cooldown_hours = excluded.cooldown_hours, updated_at = datetime('now')`, customerID, email, string(eventsJSON), cooldownHours, ) return err } // LogNotification records a notification attempt. func (s *Store) LogNotification(customerID, eventType, severity, message, status, errorMsg, channel string) error { if channel == "" { channel = "customer" } _, err := s.db.Exec(` INSERT INTO notification_log (customer_id, event_type, severity, message, status, error_message, channel) VALUES (?, ?, ?, ?, ?, ?, ?)`, customerID, eventType, severity, message, status, errorMsg, channel, ) return err } // NotificationLogEntry represents a single notification log record. type NotificationLogEntry struct { EventType string Severity string Message string Status string // "sent", "skipped", "failed" ErrorMessage string Channel string // "operator" or "customer" CreatedAt time.Time } // GetRecentNotifications returns the most recent notification log entries for a customer. func (s *Store) GetRecentNotifications(customerID string, limit int) ([]NotificationLogEntry, error) { rows, err := s.db.Query(` SELECT event_type, severity, message, status, COALESCE(error_message, ''), COALESCE(channel, 'customer'), created_at FROM notification_log WHERE customer_id = ? ORDER BY created_at DESC LIMIT ?`, customerID, limit) if err != nil { return nil, err } defer rows.Close() var entries []NotificationLogEntry for rows.Next() { var e NotificationLogEntry var createdAt, errorMsg string if err := rows.Scan(&e.EventType, &e.Severity, &e.Message, &e.Status, &errorMsg, &e.Channel, &createdAt); err != nil { return nil, err } e.CreatedAt = parseSQLiteTime(createdAt) e.ErrorMessage = errorMsg entries = append(entries, e) } return entries, rows.Err() } // SaveReport stores a new report. The reportJSON should be the raw JSON payload. func (s *Store) SaveReport(customerID string, reportJSON []byte) error { // Parse denormalized fields from the JSON var parsed struct { ControllerVersion string `json:"controller_version"` ControllerURL string `json:"controller_url"` System struct { CPUPercent float64 `json:"cpu_percent"` MemoryPercent float64 `json:"memory_percent"` } `json:"system"` Containers struct { Total int `json:"total"` Running int `json:"running"` } `json:"containers"` Backup struct { LastSnapshot *time.Time `json:"last_snapshot"` } `json:"backup"` Health struct { Status string `json:"status"` } `json:"health"` } if err := json.Unmarshal(reportJSON, &parsed); err != nil { s.logger.Printf("[WARN] Cannot parse report fields for denormalization: %v", err) } var backupSnapshot *string if parsed.Backup.LastSnapshot != nil { t := parsed.Backup.LastSnapshot.Format(time.RFC3339) backupSnapshot = &t } _, err := s.db.Exec(` INSERT INTO reports (customer_id, report_json, health_status, cpu_percent, memory_percent, container_total, container_running, backup_last_snapshot, controller_version, controller_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, customerID, string(reportJSON), parsed.Health.Status, parsed.System.CPUPercent, parsed.System.MemoryPercent, parsed.Containers.Total, parsed.Containers.Running, backupSnapshot, parsed.ControllerVersion, parsed.ControllerURL, ) return err } // GetCustomers returns the latest report summary for each customer. func (s *Store) GetCustomers() ([]CustomerSummary, error) { rows, err := s.db.Query(` SELECT r.customer_id, r.received_at, r.report_json, r.health_status, r.cpu_percent, r.memory_percent, r.container_total, r.container_running, r.backup_last_snapshot, r.controller_version, r.controller_url FROM reports r INNER JOIN ( SELECT customer_id, MAX(received_at) as max_time FROM reports GROUP BY customer_id ) latest ON r.customer_id = latest.customer_id AND r.received_at = latest.max_time ORDER BY r.customer_id`) if err != nil { return nil, err } defer rows.Close() var customers []CustomerSummary for rows.Next() { var c CustomerSummary var receivedAt string var backupSnapshot sql.NullString var controllerURL sql.NullString if err := rows.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON, &c.HealthStatus, &c.CPUPercent, &c.MemoryPercent, &c.ContainerTotal, &c.ContainerRunning, &backupSnapshot, &c.ControllerVersion, &controllerURL); err != nil { return nil, err } c.ReceivedAt = parseSQLiteTime(receivedAt) c.TimeSinceReport = time.Since(c.ReceivedAt) if backupSnapshot.Valid { t, err := time.Parse(time.RFC3339, backupSnapshot.String) if err == nil { c.BackupLastSnapshot = &t } } if controllerURL.Valid { c.ControllerURL = controllerURL.String } // Parse customer_name from JSON var report struct { CustomerName string `json:"customer_name"` } if err := json.Unmarshal([]byte(c.ReportJSON), &report); err != nil { s.logger.Printf("[WARN] Cannot parse customer_name from report JSON for %s: %v", c.CustomerID, err) } c.CustomerName = report.CustomerName // Parse disk summary c.DiskSummary = parseDiskSummary(c.ReportJSON) customers = append(customers, c) } return customers, rows.Err() } // GetCustomer returns the latest report for a specific customer. func (s *Store) GetCustomer(customerID string) (*CustomerSummary, error) { row := s.db.QueryRow(` SELECT customer_id, received_at, report_json, health_status, cpu_percent, memory_percent, container_total, container_running, backup_last_snapshot, controller_version, controller_url FROM reports WHERE customer_id = ? ORDER BY received_at DESC LIMIT 1`, customerID) var c CustomerSummary var receivedAt string var backupSnapshot sql.NullString var controllerURL sql.NullString if err := row.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON, &c.HealthStatus, &c.CPUPercent, &c.MemoryPercent, &c.ContainerTotal, &c.ContainerRunning, &backupSnapshot, &c.ControllerVersion, &controllerURL); err != nil { if err == sql.ErrNoRows { return nil, nil } return nil, err } c.ReceivedAt = parseSQLiteTime(receivedAt) c.TimeSinceReport = time.Since(c.ReceivedAt) if backupSnapshot.Valid { t, err := time.Parse(time.RFC3339, backupSnapshot.String) if err == nil { c.BackupLastSnapshot = &t } } if controllerURL.Valid { c.ControllerURL = controllerURL.String } var report struct { CustomerName string `json:"customer_name"` } json.Unmarshal([]byte(c.ReportJSON), &report) c.CustomerName = report.CustomerName c.DiskSummary = parseDiskSummary(c.ReportJSON) return &c, nil } // GetCustomerHistory returns report history for a customer. func (s *Store) GetCustomerHistory(customerID string, since time.Duration) ([]CustomerSummary, error) { cutoff := time.Now().Add(-since).Format("2006-01-02 15:04:05") rows, err := s.db.Query(` SELECT customer_id, received_at, report_json, health_status, cpu_percent, memory_percent, container_total, container_running, backup_last_snapshot, controller_version, controller_url FROM reports WHERE customer_id = ? AND received_at >= ? ORDER BY received_at DESC`, customerID, cutoff) if err != nil { return nil, err } defer rows.Close() var history []CustomerSummary for rows.Next() { var c CustomerSummary var receivedAt string var backupSnapshot sql.NullString var controllerURL sql.NullString if err := rows.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON, &c.HealthStatus, &c.CPUPercent, &c.MemoryPercent, &c.ContainerTotal, &c.ContainerRunning, &backupSnapshot, &c.ControllerVersion, &controllerURL); err != nil { return nil, err } c.ReceivedAt = parseSQLiteTime(receivedAt) c.TimeSinceReport = time.Since(c.ReceivedAt) if backupSnapshot.Valid { t, err := time.Parse(time.RFC3339, backupSnapshot.String) if err == nil { c.BackupLastSnapshot = &t } } if controllerURL.Valid { c.ControllerURL = controllerURL.String } history = append(history, c) } return history, rows.Err() } // SaveInfraBackup inserts a new infra backup version and prunes old ones (GFS retention). func (s *Store) SaveInfraBackup(customerID string, backupJSON []byte) error { _, err := s.db.Exec(` INSERT INTO infra_backup_versions (customer_id, backup_json, created_at) VALUES (?, ?, datetime('now'))`, customerID, string(backupJSON), ) if err != nil { return err } // Also maintain legacy table for backward compatibility during rollback window s.db.Exec(`INSERT INTO infra_backups (customer_id, backup_json, updated_at) VALUES (?, ?, datetime('now')) ON CONFLICT(customer_id) DO UPDATE SET backup_json = excluded.backup_json, updated_at = datetime('now')`, customerID, string(backupJSON)) s.pruneInfraBackups(customerID) return nil } // GetInfraBackup returns the latest infra backup JSON for a customer, or nil if not found. func (s *Store) GetInfraBackup(customerID string) ([]byte, error) { var data string err := s.db.QueryRow( "SELECT backup_json FROM infra_backup_versions WHERE customer_id = ? ORDER BY created_at DESC LIMIT 1", customerID, ).Scan(&data) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } return []byte(data), nil } // GetInfraBackupByID returns the infra backup JSON for a specific version ID, or nil if not found. func (s *Store) GetInfraBackupByID(id int64) ([]byte, error) { var data string err := s.db.QueryRow( "SELECT backup_json FROM infra_backup_versions WHERE id = ?", id, ).Scan(&data) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } return []byte(data), nil } // InfraBackupMeta holds summary info for the dashboard (avoids parsing full JSON). type InfraBackupMeta struct { UpdatedAt time.Time StackCount int DiskCount int VersionCount int } // GetInfraBackupMeta returns summary metadata for a customer's latest infra backup. func (s *Store) GetInfraBackupMeta(customerID string) (*InfraBackupMeta, error) { var backupJSON, createdAt string err := s.db.QueryRow( "SELECT backup_json, created_at FROM infra_backup_versions WHERE customer_id = ? ORDER BY created_at DESC LIMIT 1", customerID, ).Scan(&backupJSON, &createdAt) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } meta := &InfraBackupMeta{ UpdatedAt: parseSQLiteTime(createdAt), } // Count total versions s.db.QueryRow("SELECT COUNT(*) FROM infra_backup_versions WHERE customer_id = ?", customerID).Scan(&meta.VersionCount) // Parse just the fields we need parseInfraBackupCounts(backupJSON, &meta.StackCount, &meta.DiskCount, nil, s.logger, customerID) return meta, nil } // InfraBackupVersion holds summary info for a single backup version. type InfraBackupVersion struct { ID int64 `json:"id"` CreatedAt time.Time `json:"created_at"` StackCount int `json:"stack_count"` DiskCount int `json:"disk_count"` StackNames []string `json:"stack_names,omitempty"` } // ListInfraBackupVersions returns metadata for all retained versions of a customer's backup. func (s *Store) ListInfraBackupVersions(customerID string) ([]InfraBackupVersion, error) { rows, err := s.db.Query( "SELECT id, backup_json, created_at FROM infra_backup_versions WHERE customer_id = ? ORDER BY created_at DESC", customerID, ) if err != nil { return nil, err } defer rows.Close() var versions []InfraBackupVersion for rows.Next() { var v InfraBackupVersion var backupJSON, createdAt string if err := rows.Scan(&v.ID, &backupJSON, &createdAt); err != nil { return nil, err } v.CreatedAt = parseSQLiteTime(createdAt) parseInfraBackupCounts(backupJSON, &v.StackCount, &v.DiskCount, &v.StackNames, nil, "") versions = append(versions, v) } return versions, rows.Err() } // pruneInfraBackups applies GFS retention: keep all from last 24h, latest per day (7d), // latest per week (4w), latest per month (3mo). Delete everything else. func (s *Store) pruneInfraBackups(customerID string) { rows, err := s.db.Query( "SELECT id, created_at FROM infra_backup_versions WHERE customer_id = ? ORDER BY created_at DESC", customerID, ) if err != nil { return } defer rows.Close() type entry struct { id int64 createdAt time.Time } var all []entry for rows.Next() { var e entry var ts string if err := rows.Scan(&e.id, &ts); err != nil { return } e.createdAt = parseSQLiteTime(ts) all = append(all, e) } if len(all) <= 1 { return } now := time.Now().UTC() keep := make(map[int64]bool) seenDays := make(map[string]bool) seenWeeks := make(map[string]bool) seenMonths := make(map[string]bool) for _, e := range all { age := now.Sub(e.createdAt) // Keep all from last 24h if age < 24*time.Hour { keep[e.id] = true continue } // Latest per calendar day for last 7 days if age < 7*24*time.Hour { day := e.createdAt.Format("2006-01-02") if !seenDays[day] { seenDays[day] = true keep[e.id] = true } continue } // Latest per ISO week for last 4 weeks if age < 28*24*time.Hour { year, week := e.createdAt.ISOWeek() wk := fmt.Sprintf("%d-W%02d", year, week) if !seenWeeks[wk] { seenWeeks[wk] = true keep[e.id] = true } continue } // Latest per calendar month for last 3 months if age < 90*24*time.Hour { month := e.createdAt.Format("2006-01") if !seenMonths[month] { seenMonths[month] = true keep[e.id] = true } continue } // Older than 3 months — don't keep } // Build delete list var deleteIDs []interface{} var placeholders []string for _, e := range all { if !keep[e.id] { deleteIDs = append(deleteIDs, e.id) placeholders = append(placeholders, "?") } } if len(deleteIDs) == 0 { return } query := "DELETE FROM infra_backup_versions WHERE id IN (" + joinStrings(placeholders, ",") + ")" _, err = s.db.Exec(query, deleteIDs...) if err != nil { s.logger.Printf("[WARN] Failed to prune infra backup versions for %s: %v", customerID, err) } else { s.logger.Printf("[INFO] Pruned %d old infra backup version(s) for %s (kept %d)", len(deleteIDs), customerID, len(keep)) } } // parseInfraBackupCounts extracts stack/disk counts and optionally stack names from backup JSON. func parseInfraBackupCounts(backupJSON string, stackCount, diskCount *int, stackNames *[]string, logger *log.Logger, customerID string) { var parsed struct { DeployedStacks []struct { Name string `json:"name"` DisplayName string `json:"display_name"` } `json:"deployed_stacks"` DiskLayout struct { Mounts []json.RawMessage `json:"mounts"` } `json:"disk_layout"` } if err := json.Unmarshal([]byte(backupJSON), &parsed); err != nil { if logger != nil { logger.Printf("[WARN] Failed to parse infra backup metadata for %s: %v", customerID, err) } return } *stackCount = len(parsed.DeployedStacks) *diskCount = len(parsed.DiskLayout.Mounts) if stackNames != nil { for _, s := range parsed.DeployedStacks { name := s.DisplayName if name == "" { name = s.Name } *stackNames = append(*stackNames, name) } } } func joinStrings(ss []string, sep string) string { if len(ss) == 0 { return "" } result := ss[0] for _, s := range ss[1:] { result += sep + s } return result } // Prune deletes reports older than the given number of days. func (s *Store) Prune(maxDays int) (int64, error) { cutoff := time.Now().AddDate(0, 0, -maxDays).Format("2006-01-02 15:04:05") res, err := s.db.Exec("DELETE FROM reports WHERE received_at < ?", cutoff) if err != nil { return 0, err } return res.RowsAffected() } // Close closes the database connection. func (s *Store) Close() error { return s.db.Close() } // CustomerConfig holds a pre-provisioned customer configuration. type CustomerConfig struct { CustomerID string CustomerName string Domain string Email string RetrievalPassword string APIKey string ConfigJSON string // JSON object with customer-specific override fields Status string // "active" or "blocked" CreatedAt time.Time UpdatedAt time.Time } // SaveCustomerConfig creates or updates a customer configuration. func (s *Store) SaveCustomerConfig(cfg *CustomerConfig) error { _, err := s.db.Exec(` INSERT INTO customer_configs (customer_id, customer_name, domain, email, retrieval_password, api_key, config_json, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now')) ON CONFLICT(customer_id) DO UPDATE SET customer_name = excluded.customer_name, domain = excluded.domain, email = excluded.email, retrieval_password = excluded.retrieval_password, api_key = excluded.api_key, config_json = excluded.config_json, updated_at = datetime('now')`, cfg.CustomerID, cfg.CustomerName, cfg.Domain, cfg.Email, cfg.RetrievalPassword, cfg.APIKey, cfg.ConfigJSON, ) return err } // GetCustomerConfig returns a customer configuration by ID, or nil if not found. func (s *Store) GetCustomerConfig(customerID string) (*CustomerConfig, error) { var cfg CustomerConfig var createdAt, updatedAt string err := s.db.QueryRow(` SELECT customer_id, customer_name, domain, email, retrieval_password, api_key, config_json, status, created_at, updated_at FROM customer_configs WHERE customer_id = ?`, customerID, ).Scan(&cfg.CustomerID, &cfg.CustomerName, &cfg.Domain, &cfg.Email, &cfg.RetrievalPassword, &cfg.APIKey, &cfg.ConfigJSON, &cfg.Status, &createdAt, &updatedAt) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } cfg.CreatedAt = parseSQLiteTime(createdAt) cfg.UpdatedAt = parseSQLiteTime(updatedAt) return &cfg, nil } // ListCustomerConfigs returns all customer configurations ordered by ID. func (s *Store) ListCustomerConfigs() ([]CustomerConfig, error) { rows, err := s.db.Query(` SELECT customer_id, customer_name, domain, email, retrieval_password, api_key, config_json, status, created_at, updated_at FROM customer_configs ORDER BY customer_id`) if err != nil { return nil, err } defer rows.Close() var configs []CustomerConfig for rows.Next() { var cfg CustomerConfig var createdAt, updatedAt string if err := rows.Scan(&cfg.CustomerID, &cfg.CustomerName, &cfg.Domain, &cfg.Email, &cfg.RetrievalPassword, &cfg.APIKey, &cfg.ConfigJSON, &cfg.Status, &createdAt, &updatedAt); err != nil { return nil, err } cfg.CreatedAt = parseSQLiteTime(createdAt) cfg.UpdatedAt = parseSQLiteTime(updatedAt) configs = append(configs, cfg) } return configs, rows.Err() } // DeleteCustomerConfig deletes a customer configuration. func (s *Store) DeleteCustomerConfig(customerID string) error { _, err := s.db.Exec("DELETE FROM customer_configs WHERE customer_id = ?", customerID) return err } // GetCustomerConfigByAPIKey looks up a customer config by its unique API key. // Returns nil if no matching key is found. func (s *Store) GetCustomerConfigByAPIKey(apiKey string) (*CustomerConfig, error) { var cfg CustomerConfig var createdAt, updatedAt string err := s.db.QueryRow(` SELECT customer_id, customer_name, domain, email, retrieval_password, api_key, config_json, status, created_at, updated_at FROM customer_configs WHERE api_key = ?`, apiKey, ).Scan(&cfg.CustomerID, &cfg.CustomerName, &cfg.Domain, &cfg.Email, &cfg.RetrievalPassword, &cfg.APIKey, &cfg.ConfigJSON, &cfg.Status, &createdAt, &updatedAt) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } cfg.CreatedAt = parseSQLiteTime(createdAt) cfg.UpdatedAt = parseSQLiteTime(updatedAt) return &cfg, nil } // SetCustomerConfigStatus sets the status (active/blocked) for a customer config. func (s *Store) SetCustomerConfigStatus(customerID, status string) error { _, err := s.db.Exec(` UPDATE customer_configs SET status = ?, updated_at = datetime('now') WHERE customer_id = ?`, status, customerID, ) return err } // IsCustomerBlocked returns true if the customer config has status "blocked". func (s *Store) IsCustomerBlocked(customerID string) bool { var status string err := s.db.QueryRow( "SELECT status FROM customer_configs WHERE customer_id = ?", customerID, ).Scan(&status) return err == nil && status == "blocked" } // UpdateRetrievalPassword updates the retrieval password for a customer config. func (s *Store) UpdateRetrievalPassword(customerID, newPassword string) error { _, err := s.db.Exec(` UPDATE customer_configs SET retrieval_password = ?, updated_at = datetime('now') WHERE customer_id = ?`, newPassword, customerID, ) return err } // --- Event system --- // Event represents a single event record. type Event struct { ID int64 CustomerID string EventType string Severity string // "info", "warning", "error" Message string DetailsJSON string // raw JSON Source string // "controller" or "hub" CreatedAt time.Time } // SaveEvent inserts a new event and returns its ID. func (s *Store) SaveEvent(customerID, eventType, severity, message, detailsJSON, source string) (int64, error) { if detailsJSON == "" { detailsJSON = "{}" } if source == "" { source = "controller" } res, err := s.db.Exec(` INSERT INTO events (customer_id, event_type, severity, message, details_json, source) VALUES (?, ?, ?, ?, ?, ?)`, customerID, eventType, severity, message, detailsJSON, source, ) if err != nil { return 0, err } return res.LastInsertId() } // GetRecentEvents returns the most recent events for a customer, newest first. func (s *Store) GetRecentEvents(customerID string, limit int) ([]Event, error) { rows, err := s.db.Query(` SELECT id, customer_id, event_type, severity, message, details_json, source, created_at FROM events WHERE customer_id = ? ORDER BY created_at DESC LIMIT ?`, customerID, limit) if err != nil { return nil, err } defer rows.Close() return scanEvents(rows) } // GetEventsByType returns events of a specific type for a customer since a given time. func (s *Store) GetEventsByType(customerID, eventType string, since time.Time) ([]Event, error) { rows, err := s.db.Query(` SELECT id, customer_id, event_type, severity, message, details_json, source, created_at FROM events WHERE customer_id = ? AND event_type = ? AND created_at >= ? ORDER BY created_at DESC`, customerID, eventType, since.UTC().Format("2006-01-02 15:04:05")) if err != nil { return nil, err } defer rows.Close() return scanEvents(rows) } // GetLatestEventByType returns the most recent event of a given type for a customer. func (s *Store) GetLatestEventByType(customerID, eventType string) (*Event, error) { var e Event var createdAt string err := s.db.QueryRow(` SELECT id, customer_id, event_type, severity, message, details_json, source, created_at FROM events WHERE customer_id = ? AND event_type = ? ORDER BY created_at DESC LIMIT 1`, customerID, eventType, ).Scan(&e.ID, &e.CustomerID, &e.EventType, &e.Severity, &e.Message, &e.DetailsJSON, &e.Source, &createdAt) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } e.CreatedAt = parseSQLiteTime(createdAt) return &e, nil } // GetAllRecentEvents returns the most recent events across all customers. func (s *Store) GetAllRecentEvents(limit int) ([]Event, error) { rows, err := s.db.Query(` SELECT id, customer_id, event_type, severity, message, details_json, source, created_at FROM events ORDER BY created_at DESC LIMIT ?`, limit) if err != nil { return nil, err } defer rows.Close() return scanEvents(rows) } // CountEventsBySeverity returns a count of events per severity for a customer since a given time. func (s *Store) CountEventsBySeverity(customerID string, since time.Time) (map[string]int, error) { rows, err := s.db.Query(` SELECT severity, COUNT(*) FROM events WHERE customer_id = ? AND created_at >= ? GROUP BY severity`, customerID, since.UTC().Format("2006-01-02 15:04:05")) if err != nil { return nil, err } defer rows.Close() counts := make(map[string]int) for rows.Next() { var sev string var count int if err := rows.Scan(&sev, &count); err != nil { return nil, err } counts[sev] = count } return counts, rows.Err() } // PruneEvents deletes events older than the given number of days. func (s *Store) PruneEvents(maxDays int) (int64, error) { cutoff := time.Now().AddDate(0, 0, -maxDays).UTC().Format("2006-01-02 15:04:05") res, err := s.db.Exec("DELETE FROM events WHERE created_at < ?", cutoff) if err != nil { return 0, err } return res.RowsAffected() } // GetActiveCustomerIDs returns customer IDs from customer_configs where status is 'active'. func (s *Store) GetActiveCustomerIDs() ([]string, error) { rows, err := s.db.Query("SELECT customer_id FROM customer_configs WHERE status = 'active'") if err != nil { return nil, err } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { return nil, err } ids = append(ids, id) } return ids, rows.Err() } // Ping verifies the database is accessible. func (s *Store) Ping() error { var n int return s.db.QueryRow("SELECT 1").Scan(&n) } func scanEvents(rows *sql.Rows) ([]Event, error) { var events []Event for rows.Next() { var e Event var createdAt string if err := rows.Scan(&e.ID, &e.CustomerID, &e.EventType, &e.Severity, &e.Message, &e.DetailsJSON, &e.Source, &createdAt); err != nil { return nil, err } e.CreatedAt = parseSQLiteTime(createdAt) events = append(events, e) } return events, rows.Err() } // parseSQLiteTime tries multiple formats that modernc.org/sqlite may return. func parseSQLiteTime(s string) time.Time { formats := []string{ "2006-01-02 15:04:05", // SQLite datetime('now') "2006-01-02T15:04:05Z", // RFC3339 without fractional time.RFC3339, // 2006-01-02T15:04:05Z07:00 time.RFC3339Nano, // with fractional seconds "2006-01-02 15:04:05+00:00", // with explicit UTC offset "2006-01-02 15:04:05.999999999", // with fractional, no TZ } for _, f := range formats { if t, err := time.Parse(f, s); err == nil { return t } } // Last resort: if string is non-empty, log it for debugging if s != "" { log.Printf("[WARN] Could not parse timestamp: %q", s) } return time.Time{} // zero time } func parseDiskSummary(reportJSON string) string { var report struct { Storage []struct { Mount string `json:"mount"` Percent float64 `json:"percent"` } `json:"storage"` } // ignore parse errors — show "–" on failure json.Unmarshal([]byte(reportJSON), &report) //nolint:errcheck var parts []string for _, s := range report.Storage { parts = append(parts, fmt.Sprintf("%.0f%%", s.Percent)) } if len(parts) == 0 { return "–" } result := parts[0] for _, p := range parts[1:] { result += "/" + p } return result }