Files
deploy-felhom-compose/controller/internal/sync/sync.go
T
admin be7803c0ac v0.24.0 — Pre-testing observability: debug logging, diagnostic dump, startup self-test
- Add [DEBUG] logging across all modules (backup, storage, sync, selfupdate,
  monitor, notify, report, assets, setup) gated behind logging.level: "debug"
- Add /api/debug/dump endpoint returning full controller state JSON (debug only)
- Add startup self-test validating 9 subsystems (Docker, dirs, storage, hub,
  restic repos, metrics DB) with pass/warn/fail summary
- New packages: internal/selftest, internal/util
- Constructor/signature changes: debug bool params, logger params on
  RunHealthCheck and BuildReport, smart watchdog probe logging

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 18:32:26 +01:00

410 lines
11 KiB
Go

package sync
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
"gitea.dooplex.hu/admin/felhom-controller/internal/config"
)
// Syncer handles periodic git sync of the app catalog to the local stacks directory.
// Start launches the background loop, Stop ends it, and TriggerSync runs a sync on demand.
type Syncer struct {
cfg *config.Config // controller configuration; the Git, Paths and Logging sections are read here
logger *log.Logger // destination for all [SYNC] and [DEBUG] [SYNC] log lines
cacheDir string // local git clone
rescanFn func() error // called by doSync after a sync that added or updated at least one app
postSyncHook func(updated []string) // called after sync with names of updated stacks
// mu guards lastSync, lastErr and syncing.
mu sync.Mutex
lastSync time.Time // when the most recent sync attempt finished (set on success and on failure)
lastErr error // error of the most recent sync attempt; nil after a successful one
syncing bool // true while doSync is running
stopCh chan struct{} // closed by Stop to end the loop started by Start
}
// SyncStatus holds information about the last sync operation.
// It is the snapshot returned by Syncer.Status and is JSON-serializable.
type SyncStatus struct {
LastSync time.Time `json:"last_sync"` // zero value when no sync attempt has finished yet
LastStatus string `json:"last_status"` // "ok", "error", "disabled", "never"
LastError string `json:"last_error,omitempty"` // populated only when LastStatus is "error"
Syncing bool `json:"syncing"` // true while a sync is currently running
}
// SyncResult holds the result of a single sync operation.
// It is returned by TriggerSync and doSync and is JSON-serializable.
type SyncResult struct {
OK bool `json:"ok"` // false when the sync failed or was refused
NewApps []string `json:"new_apps,omitempty"` // apps whose stack directory did not exist before this sync
Updated []string `json:"updated,omitempty"` // existing apps with at least one template file rewritten
Message string `json:"message"` // human-readable summary or error description
}
// New builds a Syncer that keeps its git clone in "<DataDir>/catalog-cache".
//
// rescanFn is invoked after a sync that changed something, to trigger ScanStacks().
// postSyncHook receives the names of updated stacks and may be nil.
// The returned Syncer is idle until Start or TriggerSync is called.
func New(cfg *config.Config, logger *log.Logger, rescanFn func() error, postSyncHook func([]string)) *Syncer {
	syncer := &Syncer{
		cfg:          cfg,
		logger:       logger,
		rescanFn:     rescanFn,
		postSyncHook: postSyncHook,
	}
	syncer.cacheDir = filepath.Join(cfg.Paths.DataDir, "catalog-cache")
	syncer.stopCh = make(chan struct{})
	return syncer
}
// isDebug reports whether the configured logging level is exactly "debug".
// The configuration is consulted on every call.
func (s *Syncer) isDebug() bool {
	const debugLevel = "debug"
	return s.cfg.Logging.Level == debugLevel
}
// reURLCreds captures the scheme, the user name and the password of an
// http(s) URL that carries "user:password@" credentials.
var reURLCreds = regexp.MustCompile(`(https?://)([^:]+):([^@]+)@`)

