Files
felhom.eu/hub/internal/store/store.go
T
admin a757bee07a feat(hub): app telemetry analytics dashboard (v0.4.0)
- store/telemetry.go: new app_telemetry + app_log_issues tables with
  SaveAppTelemetry, GetFleetAppSummary (with P95), GetAppTelemetryHistory,
  GetAppCustomerBreakdown, GetCustomerAppSummary, GetAppIssues, prune methods
- api/handler.go: parse and save optional app_telemetry from report body,
  backward-compatible with old controllers
- cmd/hub/main.go: prune app_telemetry (90d) and stale issues (30d)
- web/apps.go: handleApps + handleAppDetail + chart data aggregation helpers
- web/server.go: routes for /apps, /apps/{name}, /static/chart.min.js;
  added memoryColor/accuracyClass/gt template functions
- web/embed.go: embed static/chart.min.js
- web/configs.go: add app telemetry section to handleCustomerUnified
- templates/apps.html: fleet-wide app list with summary cards and sortable table
- templates/app_detail.html: per-app page with Chart.js memory trend,
  customer breakdown, and known issues table
- templates/customer_unified.html: new Alkalmazás telemetria card
- templates/style.css: badge, summary-card, chart, period-selector,
  accuracy-dot, mem-color, data-table styles
- All templates: added Alkalmazások nav link

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-23 10:46:50 +01:00

