Files
deploy-felhom-compose/controller/cmd/controller/main.go
T

521 lines
16 KiB
Go

package main
import (
	"context"
	"errors"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"gitea.dooplex.hu/admin/felhom-controller/internal/api"
	"gitea.dooplex.hu/admin/felhom-controller/internal/backup"
	"gitea.dooplex.hu/admin/felhom-controller/internal/config"
	"gitea.dooplex.hu/admin/felhom-controller/internal/metrics"
	"gitea.dooplex.hu/admin/felhom-controller/internal/monitor"
	"gitea.dooplex.hu/admin/felhom-controller/internal/notify"
	"gitea.dooplex.hu/admin/felhom-controller/internal/report"
	"gitea.dooplex.hu/admin/felhom-controller/internal/scheduler"
	"gitea.dooplex.hu/admin/felhom-controller/internal/settings"
	"gitea.dooplex.hu/admin/felhom-controller/internal/stacks"
	catalogsync "gitea.dooplex.hu/admin/felhom-controller/internal/sync"
	"gitea.dooplex.hu/admin/felhom-controller/internal/system"
	"gitea.dooplex.hu/admin/felhom-controller/internal/web"
)
// Build metadata. Set at build time via ldflags (e.g. -ldflags "-X main.Version=...");
// the literal defaults below are what an un-stamped (local/dev) build reports.
var (
	// Version is the controller version string; it is printed by -version,
	// logged at startup, and passed to hub reports and the web server.
	Version = "dev"
	// BuildTime is the build timestamp printed by -version.
	BuildTime = "unknown"
	// GitCommit is the source commit printed by -version.
	GitCommit = "unknown"
)
// main is the felhom-controller entry point.
//
// It loads the YAML configuration and the JSON settings file, wires up every
// subsystem (stack manager, catalog syncer, CPU/metrics collection, health
// pinging, backups, alerts, notifications, hub reporting and the periodic
// scheduler), then serves the web UI and API over HTTP until SIGINT/SIGTERM
// is received, at which point it shuts the HTTP server down gracefully.
//
// Flags:
//
//	-config   path to the configuration file
//	-version  print version/build information and exit
func main() {
	configPath := flag.String("config", "/opt/docker/felhom-controller/controller.yaml", "Path to configuration file")
	showVersion := flag.Bool("version", false, "Show version and exit")
	flag.Parse()

	if *showVersion {
		fmt.Printf("felhom-controller %s (built %s, commit %s)\n", Version, BuildTime, GitCommit)
		os.Exit(0)
	}

	// --- Load configuration ---
	cfg, err := config.Load(*configPath)
	if err != nil {
		log.Fatalf("[FATAL] Failed to load config from %s: %v", *configPath, err)
	}
	logger := setupLogger(cfg)
	logger.Printf("[INFO] felhom-controller %s starting (customer: %s, domain: %s)",
		Version, cfg.Customer.ID, cfg.Customer.Domain)

	// --- Load settings ---
	settingsPath := filepath.Join(cfg.Paths.DataDir, "settings.json")
	sett, err := settings.Load(settingsPath, logger)
	if err != nil {
		logger.Fatalf("[FATAL] Failed to load settings from %s: %v", settingsPath, err)
	}

	// --- Auto-discover storage paths from deployed apps ---
	discoveredPaths := discoverHDDPaths(cfg.Paths.StacksDir, logger)
	sett.AutoDiscoverStoragePaths(discoveredPaths, cfg.Paths.HDDPath, logger)

	// --- Initialize stack manager ---
	stackMgr, err := stacks.NewManager(cfg, logger)
	if err != nil {
		logger.Fatalf("[FATAL] Failed to initialize stack manager: %v", err)
	}
	// Initial stack scan. Not fatal: the "stack-scan" job below rescans periodically.
	if err := stackMgr.ScanStacks(); err != nil {
		logger.Printf("[WARN] Initial stack scan failed: %v", err)
	}

	// --- Initialize catalog syncer ---
	syncer := catalogsync.New(cfg, logger, stackMgr.ScanStacks)
	syncer.Start()
	defer syncer.Stop()

	// --- Graceful shutdown context ---
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// waitOrDone blocks for d and reports true, or returns false early if ctx
	// is cancelled first (so background startup work stops on shutdown).
	waitOrDone := func(d time.Duration) bool {
		t := time.NewTimer(d)
		defer t.Stop()
		select {
		case <-ctx.Done():
			return false
		case <-t.C:
			return true
		}
	}

	// --- Start CPU collector ---
	cpuCollector := system.NewCPUCollector(5 * time.Second)
	cpuCollector.Start(ctx)
	defer cpuCollector.Stop()

	// --- Initialize metrics store + collector ---
	// NOTE(review): this path is hard-coded while settings.json is placed under
	// cfg.Paths.DataDir; it presumably should be derived from config too — confirm.
	metricsDBPath := "/opt/docker/felhom-controller/data/metrics.db"
	metricsStore, err := metrics.NewMetricsStore(metricsDBPath, logger)
	if err != nil {
		logger.Printf("[WARN] Failed to initialize metrics store: %v — monitoring disabled", err)
	} else {
		logger.Printf("[INFO] Metrics store opened at %s", metricsDBPath)
	}
	if metricsStore != nil {
		defer metricsStore.Close()
		// Prefer the user-selected default storage path over the static config value.
		metricsHDDPath := cfg.Paths.HDDPath
		if p := sett.GetDefaultStoragePath(); p != "" {
			metricsHDDPath = p
		}
		metricsCollector := metrics.NewMetricsCollector(metricsStore, cpuCollector, metricsHDDPath, logger)
		metricsCollector.Start(ctx)
		defer metricsCollector.Stop()
		logger.Println("[INFO] Metrics collector started (60s interval)")
	}

	// --- Initialize health pinger ---
	pinger := monitor.NewPinger(&cfg.Monitoring, logger)

	// --- Initialize backup manager ---
	var backupMgr *backup.Manager
	stackProv := &stackAdapter{
		mgr:             stackMgr,
		getStoragePaths: func() []settings.StoragePath { return sett.GetStoragePaths() },
	}
	if cfg.Backup.Enabled {
		backupMgr = backup.NewManager(cfg, pinger, sett, logger)
		backupMgr.SetStackProvider(stackProv)
		// Refresh the cached "next run" times after every backup so the UI stays current.
		backupMgr.AfterBackup = func() {
			nextDBDump := scheduler.NextDailyRun(cfg.Backup.DBDumpSchedule)
			nextBackup := scheduler.NextDailyRun(cfg.Backup.ResticSchedule)
			backupMgr.RefreshCache(nextDBDump, nextBackup)
		}
		go backupMgr.LoadSnapshotHistory()
	}

	// --- Initialize cross-drive backup runner ---
	crossDriveRunner := backup.NewCrossDriveRunner(sett, stackProv, cfg.Paths.SystemDataPath, cfg.Paths.StacksDir, logger)
	// Wire cross-drive → backup manager for pre-backup DB dumps
	if backupMgr != nil {
		crossDriveRunner.SetDBDumper(backupMgr)
	}

	// --- Initialize alert manager ---
	alertMgr := web.NewAlertManager(logger)

	// --- Initialize notifier ---
	notifier := notify.New(cfg.Hub.URL, cfg.Hub.APIKey, cfg.Customer.ID, sett, logger)

	// --- Initialize scheduler ---
	sched := scheduler.New(logger)

	// Existing periodic tasks (migrated from ad-hoc goroutines)
	sched.Every("status-refresh", 30*time.Second, func(ctx context.Context) error {
		return stackMgr.RefreshStatus()
	})
	sched.Every("stack-scan", 2*time.Minute, func(ctx context.Context) error {
		return stackMgr.ScanStacks()
	})

	// Heartbeat — lightweight "I'm alive" signal
	sched.Every("heartbeat", 5*time.Minute, func(ctx context.Context) error {
		pinger.Ping(cfg.Monitoring.PingUUIDs.Heartbeat, "")
		return nil
	})

	// System health ping. Unparseable or non-positive intervals fall back to
	// 5 minutes rather than handing the scheduler an invalid period.
	healthInterval, err := time.ParseDuration(cfg.Monitoring.SystemHealthInterval)
	if err != nil || healthInterval <= 0 {
		healthInterval = 5 * time.Minute
	}
	sched.Every("system-health", healthInterval, func(ctx context.Context) error {
		healthReport := monitor.RunHealthCheck(cfg, cpuCollector, sett.GetStoragePaths())
		body := healthReport.FormatMessage()
		healthUUID := cfg.Monitoring.PingUUIDs.SystemHealth
		if healthReport.Status == "fail" {
			pinger.Fail(healthUUID, body)
		} else {
			pinger.Ping(healthUUID, body)
		}
		// Refresh dashboard alerts from health report
		alertMgr.Refresh(healthReport, cfg, backupMgr)
		// Notify on health status changes
		notifier.NotifyHealthChange(healthReport.Status, healthReport.Issues, healthReport.Warnings)
		return nil
	})

	// Backup daily jobs
	if cfg.Backup.Enabled && backupMgr != nil {
		sched.Daily("db-dump", cfg.Backup.DBDumpSchedule, func(ctx context.Context) error {
			err := backupMgr.RunDBDumps(ctx)
			if err != nil {
				notifier.NotifyDBDumpFailed("Adatbázis mentés sikertelen", err.Error())
			}
			return err
		})
		sched.Daily("backup", cfg.Backup.ResticSchedule, func(ctx context.Context) error {
			err := backupMgr.RunBackup(ctx)
			if err != nil {
				notifier.NotifyBackupFailed("Biztonsági mentés sikertelen", err.Error())
			}
			// Phase 3: Chain cross-drive backups immediately after restic (regardless of restic success)
			// Daily jobs run every night; weekly jobs only on Sunday
			if crossDriveRunner != nil {
				if cdErr := crossDriveRunner.RunAllScheduled(ctx, "daily"); cdErr != nil {
					logger.Printf("[WARN] Cross-drive daily backup had errors: %v", cdErr)
				}
				if time.Now().Weekday() == time.Sunday {
					if cdErr := crossDriveRunner.RunAllScheduled(ctx, "weekly"); cdErr != nil {
						logger.Printf("[WARN] Cross-drive weekly backup had errors: %v", cdErr)
					}
				}
			}
			return err
		})
		// Weekly integrity check — scheduled daily at 04:00, but a no-op except on Sunday.
		sched.Daily("backup-integrity", "04:00", func(ctx context.Context) error {
			if time.Now().Weekday() != time.Sunday {
				return nil
			}
			err := backupMgr.RunIntegrityCheck(ctx)
			if err != nil {
				notifier.NotifyIntegrityFailed("Mentés integritás ellenőrzés sikertelen", err.Error())
			}
			return err
		})
		// Cache refresh: every 5 minutes
		sched.Every("backup-cache", 5*time.Minute, func(ctx context.Context) error {
			nextDBDump := scheduler.NextDailyRun(cfg.Backup.DBDumpSchedule)
			nextBackup := scheduler.NextDailyRun(cfg.Backup.ResticSchedule)
			backupMgr.RefreshCache(nextDBDump, nextBackup)
			return nil
		})
	}

	// Metrics prune — daily at 04:00, keeping the last 30 days
	if metricsStore != nil {
		sched.Daily("metrics-prune", "04:00", func(ctx context.Context) error {
			deleted, err := metricsStore.Prune(30 * 24 * time.Hour)
			if err != nil {
				return err
			}
			logger.Printf("[INFO] Pruned %d old metric rows", deleted)
			return nil
		})
	}

	// --- Central hub reporting ---
	// A pusher is created whenever hub credentials exist; when reporting is
	// disabled it is only used once at startup to tell the hub so.
	var hubPusher *report.Pusher
	if cfg.Hub.URL != "" && cfg.Hub.APIKey != "" {
		hubPusher = report.NewPusher(&cfg.Hub, logger)
		if cfg.Hub.Enabled {
			pushInterval, err := time.ParseDuration(cfg.Hub.PushInterval)
			if err != nil || pushInterval <= 0 {
				pushInterval = 15 * time.Minute
			}
			sched.Every("hub-report", pushInterval, func(ctx context.Context) error {
				r := report.BuildReport(cfg, stackMgr, backupMgr, cpuCollector, metricsStore, Version, sett.GetStoragePaths())
				return hubPusher.Push(r)
			})
			logger.Printf("[INFO] Hub reporting enabled (every %s to %s)", pushInterval, cfg.Hub.URL)
		} else {
			logger.Printf("[INFO] Hub reporting disabled — will send disabled notification to %s", cfg.Hub.URL)
		}
	}

	sched.Start(ctx)
	defer sched.Stop()

	// Fire startup pings + hub report immediately (don't wait for first scheduler tick).
	// All waits are ctx-aware so this goroutine exits promptly on shutdown.
	go func() {
		// Let all subsystems fully initialize
		if !waitOrDone(5 * time.Second) {
			return
		}

		// Heartbeat ping
		pinger.Ping(cfg.Monitoring.PingUUIDs.Heartbeat, "startup")
		logger.Println("[INFO] Startup heartbeat ping sent")

		// System health ping
		healthReport := monitor.RunHealthCheck(cfg, cpuCollector, sett.GetStoragePaths())
		body := healthReport.FormatMessage()
		healthUUID := cfg.Monitoring.PingUUIDs.SystemHealth
		if healthReport.Status == "fail" {
			pinger.Fail(healthUUID, body)
		} else {
			pinger.Ping(healthUUID, body)
		}
		logger.Printf("[INFO] Startup health ping sent (status: %s)", healthReport.Status)

		// Hub report
		if hubPusher == nil {
			return
		}
		if cfg.Hub.Enabled {
			r := report.BuildReport(cfg, stackMgr, backupMgr, cpuCollector, metricsStore, Version, sett.GetStoragePaths())
			var pushErr error
			for attempt := 1; attempt <= 3; attempt++ {
				pushErr = hubPusher.Push(r)
				if pushErr == nil {
					logger.Println("[INFO] Startup hub report sent")
					break
				}
				logger.Printf("[WARN] Startup hub report attempt %d/3 failed: %v", attempt, pushErr)
				if attempt < 3 && !waitOrDone(15*time.Second) {
					return
				}
			}
			if pushErr != nil {
				logger.Printf("[WARN] Startup hub report failed after 3 attempts — next scheduled push in %s", cfg.Hub.PushInterval)
			}
			return
		}
		// Send a minimal "disabled" notification so hub knows reporting is intentionally off
		r := &report.Report{
			Version:           1,
			CustomerID:        cfg.Customer.ID,
			CustomerName:      cfg.Customer.Name,
			ControllerVersion: Version,
			Timestamp:         time.Now().UTC(),
			ReportingDisabled: true,
			Health:            report.HealthReport{Status: "disabled", Issues: []string{}, Warnings: []string{}},
			Stacks:            report.StacksReport{Deployed: []string{}, Available: []string{}},
			Containers:        report.ContainerReport{List: []report.ContainerDetailReport{}},
		}
		hubPusher.PushOnce(r)
	}()

	// Initial backup cache population (don't block startup)
	if cfg.Backup.Enabled && backupMgr != nil {
		go func() {
			nextDBDump := scheduler.NextDailyRun(cfg.Backup.DBDumpSchedule)
			nextBackup := scheduler.NextDailyRun(cfg.Backup.ResticSchedule)
			backupMgr.RefreshCache(nextDBDump, nextBackup)
		}()
	}

	// Sync notification preferences to hub on startup (handles hub DB rebuild recovery)
	if notifier.IsEnabled() {
		go func() {
			prefs := sett.GetNotificationPrefs()
			if prefs.Email != "" {
				if err := notifier.SyncPreferences(prefs.Email, prefs.EnabledEvents); err != nil {
					logger.Printf("[WARN] Failed to sync notification preferences on startup: %v", err)
				}
			}
		}()
	}

	// Initial alert refresh (so alerts appear immediately, not after first health check).
	// The local is named healthReport so it does not shadow the report package.
	go func() {
		healthReport := monitor.RunHealthCheck(cfg, cpuCollector, sett.GetStoragePaths())
		alertMgr.Refresh(healthReport, cfg, backupMgr)
	}()

	// --- Initialize API router ---
	apiRouter := api.NewRouter(cfg, sett, stackMgr, syncer, cpuCollector, backupMgr, crossDriveRunner, metricsStore, logger)

	// --- Initialize web server ---
	webServer := web.NewServer(cfg, stackMgr, cpuCollector, backupMgr, crossDriveRunner, sched, sett, alertMgr, notifier, logger, Version)

	// --- Build HTTP mux ---
	mux := http.NewServeMux()
	// API routes (no auth for health endpoint, auth for everything else)
	mux.HandleFunc("/api/health", apiRouter.HealthHandler)
	// Storage API routes handled by web server (longer prefix takes precedence over /api/)
	mux.Handle("/api/storage/", webServer.RequireAuth(http.HandlerFunc(webServer.ServeStorageAPI)))
	mux.Handle("/api/", webServer.RequireAuth(http.HandlerFunc(apiRouter.ServeHTTP)))
	// Web UI routes (auth required)
	mux.Handle("/", webServer.RequireAuth(http.HandlerFunc(webServer.ServeHTTP)))

	// --- Start HTTP server ---
	server := &http.Server{
		Addr:         cfg.Web.Listen,
		Handler:      mux,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 60 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	// shutdownDone is closed once server.Shutdown has returned. ListenAndServe
	// returns ErrServerClosed as soon as Shutdown *begins*, so main must wait on
	// this channel or in-flight requests are cut off when the process exits.
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		sig := <-sigCh
		logger.Printf("[INFO] Received signal %v, shutting down...", sig)
		cancel()
		shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer shutdownCancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			logger.Printf("[ERROR] HTTP server shutdown error: %v", err)
		}
	}()

	logger.Printf("[INFO] Web UI listening on %s", cfg.Web.Listen)
	if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
		logger.Fatalf("[FATAL] HTTP server error: %v", err)
	}
	// ErrServerClosed is only produced by the Shutdown call in the signal
	// goroutine, so this receive is guaranteed to complete.
	<-shutdownDone
	logger.Println("[INFO] felhom-controller stopped")
}
// setupLogger builds the controller's logger. Output always goes to stdout
// with the standard date/time prefix. When cfg.Logging.Level is "debug", the
// short source file:line is added to each entry.
// File logging is not implemented yet.
func setupLogger(cfg *config.Config) *log.Logger {
	flags := log.LstdFlags
	if cfg.Logging.Level == "debug" {
		flags |= log.Lshortfile
	}
	return log.New(os.Stdout, "", flags)
}
// stackAdapter implements backup.StackDataProvider using stacks.Manager.
// It is handed to the backup manager (SetStackProvider) and to the
// cross-drive backup runner.
type stackAdapter struct {
	// mgr is the stack manager that every lookup and start/stop call delegates to.
	mgr *stacks.Manager

	// getStoragePaths yields the registered storage paths. It is invoked on
	// every use rather than captured once. GetStackHDDMounts uses it as a
	// fallback set of HDD roots.
	getStoragePaths func() []settings.StoragePath
}
// GetStackComposePath returns the compose file path of the named stack.
// The boolean is false, and the path empty, when the manager does not know
// the stack.
func (a *stackAdapter) GetStackComposePath(name string) (string, bool) {
	if st, found := a.mgr.GetStack(name); found {
		return st.ComposePath, true
	}
	return "", false
}
// ListDeployedStacks returns a backup.StackSummary for every stack the
// manager reports as deployed, in the iteration order of Manager.GetStacks.
// It returns nil (not an empty slice) when nothing is deployed.
func (a *stackAdapter) ListDeployedStacks() []backup.StackSummary {
	var deployed []backup.StackSummary
	for _, st := range a.mgr.GetStacks() {
		if st.Deployed {
			deployed = append(deployed, backup.StackSummary{
				Name:        st.Name,
				DisplayName: st.Meta.DisplayName,
				ComposePath: st.ComposePath,
				NeedsHDD:    st.Meta.Resources.NeedsHDD,
			})
		}
	}
	return deployed
}
// StopStack stops the named stack by delegating to stacks.Manager.StopStack,
// returning its error unchanged.
func (a *stackAdapter) StopStack(name string) error {
	return a.mgr.StopStack(name)
}
// StartStack starts the named stack by delegating to stacks.Manager.StartStack,
// returning its error unchanged.
func (a *stackAdapter) StartStack(name string) error {
	return a.mgr.StartStack(name)
}
// GetStackHDDMounts returns the HDD-backed mounts of the named stack's
// compose file, as extracted by stacks.ParseComposeHDDMounts for a given HDD
// root. It returns nil for an unknown stack.
//
// The HDD root comes from one of two sources:
//  1. The HDD_PATH value in the stack's own app.yaml, when set. Its parse
//     result is returned as-is.
//  2. Otherwise, every registered storage path is tried. The results are
//     merged with duplicates removed, keeping first-seen order.
func (a *stackAdapter) GetStackHDDMounts(name string) []string {
	st, found := a.mgr.GetStack(name)
	if !found {
		return nil
	}

	if appCfg := stacks.LoadAppConfig(filepath.Dir(st.ComposePath)); appCfg != nil {
		if hdd := appCfg.Env["HDD_PATH"]; hdd != "" {
			return stacks.ParseComposeHDDMounts(st.ComposePath, hdd)
		}
	}

	var merged []string
	seen := map[string]struct{}{}
	for _, sp := range a.getStoragePaths() {
		for _, mount := range stacks.ParseComposeHDDMounts(st.ComposePath, sp.Path) {
			if _, dup := seen[mount]; dup {
				continue
			}
			seen[mount] = struct{}{}
			merged = append(merged, mount)
		}
	}
	return merged
}
// GetStackHDDPath returns the HDD_PATH declared in the named stack's app.yaml,
// normalised with filepath.Clean. It returns "" when the stack is unknown,
// its app config cannot be loaded, or HDD_PATH is unset or empty.
func (a *stackAdapter) GetStackHDDPath(name string) string {
	st, found := a.mgr.GetStack(name)
	if !found {
		return ""
	}
	appCfg := stacks.LoadAppConfig(filepath.Dir(st.ComposePath))
	if appCfg == nil {
		return ""
	}
	hdd := appCfg.Env["HDD_PATH"]
	if hdd == "" {
		return ""
	}
	return filepath.Clean(hdd)
}
// discoverHDDPaths scans deployed apps' app.yaml for HDD_PATH env values.
//
// Every subdirectory of stacksDir is loaded with stacks.LoadAppConfig.
// Entries without an app config, or not marked Deployed, are skipped. The
// non-empty HDD_PATH values are cleaned, de-duplicated and returned in
// directory-listing order (os.ReadDir sorts by filename). If stacksDir
// cannot be read, a warning is logged and nil is returned.
func discoverHDDPaths(stacksDir string, logger *log.Logger) []string {
	entries, err := os.ReadDir(stacksDir)
	if err != nil {
		logger.Printf("[WARN] Cannot read stacks dir for HDD path discovery: %v", err)
		return nil
	}

	var found []string
	known := map[string]struct{}{}
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		appCfg := stacks.LoadAppConfig(filepath.Join(stacksDir, entry.Name()))
		if appCfg == nil || !appCfg.Deployed {
			continue
		}
		raw := appCfg.Env["HDD_PATH"]
		if raw == "" {
			continue
		}
		p := filepath.Clean(raw)
		if _, dup := known[p]; !dup {
			known[p] = struct{}{}
			found = append(found, p)
		}
	}
	return found
}