// maskRepoURL hides the password of a git URL so it can be logged safely,
// e.g. "https://user:token@host/path" → "https://user:***@host/path".
// Input without embedded credentials is returned unchanged.
func maskRepoURL(url string) string {
	// Keep scheme (${1}) and user (${2}); replace only the password.
	const masked = "${1}${2}:***@"
	return reURLCreds.ReplaceAllString(url, masked)
}
// Start begins the periodic sync loop. Call Stop() to terminate.
//
// When no repo URL is configured the syncer stays disabled and Start returns
// without launching anything. Otherwise it kicks off one initial sync and a
// ticker-driven loop, each in its own goroutine, and returns immediately.
// An unparsable or non-positive sync_interval falls back to 15 minutes.
func (s *Syncer) Start() {
	if s.cfg.Git.RepoURL == "" {
		s.logger.Println("[SYNC] Git repo URL is empty — sync disabled (manual mode)")
		return
	}

	interval, err := time.ParseDuration(s.cfg.Git.SyncInterval)
	// "0s" or "-5m" parse without error, but time.NewTicker panics on a
	// non-positive duration, so treat those as invalid too.
	if err != nil || interval <= 0 {
		s.logger.Printf("[SYNC] Invalid sync_interval %q, defaulting to 15m", s.cfg.Git.SyncInterval)
		interval = 15 * time.Minute
	}
	s.logger.Printf("[SYNC] Starting catalog sync (repo: %s, interval: %s)", s.cfg.Git.RepoURL, interval)

	// Initial sync on startup; runs in the background so Start does not block on git.
	go func() {
		result := s.doSync()
		s.logger.Printf("[SYNC] Initial sync: %s", result.Message)
	}()

	// Periodic sync until stopCh is closed.
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-s.stopCh:
				s.logger.Println("[SYNC] Sync loop stopped")
				return
			case <-ticker.C:
				result := s.doSync()
				s.logger.Printf("[SYNC] Periodic sync: %s", result.Message)
			}
		}
	}()
}
// Stop terminates the periodic sync loop.
//
// It is safe to call Stop more than once, and also when Start never launched
// the loop: the stop channel is closed only on the first call.
func (s *Syncer) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()

	select {
	case <-s.stopCh:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(s.stopCh)
	}
}
// TriggerSync runs a sync right now and returns its result.
//
// The request is refused (OK=false) when sync is disabled, when another sync
// is already marked as running, or when the previous sync finished less than
// 30 seconds ago (debounce).
func (s *Syncer) TriggerSync() SyncResult {
	if s.cfg.Git.RepoURL == "" {
		return SyncResult{OK: false, Message: "Git sync is disabled (no repo_url configured)"}
	}

	// Take a snapshot of the guarded state, then decide without holding the lock.
	s.mu.Lock()
	busy := s.syncing
	sinceLast := time.Since(s.lastSync)
	s.mu.Unlock()

	switch {
	case busy:
		return SyncResult{OK: false, Message: "Szinkronizálás már folyamatban"}
	case sinceLast < 30*time.Second:
		return SyncResult{OK: false, Message: "Túl gyakori szinkronizálás — várj 30 másodpercet"}
	}
	return s.doSync()
}
// Status reports a snapshot of the sync state.
//
// LastStatus is resolved in this order: "disabled" (no repo URL configured),
// "never" (no sync attempt finished yet), "error" (last attempt failed; the
// error text goes into LastError), otherwise "ok".
func (s *Syncer) Status() SyncStatus {
	s.mu.Lock()
	defer s.mu.Unlock()

	var snapshot SyncStatus
	snapshot.LastSync = s.lastSync
	snapshot.Syncing = s.syncing

	switch {
	case s.cfg.Git.RepoURL == "":
		snapshot.LastStatus = "disabled"
	case s.lastSync.IsZero():
		snapshot.LastStatus = "never"
	case s.lastErr != nil:
		snapshot.LastStatus = "error"
		snapshot.LastError = s.lastErr.Error()
	default:
		snapshot.LastStatus = "ok"
	}
	return snapshot
}
// doSync performs the actual git clone/pull + file copy.
//
// Only one sync runs at a time: the "syncing" flag is tested and set under a
// single lock, so a call that arrives while another sync is in flight returns
// a refusal immediately instead of running git concurrently in the same cache
// directory.
//
// Steps: clone or pull the catalog, copy template files into the stacks
// directory, trigger a rescan when anything changed, then run the post-sync
// hook for updated stacks. The outcome (error and finish time) is recorded for
// Status on both success and failure.
func (s *Syncer) doSync() SyncResult {
	// Atomically claim the syncing slot.
	s.mu.Lock()
	if s.syncing {
		s.mu.Unlock()
		return SyncResult{OK: false, Message: "Szinkronizálás már folyamatban"}
	}
	s.syncing = true
	s.mu.Unlock()
	defer func() {
		s.mu.Lock()
		s.syncing = false
		s.mu.Unlock()
	}()

	// record stores the outcome of this attempt; err is nil on success.
	record := func(err error) {
		s.mu.Lock()
		s.lastErr = err
		s.lastSync = time.Now()
		s.mu.Unlock()
	}

	// Step 1: Clone or pull
	if err := s.gitCloneOrPull(); err != nil {
		record(err)
		return SyncResult{OK: false, Message: fmt.Sprintf("Git hiba: %v", err)}
	}

	// Step 2: Copy templates to stacks dir
	newApps, updated, err := s.copyTemplates()
	if err != nil {
		record(err)
		return SyncResult{OK: false, Message: fmt.Sprintf("Másolási hiba: %v", err)}
	}

	// Step 3: Trigger rescan if anything changed. A failed rescan is logged
	// but does not fail the sync: the files are already in place.
	if (len(newApps) > 0 || len(updated) > 0) && s.rescanFn != nil {
		if err := s.rescanFn(); err != nil {
			s.logger.Printf("[SYNC] Rescan after sync failed: %v", err)
		}
	}

	// Step 4: Inject missing deploy fields for updated stacks
	if len(updated) > 0 && s.postSyncHook != nil {
		if s.isDebug() {
			s.logger.Printf("[DEBUG] [SYNC] Post-sync hook: triggering missing field injection for %d stack(s): %v", len(updated), updated)
		}
		s.postSyncHook(updated)
	}

	// Build the human-readable summary.
	var parts []string
	if len(newApps) > 0 {
		parts = append(parts, fmt.Sprintf("új: %s", strings.Join(newApps, ", ")))
	}
	if len(updated) > 0 {
		parts = append(parts, fmt.Sprintf("frissítve: %s", strings.Join(updated, ", ")))
	}
	message := "Sablonok naprakészek — nincs változás"
	if len(parts) > 0 {
		message = "Sablonok frissítve — " + strings.Join(parts, "; ")
	}

	record(nil)
	return SyncResult{OK: true, NewApps: newApps, Updated: updated, Message: message}
}
// gitCloneOrPull clones the repo if not yet cloned, or pulls latest changes.
//
// A shallow (--depth 1) clone of the configured branch is created in cacheDir
// when cacheDir/.git does not exist. Otherwise the configured branch is
// fetched shallowly from origin and the working tree is hard-reset to it,
// discarding any local modifications in the cache.
//
// Returns an error when the cache parent directory cannot be created or when
// a git command fails.
func (s *Syncer) gitCloneOrPull() error {
	if err := os.MkdirAll(filepath.Dir(s.cacheDir), 0755); err != nil {
		return fmt.Errorf("creating cache parent dir: %w", err)
	}

	branch := s.cfg.Git.Branch
	// The configured URL may itself carry credentials; never log it raw.
	safeURL := maskRepoURL(s.cfg.Git.RepoURL)

	gitDir := filepath.Join(s.cacheDir, ".git")
	if _, err := os.Stat(gitDir); os.IsNotExist(err) {
		// Clone
		s.logger.Printf("[SYNC] Cloning %s → %s", safeURL, s.cacheDir)
		repoURL := s.buildRepoURL()
		if s.isDebug() {
			s.logger.Printf("[DEBUG] [SYNC] git clone URL: %s, branch: %s, cacheDir: %s", maskRepoURL(repoURL), branch, s.cacheDir)
		}
		return s.runGit("clone", "--depth", "1", "--branch", branch, repoURL, s.cacheDir)
	}

	// Pull
	s.logger.Printf("[SYNC] Pulling latest from %s (branch: %s)", safeURL, branch)
	if s.isDebug() {
		s.logger.Printf("[DEBUG] [SYNC] git fetch --depth 1 origin %s in %s", branch, s.cacheDir)
	}
	if err := s.runGitInDir(s.cacheDir, "fetch", "--depth", "1", "origin", branch); err != nil {
		return fmt.Errorf("git fetch: %w", err)
	}
	// Reset to FETCH_HEAD rather than origin/<branch>: the single-branch
	// shallow clone only tracks the branch it was cloned with, so after the
	// configured branch changes origin/<branch> does not exist, while
	// FETCH_HEAD always names what was just fetched.
	if err := s.runGitInDir(s.cacheDir, "reset", "--hard", "FETCH_HEAD"); err != nil {
		return fmt.Errorf("git reset: %w", err)
	}
	return nil
}
// buildRepoURL returns the configured repository URL, with credentials
// injected when both a username and a token are configured.
//
// Injection rewrites the first "https://" into "https://user:token@"; a URL
// without that substring is returned as configured.
func (s *Syncer) buildRepoURL() string {
	user, token := s.cfg.Git.Username, s.cfg.Git.Token
	if user == "" || token == "" {
		return s.cfg.Git.RepoURL
	}
	authPrefix := fmt.Sprintf("https://%s:%s@", user, token)
	return strings.Replace(s.cfg.Git.RepoURL, "https://", authPrefix, 1)
}
// copyTemplates copies docker-compose.yml and .felhom.yml from the catalog cache
// to the stacks directory. Never overwrites app.yaml or .env files.
//
// Every sub-directory of <cacheDir>/templates is treated as one app. An app
// whose stack directory did not exist is reported in newApps; an existing app
// with at least one rewritten file is reported in updated. Per-app and
// per-file failures are logged and skipped; only a failure to list the
// templates directory is returned as an error.
func (s *Syncer) copyTemplates() (newApps []string, updated []string, err error) {
	templatesDir := filepath.Join(s.cacheDir, "templates")
	entries, err := os.ReadDir(templatesDir)
	if err != nil {
		return nil, nil, fmt.Errorf("reading templates dir: %w", err)
	}

	// Files to sync (only template files, never app.yaml or .env)
	syncFiles := []string{"docker-compose.yml", ".felhom.yml"}
	debug := s.isDebug()

	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		appName := entry.Name()
		srcDir := filepath.Join(templatesDir, appName)
		dstDir := filepath.Join(s.cfg.Paths.StacksDir, appName)

		isNew := false
		if _, err := os.Stat(dstDir); os.IsNotExist(err) {
			isNew = true
			if err := os.MkdirAll(dstDir, 0755); err != nil {
				s.logger.Printf("[SYNC] Failed to create stack dir %s: %v", dstDir, err)
				continue
			}
		}

		anyChanged := false
		for _, filename := range syncFiles {
			src := filepath.Join(srcDir, filename)
			dst := filepath.Join(dstDir, filename)
			if _, err := os.Stat(src); os.IsNotExist(err) {
				if debug {
					s.logger.Printf("[DEBUG] [SYNC] %s/%s: source not found, skipping", appName, filename)
				}
				continue
			}
			if debug {
				// Must happen before the copy: afterwards dst is identical
				// to src and the old hash can no longer be observed.
				s.logFileHashes(appName, filename, src, dst)
			}
			changed, err := copyIfChanged(src, dst)
			if err != nil {
				s.logger.Printf("[SYNC] Failed to copy %s/%s: %v", appName, filename, err)
				continue
			}
			if changed {
				anyChanged = true
				s.logger.Printf("[SYNC] Updated %s/%s", appName, filename)
			} else if debug {
				s.logger.Printf("[DEBUG] [SYNC] %s/%s: hash match, skipped", appName, filename)
			}
		}

		if isNew {
			newApps = append(newApps, appName)
		} else if anyChanged {
			updated = append(updated, appName)
		}
	}
	return newApps, updated, nil
}

