Files
felhom.eu/hub/internal/store/store.go
T
admin e531516cfa Hub: add POST /api/v1/notify endpoint for customer notifications
- New notification relay endpoint: receives events from customer controllers,
  looks up customer email preferences, sends via Resend HTTP API
- New tables: customer_notifications (per-customer email + event prefs),
  notification_log (audit trail for all notification attempts)
- Hungarian email template with severity, event type, timestamp
- Config: notifications.resend_api_key + notifications.from_email
- Test events always pass event-type filter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 19:29:55 +01:00

405 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package store
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
_ "modernc.org/sqlite"
)
// Store handles SQLite persistence for customer reports.
type Store struct {
db *sql.DB
logger *log.Logger
}
// CustomerSummary holds the latest status for a customer (for dashboard).
type CustomerSummary struct {
CustomerID string
CustomerName string
ControllerVersion string
ReceivedAt time.Time
HealthStatus string
CPUPercent float64
MemoryPercent float64
ContainerTotal int
ContainerRunning int
BackupLastSnapshot *time.Time
ReportJSON string
// Computed fields (not stored)
TimeSinceReport time.Duration
DiskSummary string
}
// New creates a new store and initializes the schema.
func New(dbPath string, logger *log.Logger) (*Store, error) {
db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000")
if err != nil {
return nil, fmt.Errorf("opening database: %w", err)
}
s := &Store{db: db, logger: logger}
if err := s.migrate(); err != nil {
db.Close()
return nil, fmt.Errorf("migrating database: %w", err)
}
return s, nil
}
func (s *Store) migrate() error {
_, err := s.db.Exec(`
CREATE TABLE IF NOT EXISTS reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id TEXT NOT NULL,
received_at DATETIME NOT NULL DEFAULT (datetime('now')),
report_json TEXT NOT NULL,
health_status TEXT,
cpu_percent REAL,
memory_percent REAL,
container_total INTEGER,
container_running INTEGER,
backup_last_snapshot DATETIME,
controller_version TEXT
);
CREATE INDEX IF NOT EXISTS idx_reports_customer
ON reports(customer_id, received_at DESC);
CREATE TABLE IF NOT EXISTS customer_notifications (
customer_id TEXT PRIMARY KEY,
email TEXT NOT NULL DEFAULT '',
enabled_events TEXT NOT NULL DEFAULT '[]',
created_at DATETIME DEFAULT (datetime('now')),
updated_at DATETIME DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS notification_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_id TEXT NOT NULL,
event_type TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
error_message TEXT,
created_at DATETIME DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_notification_log_customer
ON notification_log(customer_id, created_at DESC);
`)
return err
}
// NotificationPrefs holds per-customer notification preferences.
type NotificationPrefs struct {
CustomerID string
Email string
EnabledEvents []string
}
// GetNotificationPrefs returns notification preferences for a customer.
func (s *Store) GetNotificationPrefs(customerID string) (*NotificationPrefs, error) {
var email, eventsJSON string
err := s.db.QueryRow(
"SELECT email, enabled_events FROM customer_notifications WHERE customer_id = ?",
customerID,
).Scan(&email, &eventsJSON)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
var events []string
json.Unmarshal([]byte(eventsJSON), &events)
return &NotificationPrefs{
CustomerID: customerID,
Email: email,
EnabledEvents: events,
}, nil
}
// SaveNotificationPrefs creates or updates notification preferences for a customer.
func (s *Store) SaveNotificationPrefs(customerID, email string, enabledEvents []string) error {
eventsJSON, _ := json.Marshal(enabledEvents)
_, err := s.db.Exec(`
INSERT INTO customer_notifications (customer_id, email, enabled_events, updated_at)
VALUES (?, ?, ?, datetime('now'))
ON CONFLICT(customer_id) DO UPDATE SET
email = excluded.email,
enabled_events = excluded.enabled_events,
updated_at = datetime('now')`,
customerID, email, string(eventsJSON),
)
return err
}
// LogNotification records a notification attempt.
func (s *Store) LogNotification(customerID, eventType, severity, message, status, errorMsg string) error {
_, err := s.db.Exec(`
INSERT INTO notification_log (customer_id, event_type, severity, message, status, error_message)
VALUES (?, ?, ?, ?, ?, ?)`,
customerID, eventType, severity, message, status, errorMsg,
)
return err
}
// SaveReport stores a new report. The reportJSON should be the raw JSON payload.
func (s *Store) SaveReport(customerID string, reportJSON []byte) error {
// Parse denormalized fields from the JSON
var parsed struct {
ControllerVersion string `json:"controller_version"`
System struct {
CPUPercent float64 `json:"cpu_percent"`
MemoryPercent float64 `json:"memory_percent"`
} `json:"system"`
Containers struct {
Total int `json:"total"`
Running int `json:"running"`
} `json:"containers"`
Backup struct {
LastSnapshot *time.Time `json:"last_snapshot"`
} `json:"backup"`
Health struct {
Status string `json:"status"`
} `json:"health"`
}
json.Unmarshal(reportJSON, &parsed)
var backupSnapshot *string
if parsed.Backup.LastSnapshot != nil {
t := parsed.Backup.LastSnapshot.Format(time.RFC3339)
backupSnapshot = &t
}
_, err := s.db.Exec(`
INSERT INTO reports (customer_id, report_json, health_status, cpu_percent,
memory_percent, container_total, container_running,
backup_last_snapshot, controller_version)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
customerID, string(reportJSON),
parsed.Health.Status, parsed.System.CPUPercent,
parsed.System.MemoryPercent, parsed.Containers.Total,
parsed.Containers.Running, backupSnapshot,
parsed.ControllerVersion,
)
return err
}
// GetCustomers returns the latest report summary for each customer.
func (s *Store) GetCustomers() ([]CustomerSummary, error) {
rows, err := s.db.Query(`
SELECT r.customer_id, r.received_at, r.report_json,
r.health_status, r.cpu_percent, r.memory_percent,
r.container_total, r.container_running,
r.backup_last_snapshot, r.controller_version
FROM reports r
INNER JOIN (
SELECT customer_id, MAX(received_at) as max_time
FROM reports
GROUP BY customer_id
) latest ON r.customer_id = latest.customer_id
AND r.received_at = latest.max_time
ORDER BY r.customer_id`)
if err != nil {
return nil, err
}
defer rows.Close()
var customers []CustomerSummary
for rows.Next() {
var c CustomerSummary
var receivedAt string
var backupSnapshot sql.NullString
if err := rows.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON,
&c.HealthStatus, &c.CPUPercent, &c.MemoryPercent,
&c.ContainerTotal, &c.ContainerRunning,
&backupSnapshot, &c.ControllerVersion); err != nil {
return nil, err
}
c.ReceivedAt = parseSQLiteTime(receivedAt)
c.TimeSinceReport = time.Since(c.ReceivedAt)
if backupSnapshot.Valid {
t, err := time.Parse(time.RFC3339, backupSnapshot.String)
if err == nil {
c.BackupLastSnapshot = &t
}
}
// Parse customer_name from JSON
var report struct {
CustomerName string `json:"customer_name"`
}
json.Unmarshal([]byte(c.ReportJSON), &report)
c.CustomerName = report.CustomerName
// Parse disk summary
c.DiskSummary = parseDiskSummary(c.ReportJSON)
customers = append(customers, c)
}
return customers, rows.Err()
}
// GetCustomer returns the latest report for a specific customer.
func (s *Store) GetCustomer(customerID string) (*CustomerSummary, error) {
row := s.db.QueryRow(`
SELECT customer_id, received_at, report_json,
health_status, cpu_percent, memory_percent,
container_total, container_running,
backup_last_snapshot, controller_version
FROM reports
WHERE customer_id = ?
ORDER BY received_at DESC
LIMIT 1`, customerID)
var c CustomerSummary
var receivedAt string
var backupSnapshot sql.NullString
if err := row.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON,
&c.HealthStatus, &c.CPUPercent, &c.MemoryPercent,
&c.ContainerTotal, &c.ContainerRunning,
&backupSnapshot, &c.ControllerVersion); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
c.ReceivedAt = parseSQLiteTime(receivedAt)
c.TimeSinceReport = time.Since(c.ReceivedAt)
if backupSnapshot.Valid {
t, err := time.Parse(time.RFC3339, backupSnapshot.String)
if err == nil {
c.BackupLastSnapshot = &t
}
}
var report struct {
CustomerName string `json:"customer_name"`
}
json.Unmarshal([]byte(c.ReportJSON), &report)
c.CustomerName = report.CustomerName
c.DiskSummary = parseDiskSummary(c.ReportJSON)
return &c, nil
}
// GetCustomerHistory returns report history for a customer.
func (s *Store) GetCustomerHistory(customerID string, since time.Duration) ([]CustomerSummary, error) {
cutoff := time.Now().Add(-since).Format("2006-01-02 15:04:05")
rows, err := s.db.Query(`
SELECT customer_id, received_at, report_json,
health_status, cpu_percent, memory_percent,
container_total, container_running,
backup_last_snapshot, controller_version
FROM reports
WHERE customer_id = ? AND received_at >= ?
ORDER BY received_at DESC`, customerID, cutoff)
if err != nil {
return nil, err
}
defer rows.Close()
var history []CustomerSummary
for rows.Next() {
var c CustomerSummary
var receivedAt string
var backupSnapshot sql.NullString
if err := rows.Scan(&c.CustomerID, &receivedAt, &c.ReportJSON,
&c.HealthStatus, &c.CPUPercent, &c.MemoryPercent,
&c.ContainerTotal, &c.ContainerRunning,
&backupSnapshot, &c.ControllerVersion); err != nil {
return nil, err
}
c.ReceivedAt = parseSQLiteTime(receivedAt)
c.TimeSinceReport = time.Since(c.ReceivedAt)
if backupSnapshot.Valid {
t, err := time.Parse(time.RFC3339, backupSnapshot.String)
if err == nil {
c.BackupLastSnapshot = &t
}
}
history = append(history, c)
}
return history, rows.Err()
}
// Prune deletes reports older than the given number of days.
func (s *Store) Prune(maxDays int) (int64, error) {
cutoff := time.Now().AddDate(0, 0, -maxDays).Format("2006-01-02 15:04:05")
res, err := s.db.Exec("DELETE FROM reports WHERE received_at < ?", cutoff)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
// Close closes the database connection.
func (s *Store) Close() error {
return s.db.Close()
}
// parseSQLiteTime tries multiple formats that modernc.org/sqlite may return.
func parseSQLiteTime(s string) time.Time {
formats := []string{
"2006-01-02 15:04:05", // SQLite datetime('now')
"2006-01-02T15:04:05Z", // RFC3339 without fractional
time.RFC3339, // 2006-01-02T15:04:05Z07:00
time.RFC3339Nano, // with fractional seconds
"2006-01-02 15:04:05+00:00", // with explicit UTC offset
"2006-01-02 15:04:05.999999999", // with fractional, no TZ
}
for _, f := range formats {
if t, err := time.Parse(f, s); err == nil {
return t
}
}
// Last resort: if string is non-empty, log it for debugging
if s != "" {
log.Printf("[WARN] Could not parse timestamp: %q", s)
}
return time.Time{} // zero time
}
func parseDiskSummary(reportJSON string) string {
var report struct {
Storage []struct {
Mount string `json:"mount"`
Percent float64 `json:"percent"`
} `json:"storage"`
}
json.Unmarshal([]byte(reportJSON), &report)
var parts []string
for _, s := range report.Storage {
parts = append(parts, fmt.Sprintf("%.0f%%", s.Percent))
}
if len(parts) == 0 {
return ""
}
result := parts[0]
for _, p := range parts[1:] {
result += "/" + p
}
return result
}