966 lines
28 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package store
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
_ "modernc.org/sqlite"
)
// Store handles SQLite persistence for customer reports.
type Store struct {
db *sql.DB
logger *log.Logger
}
// CustomerSummary holds the latest status for a customer (for dashboard).
type CustomerSummary struct {
CustomerID string
CustomerName string
ControllerVersion string
ReceivedAt time.Time
HealthStatus string
CPUPercent float64
MemoryPercent float64
ContainerTotal int
ContainerRunning int
BackupLastSnapshot *time.Time
ReportJSON string
ControllerURL string
// Computed fields (not stored)
TimeSinceReport time.Duration
DiskSummary string
}
// New creates a new store and initializes the schema.
func New(dbPath string, logger *log.Logger) (*Store, error) {
db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000")
if err != nil {
return nil, fmt.Errorf("opening database: %w", err)
}
s := &Store{db: db, logger: logger}
if err := s.migrate(); err != nil {
db.Close()
return nil, fmt.Errorf("migrating database: %w", err)
}
return s, nil
}
func (s *Store) migrate() error {
_, err := s.db.Exec(`
CREATE TABLE IF NOT EXISTS reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id TEXT NOT NULL,
received_at DATETIME NOT NULL DEFAULT (datetime('now')),
report_json TEXT NOT NULL,
health_status TEXT,
cpu_percent REAL,
memory_percent REAL,
container_total INTEGER,
container_running INTEGER,
backup_last_snapshot DATETIME,
controller_version TEXT
);
CREATE INDEX IF NOT EXISTS idx_reports_customer
ON reports(customer_id, received_at DESC);
CREATE TABLE IF NOT EXISTS customer_notifications (
customer_id TEXT PRIMARY KEY,
email TEXT NOT NULL DEFAULT '',
enabled_events TEXT NOT NULL DEFAULT '[]',
created_at DATETIME DEFAULT (datetime('now')),
updated_at DATETIME DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS notification_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id TEXT NOT NULL,
event_type TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
error_message TEXT,
created_at DATETIME DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_notification_log_customer
ON notification_log(customer_id, created_at DESC);
CREATE TABLE IF NOT EXISTS infra_backups (
customer_id TEXT PRIMARY KEY,
backup_json TEXT NOT NULL,
updated_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS customer_configs (
customer_id TEXT PRIMARY KEY,
customer_name TEXT NOT NULL DEFAULT '',
domain TEXT NOT NULL DEFAULT '',
email TEXT NOT NULL DEFAULT '',
retrieval_password TEXT NOT NULL,
api_key TEXT NOT NULL,
config_json TEXT NOT NULL DEFAULT '{}',
created_at DATETIME NOT NULL DEFAULT (datetime('now')),
updated_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
`)
if err != nil {
return err
}
// v0.1.8: add controller_url column (idempotent — ignore error if already exists)
s.db.Exec("ALTER TABLE reports ADD COLUMN controller_url TEXT")
// v0.2.1: add status column to customer_configs (idempotent)
s.db.Exec("ALTER TABLE customer_configs ADD COLUMN status TEXT NOT NULL DEFAULT 'active'")
// v0.3.0: events table for hub-native monitoring
_, err = s.db.Exec(`
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id TEXT NOT NULL,
event_type TEXT NOT NULL,
severity TEXT NOT NULL DEFAULT 'info',
message TEXT NOT NULL DEFAULT '',
details_json TEXT NOT NULL DEFAULT '{}',
source TEXT NOT NULL DEFAULT 'controller',
created_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_events_customer_created
ON events(customer_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_events_type
ON events(event_type, created_at DESC);
`)
if err != nil {
return err
}
// v0.3.0: add cooldown_hours to customer_notifications (idempotent)
s.db.Exec("ALTER TABLE customer_notifications ADD COLUMN cooldown_hours INTEGER DEFAULT 6")
// v0.3.0: add channel column to notification_log (idempotent)
s.db.Exec("ALTER TABLE notification_log ADD COLUMN channel TEXT NOT NULL DEFAULT 'customer'")
// v0.4.0: app telemetry tables
_, err = s.db.Exec(`
CREATE TABLE IF NOT EXISTS app_telemetry (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id TEXT NOT NULL,
app_name TEXT NOT NULL,
display_name TEXT NOT NULL DEFAULT '',
reported_at DATETIME NOT NULL,
memory_current_mb REAL DEFAULT 0,
memory_avg_mb REAL DEFAULT 0,
memory_peak_mb REAL DEFAULT 0,
cpu_avg_percent REAL DEFAULT 0,
catalog_estimate TEXT DEFAULT '',
catalog_limit TEXT DEFAULT '',
log_errors INTEGER DEFAULT 0,
log_warnings INTEGER DEFAULT 0,
containers_json TEXT DEFAULT '[]',
issues_json TEXT DEFAULT '[]'
);
CREATE INDEX IF NOT EXISTS idx_app_telemetry_lookup
ON app_telemetry(app_name, reported_at);
CREATE INDEX IF NOT EXISTS idx_app_telemetry_customer
ON app_telemetry(customer_id, app_name, reported_at);
CREATE INDEX IF NOT EXISTS idx_app_telemetry_prune
ON app_telemetry(reported_at);
CREATE TABLE IF NOT EXISTS app_log_issues (
id INTEGER PRIMARY KEY AUTOINCREMENT,
app_name TEXT NOT NULL,
fingerprint TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
first_seen DATETIME NOT NULL,
last_seen DATETIME NOT NULL,
occurrence_count INTEGER DEFAULT 1,
affected_customers TEXT DEFAULT '[]',
UNIQUE(app_name, fingerprint)
);
CREATE INDEX IF NOT EXISTS idx_app_log_issues_app
ON app_log_issues(app_name, last_seen DESC);
`)
if err != nil {
return err
}
return nil
}
// NotificationPrefs holds per-customer notification preferences.
type NotificationPrefs struct {
CustomerID string
Email string
EnabledEvents []string
CooldownHours int
}
// GetNotificationPrefs returns notification preferences for a customer.
func (s *Store) GetNotificationPrefs(customerID string) (*NotificationPrefs, error) {
var email, eventsJSON string
var cooldownHours int
err := s.db.QueryRow(
"SELECT email, enabled_events, COALESCE(cooldown_hours, 6) FROM customer_notifications WHERE customer_id = ?",
customerID,
).Scan(&email, &eventsJSON, &cooldownHours)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
var events []string
if err := json.Unmarshal([]byte(eventsJSON), &events); err != nil {
s.logger.Printf("[WARN] Corrupt enabled_events JSON for %s: %v", customerID, err)
}
if cooldownHours <= 0 {
cooldownHours = 6
}
return &NotificationPrefs{
CustomerID: customerID,
Email: email,
EnabledEvents: events,
CooldownHours: cooldownHours,
}, nil
}
// SaveNotificationPrefs creates or updates notification preferences for a customer.
func (s *Store) SaveNotificationPrefs(customerID, email string, enabledEvents []string, cooldownHours int) error {
eventsJSON, _ := json.Marshal(enabledEvents)
if cooldownHours <= 0 {
cooldownHours = 6
}
_, err := s.db.Exec(`
INSERT INTO customer_notifications (customer_id, email, enabled_events, cooldown_hours, updated_at)
VALUES (?, ?, ?, ?, datetime('now'))
ON CONFLICT(customer_id) DO UPDATE SET
email = excluded.email,
enabled_events = excluded.enabled_events,
cooldown_hours = excluded.cooldown_hours,
updated_at = datetime('now')`,
customerID, email, string(eventsJSON), cooldownHours,
)
return err
}
// LogNotification records a notification attempt.
func (s *Store) LogNotification(customerID, eventType, severity, message, status, errorMsg, channel string) error {
if channel == "" {
channel = "customer"
}
_, err := s.db.Exec(`
INSERT INTO notification_log (customer_id, event_type, severity, message, status, error_message, channel)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
customerID, eventType, severity, message, status, errorMsg, channel,
)
return err
}
// NotificationLogEntry represents a single notification log record.
type NotificationLogEntry struct {
EventType string
Severity string
Message string
Status string // "sent", "skipped", "failed"
ErrorMessage string
Channel string // "operator" or "customer"
CreatedAt time.Time
}
// GetRecentNotifications returns the most recent notification log entries for a customer.
func (s *Store) GetRecentNotifications(customerID string, limit int) ([]NotificationLogEntry, error) {
rows, err := s.db.Query(`
SELECT event_type, severity, message, status, COALESCE(error_message, ''), COALESCE(channel, 'customer'), created_at
FROM notification_log
WHERE customer_id = ?
ORDER BY created_at DESC
LIMIT ?`, customerID, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var entries []NotificationLogEntry
for rows.Next() {
var e NotificationLogEntry
var createdAt, errorMsg string
if err := rows.Scan(&e.EventType, &e.Severity, &e.Message, &e.Status, &errorMsg, &e.Channel, &createdAt); err != nil {
return nil, err
}
e.CreatedAt = parseSQLiteTime(createdAt)
e.ErrorMessage = errorMsg
entries = append(entries, e)
}
return entries, rows.Err()
}
// SaveReport stores a new report. The reportJSON should be the raw JSON payload.
func (s *Store) SaveReport(customerID string, reportJSON []byte) error {
// Parse denormalized fields from the JSON
var parsed struct {
ControllerVersion string `json:"controller_version"`
ControllerURL string `json:"controller_url"`
System struct {
CPUPercent float64 `json:"cpu_percent"`
MemoryPercent float64 `json:"memory_percent"`
} `json:"system"`
Containers struct {
Total int `json:"total"`
Running int `json:"running"`
} `json:"containers"`
Backup struct {
LastSnapshot *time.Time `json:"last_snapshot"`
} `json:"backup"`
Health struct {
Status string `json:"status"`
} `json:"health"`
}
if err := json.Unmarshal(reportJSON, &parsed); err != nil {
s.logger.Printf("[WARN] Cannot parse report fields for denormalization: %v", err)
}
var backupSnapshot *string
if parsed.Backup.LastSnapshot != nil {
t := parsed.Backup.LastSnapshot.Format(time.RFC3339)
backupSnapshot = &t
}
_, err := s.db.Exec(`
INSERT INTO reports (customer_id, report_json, health_status, cpu_percent,
memory_percent, container_total, container_running,
backup_last_snapshot, controller_version, controller_url)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
customerID, string(reportJSON),
parsed.Health.Status, parsed.System.CPUPercent,
parsed.System.MemoryPercent, parsed.Containers.Total,
parsed.Containers.Running, backupSnapshot,
parsed.ControllerVersion, parsed.ControllerURL,
)
return err
}
// GetCustomers returns the latest report summary for each customer.
func (s *Store) GetCustomers() ([]CustomerSummary, error) {
rows, err := s.db.Query(`
SELECT r.customer_id, r.received_at, r.report_json,
r.health_status, r.cpu_percent, r.memory_percent,
r.container_total, r.container_running,
r.backup_last_snapshot, r.controller_version, r.controller_url
FROM reports r
INNER JOIN (
SELECT customer_id, MAX(received_at) as max_time
FROM reports
GROUP BY customer_id
) latest ON r.customer_id = latest.customer_id
AND r.received_at = latest.max_time
ORDER BY r.customer_id`)
if err != nil {
return nil, err
}
defer rows.Close()
var customers []CustomerSummary
for rows.Next() {
var c CustomerSummary
var receivedAt string
var backupSnapshot sql.NullString
var controllerURL sql.NullString
if err := rows.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON,
&c.HealthStatus, &c.CPUPercent, &c.MemoryPercent,
&c.ContainerTotal, &c.ContainerRunning,
&backupSnapshot, &c.ControllerVersion, &controllerURL); err != nil {
return nil, err
}
c.ReceivedAt = parseSQLiteTime(receivedAt)
c.TimeSinceReport = time.Since(c.ReceivedAt)
if backupSnapshot.Valid {
t, err := time.Parse(time.RFC3339, backupSnapshot.String)
if err == nil {
c.BackupLastSnapshot = &t
}
}
if controllerURL.Valid {
c.ControllerURL = controllerURL.String
}
// Parse customer_name from JSON
var report struct {
CustomerName string `json:"customer_name"`
}
if err := json.Unmarshal([]byte(c.ReportJSON), &report); err != nil {
s.logger.Printf("[WARN] Cannot parse customer_name from report JSON for %s: %v", c.CustomerID, err)
}
c.CustomerName = report.CustomerName
// Parse disk summary
c.DiskSummary = parseDiskSummary(c.ReportJSON)
customers = append(customers, c)
}
return customers, rows.Err()
}
// GetCustomer returns the latest report for a specific customer.
func (s *Store) GetCustomer(customerID string) (*CustomerSummary, error) {
row := s.db.QueryRow(`
SELECT customer_id, received_at, report_json,
health_status, cpu_percent, memory_percent,
container_total, container_running,
backup_last_snapshot, controller_version, controller_url
FROM reports
WHERE customer_id = ?
ORDER BY received_at DESC
LIMIT 1`, customerID)
var c CustomerSummary
var receivedAt string
var backupSnapshot sql.NullString
var controllerURL sql.NullString
if err := row.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON,
&c.HealthStatus, &c.CPUPercent, &c.MemoryPercent,
&c.ContainerTotal, &c.ContainerRunning,
&backupSnapshot, &c.ControllerVersion, &controllerURL); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
c.ReceivedAt = parseSQLiteTime(receivedAt)
c.TimeSinceReport = time.Since(c.ReceivedAt)
if backupSnapshot.Valid {
t, err := time.Parse(time.RFC3339, backupSnapshot.String)
if err == nil {
c.BackupLastSnapshot = &t
}
}
if controllerURL.Valid {
c.ControllerURL = controllerURL.String
}
var report struct {
CustomerName string `json:"customer_name"`
}
json.Unmarshal([]byte(c.ReportJSON), &report)
c.CustomerName = report.CustomerName
c.DiskSummary = parseDiskSummary(c.ReportJSON)
return &c, nil
}
// GetCustomerHistory returns report history for a customer.
func (s *Store) GetCustomerHistory(customerID string, since time.Duration) ([]CustomerSummary, error) {
cutoff := time.Now().Add(-since).Format("2006-01-02 15:04:05")
rows, err := s.db.Query(`
SELECT customer_id, received_at, report_json,
health_status, cpu_percent, memory_percent,
container_total, container_running,
backup_last_snapshot, controller_version, controller_url
FROM reports
WHERE customer_id = ? AND received_at >= ?
ORDER BY received_at DESC`, customerID, cutoff)
if err != nil {
return nil, err
}
defer rows.Close()
var history []CustomerSummary
for rows.Next() {
var c CustomerSummary
var receivedAt string
var backupSnapshot sql.NullString
var controllerURL sql.NullString
if err := rows.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON,
&c.HealthStatus, &c.CPUPercent, &c.MemoryPercent,
&c.ContainerTotal, &c.ContainerRunning,
&backupSnapshot, &c.ControllerVersion, &controllerURL); err != nil {
return nil, err
}
c.ReceivedAt = parseSQLiteTime(receivedAt)
c.TimeSinceReport = time.Since(c.ReceivedAt)
if backupSnapshot.Valid {
t, err := time.Parse(time.RFC3339, backupSnapshot.String)
if err == nil {
c.BackupLastSnapshot = &t
}
}
if controllerURL.Valid {
c.ControllerURL = controllerURL.String
}
history = append(history, c)
}
return history, rows.Err()
}
// SaveInfraBackup upserts the infrastructure backup for a customer.
func (s *Store) SaveInfraBackup(customerID string, backupJSON []byte) error {
_, err := s.db.Exec(`
INSERT INTO infra_backups (customer_id, backup_json, updated_at)
VALUES (?, ?, datetime('now'))
ON CONFLICT(customer_id) DO UPDATE SET
backup_json = excluded.backup_json,
updated_at = datetime('now')`,
customerID, string(backupJSON),
)
return err
}
// GetInfraBackup returns the raw infra backup JSON for a customer, or nil if not found.
func (s *Store) GetInfraBackup(customerID string) ([]byte, error) {
var data string
err := s.db.QueryRow(
"SELECT backup_json FROM infra_backups WHERE customer_id = ?",
customerID,
).Scan(&data)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return []byte(data), nil
}
// InfraBackupMeta holds summary info for the dashboard (avoids parsing full JSON).
type InfraBackupMeta struct {
UpdatedAt time.Time
StackCount int
DiskCount int
}
// GetInfraBackupMeta returns summary metadata for a customer's infra backup.
func (s *Store) GetInfraBackupMeta(customerID string) (*InfraBackupMeta, error) {
var backupJSON, updatedAt string
err := s.db.QueryRow(
"SELECT backup_json, updated_at FROM infra_backups WHERE customer_id = ?",
customerID,
).Scan(&backupJSON, &updatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
meta := &InfraBackupMeta{
UpdatedAt: parseSQLiteTime(updatedAt),
}
// Parse just the fields we need
var parsed struct {
DeployedStacks []json.RawMessage `json:"deployed_stacks"`
DiskLayout struct {
Mounts []json.RawMessage `json:"mounts"`
} `json:"disk_layout"`
}
if err := json.Unmarshal([]byte(backupJSON), &parsed); err != nil {
s.logger.Printf("[WARN] Failed to parse infra backup metadata for %s: %v", customerID, err)
} else {
meta.StackCount = len(parsed.DeployedStacks)
meta.DiskCount = len(parsed.DiskLayout.Mounts)
}
return meta, nil
}
// Prune deletes reports older than the given number of days.
func (s *Store) Prune(maxDays int) (int64, error) {
cutoff := time.Now().AddDate(0, 0, -maxDays).Format("2006-01-02 15:04:05")
res, err := s.db.Exec("DELETE FROM reports WHERE received_at < ?", cutoff)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
// Close closes the database connection.
func (s *Store) Close() error {
return s.db.Close()
}
// CustomerConfig holds a pre-provisioned customer configuration.
type CustomerConfig struct {
CustomerID string
CustomerName string
Domain string
Email string
RetrievalPassword string
APIKey string
ConfigJSON string // JSON object with customer-specific override fields
Status string // "active" or "blocked"
CreatedAt time.Time
UpdatedAt time.Time
}
// SaveCustomerConfig creates or updates a customer configuration.
func (s *Store) SaveCustomerConfig(cfg *CustomerConfig) error {
_, err := s.db.Exec(`
INSERT INTO customer_configs (customer_id, customer_name, domain, email,
retrieval_password, api_key, config_json, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))
ON CONFLICT(customer_id) DO UPDATE SET
customer_name = excluded.customer_name,
domain = excluded.domain,
email = excluded.email,
retrieval_password = excluded.retrieval_password,
api_key = excluded.api_key,
config_json = excluded.config_json,
updated_at = datetime('now')`,
cfg.CustomerID, cfg.CustomerName, cfg.Domain, cfg.Email,
cfg.RetrievalPassword, cfg.APIKey, cfg.ConfigJSON,
)
return err
}
// GetCustomerConfig returns a customer configuration by ID, or nil if not found.
func (s *Store) GetCustomerConfig(customerID string) (*CustomerConfig, error) {
var cfg CustomerConfig
var createdAt, updatedAt string
err := s.db.QueryRow(`
SELECT customer_id, customer_name, domain, email,
retrieval_password, api_key, config_json, status, created_at, updated_at
FROM customer_configs WHERE customer_id = ?`,
customerID,
).Scan(&cfg.CustomerID, &cfg.CustomerName, &cfg.Domain, &cfg.Email,
&cfg.RetrievalPassword, &cfg.APIKey, &cfg.ConfigJSON, &cfg.Status,
&createdAt, &updatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
cfg.CreatedAt = parseSQLiteTime(createdAt)
cfg.UpdatedAt = parseSQLiteTime(updatedAt)
return &cfg, nil
}
// ListCustomerConfigs returns all customer configurations ordered by ID.
func (s *Store) ListCustomerConfigs() ([]CustomerConfig, error) {
rows, err := s.db.Query(`
SELECT customer_id, customer_name, domain, email,
retrieval_password, api_key, config_json, status, created_at, updated_at
FROM customer_configs ORDER BY customer_id`)
if err != nil {
return nil, err
}
defer rows.Close()
var configs []CustomerConfig
for rows.Next() {
var cfg CustomerConfig
var createdAt, updatedAt string
if err := rows.Scan(&cfg.CustomerID, &cfg.CustomerName, &cfg.Domain, &cfg.Email,
&cfg.RetrievalPassword, &cfg.APIKey, &cfg.ConfigJSON, &cfg.Status,
&createdAt, &updatedAt); err != nil {
return nil, err
}
cfg.CreatedAt = parseSQLiteTime(createdAt)
cfg.UpdatedAt = parseSQLiteTime(updatedAt)
configs = append(configs, cfg)
}
return configs, rows.Err()
}
// DeleteCustomerConfig deletes a customer configuration.
func (s *Store) DeleteCustomerConfig(customerID string) error {
_, err := s.db.Exec("DELETE FROM customer_configs WHERE customer_id = ?", customerID)
return err
}
// GetCustomerConfigByAPIKey looks up a customer config by its unique API key.
// Returns nil if no matching key is found.
func (s *Store) GetCustomerConfigByAPIKey(apiKey string) (*CustomerConfig, error) {
var cfg CustomerConfig
var createdAt, updatedAt string
err := s.db.QueryRow(`
SELECT customer_id, customer_name, domain, email,
retrieval_password, api_key, config_json, status, created_at, updated_at
FROM customer_configs WHERE api_key = ?`,
apiKey,
).Scan(&cfg.CustomerID, &cfg.CustomerName, &cfg.Domain, &cfg.Email,
&cfg.RetrievalPassword, &cfg.APIKey, &cfg.ConfigJSON, &cfg.Status,
&createdAt, &updatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
cfg.CreatedAt = parseSQLiteTime(createdAt)
cfg.UpdatedAt = parseSQLiteTime(updatedAt)
return &cfg, nil
}
// SetCustomerConfigStatus sets the status (active/blocked) for a customer config.
func (s *Store) SetCustomerConfigStatus(customerID, status string) error {
_, err := s.db.Exec(`
UPDATE customer_configs SET status = ?, updated_at = datetime('now')
WHERE customer_id = ?`,
status, customerID,
)
return err
}
// IsCustomerBlocked returns true if the customer config has status "blocked".
func (s *Store) IsCustomerBlocked(customerID string) bool {
var status string
err := s.db.QueryRow(
"SELECT status FROM customer_configs WHERE customer_id = ?",
customerID,
).Scan(&status)
return err == nil && status == "blocked"
}
// UpdateRetrievalPassword updates the retrieval password for a customer config.
func (s *Store) UpdateRetrievalPassword(customerID, newPassword string) error {
_, err := s.db.Exec(`
UPDATE customer_configs SET retrieval_password = ?, updated_at = datetime('now')
WHERE customer_id = ?`,
newPassword, customerID,
)
return err
}
// --- Event system ---
// Event represents a single event record.
type Event struct {
ID int64
CustomerID string
EventType string
Severity string // "info", "warning", "error"
Message string
DetailsJSON string // raw JSON
Source string // "controller" or "hub"
CreatedAt time.Time
}
// SaveEvent inserts a new event and returns its ID.
func (s *Store) SaveEvent(customerID, eventType, severity, message, detailsJSON, source string) (int64, error) {
if detailsJSON == "" {
detailsJSON = "{}"
}
if source == "" {
source = "controller"
}
res, err := s.db.Exec(`
INSERT INTO events (customer_id, event_type, severity, message, details_json, source)
VALUES (?, ?, ?, ?, ?, ?)`,
customerID, eventType, severity, message, detailsJSON, source,
)
if err != nil {
return 0, err
}
return res.LastInsertId()
}
// GetRecentEvents returns the most recent events for a customer, newest first.
func (s *Store) GetRecentEvents(customerID string, limit int) ([]Event, error) {
rows, err := s.db.Query(`
SELECT id, customer_id, event_type, severity, message, details_json, source, created_at
FROM events
WHERE customer_id = ?
ORDER BY created_at DESC
LIMIT ?`, customerID, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanEvents(rows)
}
// GetEventsByType returns events of a specific type for a customer since a given time.
func (s *Store) GetEventsByType(customerID, eventType string, since time.Time) ([]Event, error) {
rows, err := s.db.Query(`
SELECT id, customer_id, event_type, severity, message, details_json, source, created_at
FROM events
WHERE customer_id = ? AND event_type = ? AND created_at >= ?
ORDER BY created_at DESC`,
customerID, eventType, since.UTC().Format("2006-01-02 15:04:05"))
if err != nil {
return nil, err
}
defer rows.Close()
return scanEvents(rows)
}
// GetLatestEventByType returns the most recent event of a given type for a customer.
func (s *Store) GetLatestEventByType(customerID, eventType string) (*Event, error) {
var e Event
var createdAt string
err := s.db.QueryRow(`
SELECT id, customer_id, event_type, severity, message, details_json, source, created_at
FROM events
WHERE customer_id = ? AND event_type = ?
ORDER BY created_at DESC
LIMIT 1`, customerID, eventType,
).Scan(&e.ID, &e.CustomerID, &e.EventType, &e.Severity, &e.Message, &e.DetailsJSON, &e.Source, &createdAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
e.CreatedAt = parseSQLiteTime(createdAt)
return &e, nil
}
// GetAllRecentEvents returns the most recent events across all customers.
func (s *Store) GetAllRecentEvents(limit int) ([]Event, error) {
rows, err := s.db.Query(`
SELECT id, customer_id, event_type, severity, message, details_json, source, created_at
FROM events
ORDER BY created_at DESC
LIMIT ?`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanEvents(rows)
}
// CountEventsBySeverity returns a count of events per severity for a customer since a given time.
func (s *Store) CountEventsBySeverity(customerID string, since time.Time) (map[string]int, error) {
rows, err := s.db.Query(`
SELECT severity, COUNT(*) FROM events
WHERE customer_id = ? AND created_at >= ?
GROUP BY severity`,
customerID, since.UTC().Format("2006-01-02 15:04:05"))
if err != nil {
return nil, err
}
defer rows.Close()
counts := make(map[string]int)
for rows.Next() {
var sev string
var count int
if err := rows.Scan(&sev, &count); err != nil {
return nil, err
}
counts[sev] = count
}
return counts, rows.Err()
}
// PruneEvents deletes events older than the given number of days.
func (s *Store) PruneEvents(maxDays int) (int64, error) {
cutoff := time.Now().AddDate(0, 0, -maxDays).UTC().Format("2006-01-02 15:04:05")
res, err := s.db.Exec("DELETE FROM events WHERE created_at < ?", cutoff)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
// GetActiveCustomerIDs returns customer IDs from customer_configs where status is 'active'.
func (s *Store) GetActiveCustomerIDs() ([]string, error) {
rows, err := s.db.Query("SELECT customer_id FROM customer_configs WHERE status = 'active'")
if err != nil {
return nil, err
}
defer rows.Close()
var ids []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
ids = append(ids, id)
}
return ids, rows.Err()
}
// Ping verifies the database is accessible.
func (s *Store) Ping() error {
var n int
return s.db.QueryRow("SELECT 1").Scan(&n)
}
func scanEvents(rows *sql.Rows) ([]Event, error) {
var events []Event
for rows.Next() {
var e Event
var createdAt string
if err := rows.Scan(&e.ID, &e.CustomerID, &e.EventType, &e.Severity, &e.Message, &e.DetailsJSON, &e.Source, &createdAt); err != nil {
return nil, err
}
e.CreatedAt = parseSQLiteTime(createdAt)
events = append(events, e)
}
return events, rows.Err()
}
// parseSQLiteTime tries multiple formats that modernc.org/sqlite may return.
func parseSQLiteTime(s string) time.Time {
formats := []string{
"2006-01-02 15:04:05", // SQLite datetime('now')
"2006-01-02T15:04:05Z", // RFC3339 without fractional
time.RFC3339, // 2006-01-02T15:04:05Z07:00
time.RFC3339Nano, // with fractional seconds
"2006-01-02 15:04:05+00:00", // with explicit UTC offset
"2006-01-02 15:04:05.999999999", // with fractional, no TZ
}
for _, f := range formats {
if t, err := time.Parse(f, s); err == nil {
return t
}
}
// Last resort: if string is non-empty, log it for debugging
if s != "" {
log.Printf("[WARN] Could not parse timestamp: %q", s)
}
return time.Time{} // zero time
}
func parseDiskSummary(reportJSON string) string {
var report struct {
Storage []struct {
Mount string `json:"mount"`
Percent float64 `json:"percent"`
} `json:"storage"`
}
// ignore parse errors — show "" on failure
json.Unmarshal([]byte(reportJSON), &report) //nolint:errcheck
var parts []string
for _, s := range report.Storage {
parts = append(parts, fmt.Sprintf("%.0f%%", s.Percent))
}
if len(parts) == 0 {
return ""
}
result := parts[0]
for _, p := range parts[1:] {
result += "/" + p
}
return result
}