// logFileHashes logs the source and destination file hashes for debugging.
//
// It is meant to be called before dst is overwritten. Only the first 8 bytes
// of each SHA-256 digest are printed. Nothing is logged when src cannot be
// read or when both files already have the same content.
func (s *Syncer) logFileHashes(appName, filename, src, dst string) {
	srcData, err := os.ReadFile(src)
	if err != nil {
		return
	}
	srcHash := sha256.Sum256(srcData)

	dstData, err := os.ReadFile(dst)
	if err != nil {
		s.logger.Printf("[DEBUG] [SYNC] %s/%s: src=%s, dst=new file", appName, filename, hex.EncodeToString(srcHash[:8]))
		return
	}
	dstHash := sha256.Sum256(dstData)
	if srcHash == dstHash {
		// Unchanged file: there is no difference to report.
		return
	}
	s.logger.Printf("[DEBUG] [SYNC] %s/%s: src=%s, dst=%s (changed)", appName, filename, hex.EncodeToString(srcHash[:8]), hex.EncodeToString(dstHash[:8]))
}
// copyIfChanged copies src to dst only if the content differs.
// Returns true if the file was actually written.
//
// A dst that is missing or unreadable counts as different and is (re)written
// with mode 0644. An error is returned when src cannot be read or dst cannot
// be written; in that case the boolean is false.
func copyIfChanged(src, dst string) (bool, error) {
	srcData, err := os.ReadFile(src)
	if err != nil {
		return false, fmt.Errorf("reading %s: %w", src, err)
	}

	// Both files are fully in memory, so compare the bytes directly instead
	// of hashing each buffer first.
	if dstData, err := os.ReadFile(dst); err == nil && bytes.Equal(srcData, dstData) {
		return false, nil // No change
	}

	if err := os.WriteFile(dst, srcData, 0644); err != nil {
		return false, fmt.Errorf("writing %s: %w", dst, err)
	}
	return true, nil
}
// reEmbeddedCreds matches the "user:password@" part of any URL
// ("scheme://user:password@host") occurring in free text such as a joined
// argument list or git's stderr output.
var reEmbeddedCreds = regexp.MustCompile(`(://[^:/@\s]+):[^@\s]+@`)

// redactCreds replaces every URL password found in text with "***".
func redactCreds(text string) string {
	return reEmbeddedCreds.ReplaceAllString(text, "${1}:***@")
}

// runGit executes a git command with the given args in the process's current
// working directory.
func (s *Syncer) runGit(args ...string) error {
	return s.runGitInDir("", args...)
}

// runGitInDir executes a git command in the specified directory.
//
// An empty dir runs git in the process's current working directory. Standard
// output is discarded; standard error is captured and included in the returned
// error when the command fails.
//
// The argument list can contain a clone URL with an access token, so both the
// log line and the returned error carry a redacted copy of the arguments (and
// of stderr) instead of the raw text.
func (s *Syncer) runGitInDir(dir string, args ...string) error {
	cmd := exec.Command("git", args...)
	if dir != "" {
		cmd.Dir = dir
	}
	var stderr bytes.Buffer
	cmd.Stdout = io.Discard
	cmd.Stderr = &stderr

	safeArgs := redactCreds(strings.Join(args, " "))
	s.logger.Printf("[SYNC] Running: git %s", safeArgs)
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("git %s: %w\nstderr: %s", safeArgs, err, redactCreds(stderr.String()))
	}
	return nil
}