Files
admin af1dd14933 fix: standardize log prefixes, remove duplicates, add missing module tags
Second-pass logging cleanup: consistent [LEVEL] [module] format across
all 41 files. Remove stale prefixes ([CF], [SYNC], [SCHED], [API],
[STORAGE], [HEALTH], [ROLLBACK]). Remove 5 duplicate log lines. Gate
ungated DEBUG lines. Fix wrong log levels (restore start WARN→INFO).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 21:20:09 +01:00

440 lines
13 KiB
Go

package sync
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
"gitea.dooplex.hu/admin/felhom-controller/internal/config"
)
// Syncer handles periodic git sync of the app catalog to the local stacks directory.
type Syncer struct {
cfg *config.Config
logger *log.Logger
cacheDir string // local git clone
rescanFn func() error
postSyncHook func(updated []string) // called after sync with names of updated stacks
mu sync.Mutex
lastSync time.Time
lastErr error
syncing bool
stopCh chan struct{}
stopOnce sync.Once
}
// SyncStatus holds information about the last sync operation.
type SyncStatus struct {
LastSync time.Time `json:"last_sync"`
LastStatus string `json:"last_status"` // "ok", "error", "disabled", "never"
LastError string `json:"last_error,omitempty"`
Syncing bool `json:"syncing"`
}
// SyncResult holds the result of a single sync operation.
type SyncResult struct {
OK bool `json:"ok"`
NewApps []string `json:"new_apps,omitempty"`
Updated []string `json:"updated,omitempty"`
Message string `json:"message"`
}
// New creates a new Syncer. rescanFn is called after a successful sync to trigger ScanStacks().
// postSyncHook is called with names of updated stacks (may be nil).
func New(cfg *config.Config, logger *log.Logger, rescanFn func() error, postSyncHook func([]string)) *Syncer {
cacheDir := filepath.Join(cfg.Paths.DataDir, "catalog-cache")
return &Syncer{
cfg: cfg,
logger: logger,
cacheDir: cacheDir,
rescanFn: rescanFn,
postSyncHook: postSyncHook,
stopCh: make(chan struct{}),
}
}
// isDebug returns true if the logging level is set to "debug".
func (s *Syncer) isDebug() bool { return s.cfg.Logging.Level == "debug" }
// maskRepoURL masks credentials in a git URL for safe logging.
// e.g., "https://user:token@host/path" → "https://user:***@host/path"
var reURLCreds = regexp.MustCompile(`(https?://)([^:]+):([^@]+)@`)
func maskRepoURL(url string) string {
return reURLCreds.ReplaceAllString(url, "${1}${2}:***@")
}
// Start begins the periodic sync loop. Call Stop() to terminate.
func (s *Syncer) Start() {
if s.cfg.Git.RepoURL == "" {
s.logger.Println("[WARN] [sync] Git repo URL is empty — sync disabled (manual mode)")
return
}
interval, err := time.ParseDuration(s.cfg.Git.SyncInterval)
if err != nil {
s.logger.Printf("[WARN] [sync] Invalid sync_interval %q, defaulting to 15m", s.cfg.Git.SyncInterval)
interval = 15 * time.Minute
}
s.logger.Printf("[INFO] [sync] Starting catalog sync (repo: %s, interval: %s)", s.cfg.Git.RepoURL, interval)
// Initial sync on startup
go func() {
result := s.doSync()
s.logger.Printf("[INFO] [sync] Initial sync: %s", result.Message)
}()
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-s.stopCh:
s.logger.Println("[INFO] [sync] Sync loop stopped")
return
case <-ticker.C:
result := s.doSync()
s.logger.Printf("[INFO] [sync] Periodic sync: %s", result.Message)
}
}
}()
}
// Stop terminates the periodic sync loop. Safe to call multiple times.
func (s *Syncer) Stop() {
s.stopOnce.Do(func() {
close(s.stopCh)
})
}
// TriggerSync performs an immediate sync. Returns the result.
// Debounce: refuses if last sync was less than 30 seconds ago.
func (s *Syncer) TriggerSync() SyncResult {
if s.cfg.Git.RepoURL == "" {
return SyncResult{OK: false, Message: "Git sync is disabled (no repo_url configured)"}
}
s.mu.Lock()
if s.syncing {
s.mu.Unlock()
return SyncResult{OK: false, Message: "Szinkronizálás már folyamatban"}
}
if time.Since(s.lastSync) < 30*time.Second {
s.mu.Unlock()
return SyncResult{OK: false, Message: "Túl gyakori szinkronizálás — várj 30 másodpercet"}
}
s.syncing = true
s.mu.Unlock()
return s.doSync()
}
// Status returns the current sync status.
func (s *Syncer) Status() SyncStatus {
s.mu.Lock()
defer s.mu.Unlock()
status := SyncStatus{
LastSync: s.lastSync,
Syncing: s.syncing,
}
if s.cfg.Git.RepoURL == "" {
status.LastStatus = "disabled"
} else if s.lastSync.IsZero() {
status.LastStatus = "never"
} else if s.lastErr != nil {
status.LastStatus = "error"
status.LastError = s.lastErr.Error()
} else {
status.LastStatus = "ok"
}
return status
}
// doSync performs the actual git clone/pull + file copy.
func (s *Syncer) doSync() SyncResult {
s.mu.Lock()
s.syncing = true
s.mu.Unlock()
defer func() {
s.mu.Lock()
s.syncing = false
s.mu.Unlock()
}()
result := SyncResult{OK: true}
s.logger.Printf("[INFO] [sync] Starting catalog sync")
// Step 1: Clone or pull
if err := s.gitCloneOrPull(); err != nil {
s.logger.Printf("[ERROR] [sync] Catalog sync failed: %v", err)
s.mu.Lock()
s.lastErr = err
s.lastSync = time.Now()
s.mu.Unlock()
return SyncResult{OK: false, Message: fmt.Sprintf("Git hiba: %v", err)}
}
// Step 2: Copy templates to stacks dir
newApps, updated, err := s.copyTemplates()
if err != nil {
s.logger.Printf("[ERROR] [sync] Catalog sync failed: %v", err)
s.mu.Lock()
s.lastErr = err
s.lastSync = time.Now()
s.mu.Unlock()
return SyncResult{OK: false, Message: fmt.Sprintf("Másolási hiba: %v", err)}
}
result.NewApps = newApps
result.Updated = updated
// Step 3: Trigger rescan if anything changed
if len(newApps) > 0 || len(updated) > 0 {
if err := s.rescanFn(); err != nil {
s.logger.Printf("[WARN] [sync] Rescan after sync failed: %v", err)
}
}
// Step 4: Inject missing deploy fields for updated stacks
if len(updated) > 0 && s.postSyncHook != nil {
if s.isDebug() {
s.logger.Printf("[DEBUG] [sync] Post-sync hook: triggering missing field injection for %d stack(s): %v", len(updated), updated)
}
s.postSyncHook(updated)
}
// Build message
parts := []string{}
if len(newApps) > 0 {
parts = append(parts, fmt.Sprintf("új: %s", strings.Join(newApps, ", ")))
}
if len(updated) > 0 {
parts = append(parts, fmt.Sprintf("frissítve: %s", strings.Join(updated, ", ")))
}
if len(parts) == 0 {
result.Message = "Sablonok naprakészek — nincs változás"
} else {
result.Message = "Sablonok frissítve — " + strings.Join(parts, "; ")
}
s.logger.Printf("[INFO] [sync] Catalog sync complete")
s.mu.Lock()
s.lastErr = nil
s.lastSync = time.Now()
s.mu.Unlock()
return result
}
// gitCloneOrPull clones the repo if not yet cloned, or pulls latest changes.
func (s *Syncer) gitCloneOrPull() error {
if err := os.MkdirAll(filepath.Dir(s.cacheDir), 0755); err != nil {
return fmt.Errorf("creating cache parent dir: %w", err)
}
gitDir := filepath.Join(s.cacheDir, ".git")
if _, err := os.Stat(gitDir); os.IsNotExist(err) {
// Clone
s.logger.Printf("[INFO] [sync] Cloning %s → %s", s.cfg.Git.RepoURL, s.cacheDir)
args := []string{"clone", "--depth", "1", "--branch", s.cfg.Git.Branch}
repoURL := s.buildRepoURL()
args = append(args, repoURL, s.cacheDir)
if s.isDebug() {
s.logger.Printf("[DEBUG] [sync] git clone URL: %s, branch: %s, cacheDir: %s", maskRepoURL(repoURL), s.cfg.Git.Branch, s.cacheDir)
}
return s.runGit(args...)
}
// Remove stale git lock files left behind by interrupted operations
s.removeGitLockFiles()
// Pull
s.logger.Printf("[INFO] [sync] Pulling latest from %s (branch: %s)", s.cfg.Git.RepoURL, s.cfg.Git.Branch)
if s.isDebug() {
s.logger.Printf("[DEBUG] [sync] git fetch --depth 1 origin %s in %s", s.cfg.Git.Branch, s.cacheDir)
}
if err := s.runGitInDir(s.cacheDir, "fetch", "--depth", "1", "origin", s.cfg.Git.Branch); err != nil {
return fmt.Errorf("git fetch: %w", err)
}
if err := s.runGitInDir(s.cacheDir, "reset", "--hard", "origin/"+s.cfg.Git.Branch); err != nil {
return fmt.Errorf("git reset: %w", err)
}
return nil
}
// removeGitLockFiles removes stale .git/*.lock files that may have been left
// behind if a previous git operation was interrupted (e.g. container restart).
// These lock files prevent all subsequent git operations from succeeding.
func (s *Syncer) removeGitLockFiles() {
gitDir := filepath.Join(s.cacheDir, ".git")
lockFiles := []string{"index.lock", "shallow.lock", "HEAD.lock"}
for _, name := range lockFiles {
lockPath := filepath.Join(gitDir, name)
if _, err := os.Stat(lockPath); err == nil {
s.logger.Printf("[WARN] [sync] Removing stale lock file: %s", lockPath)
if err := os.Remove(lockPath); err != nil {
s.logger.Printf("[WARN] [sync] Failed to remove lock file %s: %v", lockPath, err)
}
}
}
}
// buildRepoURL constructs the repo URL with optional auth credentials.
func (s *Syncer) buildRepoURL() string {
url := s.cfg.Git.RepoURL
if s.cfg.Git.Username != "" && s.cfg.Git.Token != "" {
// Inject credentials into HTTPS URL: https://user:token@host/path
url = strings.Replace(url, "https://", fmt.Sprintf("https://%s:%s@", s.cfg.Git.Username, s.cfg.Git.Token), 1)
}
return url
}
// copyTemplates copies docker-compose.yml and .felhom.yml from the catalog cache
// to the stacks directory. Never overwrites app.yaml.
func (s *Syncer) copyTemplates() (newApps []string, updated []string, err error) {
templatesDir := filepath.Join(s.cacheDir, "templates")
entries, err := os.ReadDir(templatesDir)
if err != nil {
return nil, nil, fmt.Errorf("reading templates dir: %w", err)
}
for _, entry := range entries {
if !entry.IsDir() {
continue
}
appName := entry.Name()
srcDir := filepath.Join(templatesDir, appName)
dstDir := filepath.Join(s.cfg.Paths.StacksDir, appName)
isNew := false
if _, err := os.Stat(dstDir); os.IsNotExist(err) {
isNew = true
if err := os.MkdirAll(dstDir, 0755); err != nil {
s.logger.Printf("[WARN] [sync] Failed to create stack dir %s: %v", dstDir, err)
continue
}
}
// Files to sync (only template files, never app.yaml)
syncFiles := []string{"docker-compose.yml", ".felhom.yml"}
anyChanged := false
for _, filename := range syncFiles {
src := filepath.Join(srcDir, filename)
dst := filepath.Join(dstDir, filename)
if _, err := os.Stat(src); os.IsNotExist(err) {
if s.isDebug() {
s.logger.Printf("[DEBUG] [sync] %s/%s: source not found, skipping", appName, filename)
}
continue
}
changed, err := copyIfChanged(src, dst)
if err != nil {
s.logger.Printf("[WARN] [sync] Failed to copy catalog file %s/%s: %v", appName, filename, err)
continue
}
if changed {
anyChanged = true
s.logger.Printf("[INFO] [sync] Updated %s/%s", appName, filename)
if s.isDebug() {
s.logFileHashes(appName, filename, src, dst)
}
} else if s.isDebug() {
s.logger.Printf("[DEBUG] [sync] %s/%s: hash match, skipped", appName, filename)
}
}
if isNew {
newApps = append(newApps, appName)
} else if anyChanged {
updated = append(updated, appName)
}
}
return newApps, updated, nil
}
// logFileHashes logs the source and destination file hashes for debugging.
func (s *Syncer) logFileHashes(appName, filename, src, dst string) {
srcData, err := os.ReadFile(src)
if err != nil {
return
}
srcHash := sha256.Sum256(srcData)
dstData, err := os.ReadFile(dst)
if err != nil {
s.logger.Printf("[DEBUG] [sync] %s/%s: src=%s, dst=new file", appName, filename, hex.EncodeToString(srcHash[:8]))
return
}
dstHash := sha256.Sum256(dstData)
s.logger.Printf("[DEBUG] [sync] %s/%s: src=%s, dst=%s (changed)", appName, filename, hex.EncodeToString(srcHash[:8]), hex.EncodeToString(dstHash[:8]))
}
// copyIfChanged copies src to dst only if the content differs.
// Returns true if the file was actually written.
func copyIfChanged(src, dst string) (bool, error) {
srcData, err := os.ReadFile(src)
if err != nil {
return false, fmt.Errorf("reading %s: %w", src, err)
}
// Check if dst exists and has same content
dstData, err := os.ReadFile(dst)
if err == nil {
srcHash := sha256.Sum256(srcData)
dstHash := sha256.Sum256(dstData)
if srcHash == dstHash {
return false, nil // No change
}
}
if err := os.WriteFile(dst, srcData, 0644); err != nil {
return false, fmt.Errorf("writing %s: %w", dst, err)
}
return true, nil
}
// runGit executes a git command with the given args.
func (s *Syncer) runGit(args ...string) error {
return s.runGitInDir("", args...)
}
// runGitInDir executes a git command in the specified directory.
func (s *Syncer) runGitInDir(dir string, args ...string) error {
cmd := exec.Command("git", args...)
if dir != "" {
cmd.Dir = dir
}
var stderr bytes.Buffer
cmd.Stdout = io.Discard
cmd.Stderr = &stderr
s.logger.Printf("[DEBUG] [sync] Running: git %s", maskRepoURL(strings.Join(args, " ")))
if err := cmd.Run(); err != nil {
return fmt.Errorf("git %s: %w\nstderr: %s", strings.Join(args, " "), err, stderr.String())
}
return nil
}