0c687ae280
Catalog sync could fail permanently if the container was killed mid-fetch, leaving behind .git/shallow.lock (or index.lock, HEAD.lock). Now cleaned up automatically before each git fetch. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
430 lines
12 KiB
Go
430 lines
12 KiB
Go
package sync
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitea.dooplex.hu/admin/felhom-controller/internal/config"
|
|
)
|
|
|
|
// Syncer handles periodic git sync of the app catalog to the local stacks directory.
|
|
type Syncer struct {
|
|
cfg *config.Config
|
|
logger *log.Logger
|
|
cacheDir string // local git clone
|
|
rescanFn func() error
|
|
postSyncHook func(updated []string) // called after sync with names of updated stacks
|
|
mu sync.Mutex
|
|
lastSync time.Time
|
|
lastErr error
|
|
syncing bool
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
// SyncStatus holds information about the last sync operation.
|
|
type SyncStatus struct {
|
|
LastSync time.Time `json:"last_sync"`
|
|
LastStatus string `json:"last_status"` // "ok", "error", "disabled", "never"
|
|
LastError string `json:"last_error,omitempty"`
|
|
Syncing bool `json:"syncing"`
|
|
}
|
|
|
|
// SyncResult holds the result of a single sync operation.
|
|
type SyncResult struct {
|
|
OK bool `json:"ok"`
|
|
NewApps []string `json:"new_apps,omitempty"`
|
|
Updated []string `json:"updated,omitempty"`
|
|
Message string `json:"message"`
|
|
}
|
|
|
|
// New creates a new Syncer. rescanFn is called after a successful sync to trigger ScanStacks().
|
|
// postSyncHook is called with names of updated stacks (may be nil).
|
|
func New(cfg *config.Config, logger *log.Logger, rescanFn func() error, postSyncHook func([]string)) *Syncer {
|
|
cacheDir := filepath.Join(cfg.Paths.DataDir, "catalog-cache")
|
|
return &Syncer{
|
|
cfg: cfg,
|
|
logger: logger,
|
|
cacheDir: cacheDir,
|
|
rescanFn: rescanFn,
|
|
postSyncHook: postSyncHook,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// isDebug returns true if the logging level is set to "debug".
|
|
func (s *Syncer) isDebug() bool { return s.cfg.Logging.Level == "debug" }
|
|
|
|
// maskRepoURL masks credentials in a git URL for safe logging.
|
|
// e.g., "https://user:token@host/path" → "https://user:***@host/path"
|
|
var reURLCreds = regexp.MustCompile(`(https?://)([^:]+):([^@]+)@`)
|
|
|
|
func maskRepoURL(url string) string {
|
|
return reURLCreds.ReplaceAllString(url, "${1}${2}:***@")
|
|
}
|
|
|
|
// Start begins the periodic sync loop. Call Stop() to terminate.
|
|
func (s *Syncer) Start() {
|
|
if s.cfg.Git.RepoURL == "" {
|
|
s.logger.Println("[SYNC] Git repo URL is empty — sync disabled (manual mode)")
|
|
return
|
|
}
|
|
|
|
interval, err := time.ParseDuration(s.cfg.Git.SyncInterval)
|
|
if err != nil {
|
|
s.logger.Printf("[SYNC] Invalid sync_interval %q, defaulting to 15m", s.cfg.Git.SyncInterval)
|
|
interval = 15 * time.Minute
|
|
}
|
|
|
|
s.logger.Printf("[SYNC] Starting catalog sync (repo: %s, interval: %s)", s.cfg.Git.RepoURL, interval)
|
|
|
|
// Initial sync on startup
|
|
go func() {
|
|
result := s.doSync()
|
|
s.logger.Printf("[SYNC] Initial sync: %s", result.Message)
|
|
}()
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-s.stopCh:
|
|
s.logger.Println("[SYNC] Sync loop stopped")
|
|
return
|
|
case <-ticker.C:
|
|
result := s.doSync()
|
|
s.logger.Printf("[SYNC] Periodic sync: %s", result.Message)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop terminates the periodic sync loop.
|
|
func (s *Syncer) Stop() {
|
|
close(s.stopCh)
|
|
}
|
|
|
|
// TriggerSync performs an immediate sync. Returns the result.
|
|
// Debounce: refuses if last sync was less than 30 seconds ago.
|
|
func (s *Syncer) TriggerSync() SyncResult {
|
|
if s.cfg.Git.RepoURL == "" {
|
|
return SyncResult{OK: false, Message: "Git sync is disabled (no repo_url configured)"}
|
|
}
|
|
|
|
s.mu.Lock()
|
|
if s.syncing {
|
|
s.mu.Unlock()
|
|
return SyncResult{OK: false, Message: "Szinkronizálás már folyamatban"}
|
|
}
|
|
if time.Since(s.lastSync) < 30*time.Second {
|
|
s.mu.Unlock()
|
|
return SyncResult{OK: false, Message: "Túl gyakori szinkronizálás — várj 30 másodpercet"}
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
return s.doSync()
|
|
}
|
|
|
|
// Status returns the current sync status.
|
|
func (s *Syncer) Status() SyncStatus {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
status := SyncStatus{
|
|
LastSync: s.lastSync,
|
|
Syncing: s.syncing,
|
|
}
|
|
|
|
if s.cfg.Git.RepoURL == "" {
|
|
status.LastStatus = "disabled"
|
|
} else if s.lastSync.IsZero() {
|
|
status.LastStatus = "never"
|
|
} else if s.lastErr != nil {
|
|
status.LastStatus = "error"
|
|
status.LastError = s.lastErr.Error()
|
|
} else {
|
|
status.LastStatus = "ok"
|
|
}
|
|
|
|
return status
|
|
}
|
|
|
|
// doSync performs the actual git clone/pull + file copy.
|
|
func (s *Syncer) doSync() SyncResult {
|
|
s.mu.Lock()
|
|
s.syncing = true
|
|
s.mu.Unlock()
|
|
|
|
defer func() {
|
|
s.mu.Lock()
|
|
s.syncing = false
|
|
s.mu.Unlock()
|
|
}()
|
|
|
|
result := SyncResult{OK: true}
|
|
|
|
// Step 1: Clone or pull
|
|
if err := s.gitCloneOrPull(); err != nil {
|
|
s.mu.Lock()
|
|
s.lastErr = err
|
|
s.lastSync = time.Now()
|
|
s.mu.Unlock()
|
|
return SyncResult{OK: false, Message: fmt.Sprintf("Git hiba: %v", err)}
|
|
}
|
|
|
|
// Step 2: Copy templates to stacks dir
|
|
newApps, updated, err := s.copyTemplates()
|
|
if err != nil {
|
|
s.mu.Lock()
|
|
s.lastErr = err
|
|
s.lastSync = time.Now()
|
|
s.mu.Unlock()
|
|
return SyncResult{OK: false, Message: fmt.Sprintf("Másolási hiba: %v", err)}
|
|
}
|
|
|
|
result.NewApps = newApps
|
|
result.Updated = updated
|
|
|
|
// Step 3: Trigger rescan if anything changed
|
|
if len(newApps) > 0 || len(updated) > 0 {
|
|
if err := s.rescanFn(); err != nil {
|
|
s.logger.Printf("[SYNC] Rescan after sync failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// Step 4: Inject missing deploy fields for updated stacks
|
|
if len(updated) > 0 && s.postSyncHook != nil {
|
|
if s.isDebug() {
|
|
s.logger.Printf("[DEBUG] [SYNC] Post-sync hook: triggering missing field injection for %d stack(s): %v", len(updated), updated)
|
|
}
|
|
s.postSyncHook(updated)
|
|
}
|
|
|
|
// Build message
|
|
parts := []string{}
|
|
if len(newApps) > 0 {
|
|
parts = append(parts, fmt.Sprintf("új: %s", strings.Join(newApps, ", ")))
|
|
}
|
|
if len(updated) > 0 {
|
|
parts = append(parts, fmt.Sprintf("frissítve: %s", strings.Join(updated, ", ")))
|
|
}
|
|
if len(parts) == 0 {
|
|
result.Message = "Sablonok naprakészek — nincs változás"
|
|
} else {
|
|
result.Message = "Sablonok frissítve — " + strings.Join(parts, "; ")
|
|
}
|
|
|
|
s.mu.Lock()
|
|
s.lastErr = nil
|
|
s.lastSync = time.Now()
|
|
s.mu.Unlock()
|
|
|
|
return result
|
|
}
|
|
|
|
// gitCloneOrPull clones the repo if not yet cloned, or pulls latest changes.
|
|
func (s *Syncer) gitCloneOrPull() error {
|
|
if err := os.MkdirAll(filepath.Dir(s.cacheDir), 0755); err != nil {
|
|
return fmt.Errorf("creating cache parent dir: %w", err)
|
|
}
|
|
|
|
gitDir := filepath.Join(s.cacheDir, ".git")
|
|
if _, err := os.Stat(gitDir); os.IsNotExist(err) {
|
|
// Clone
|
|
s.logger.Printf("[SYNC] Cloning %s → %s", s.cfg.Git.RepoURL, s.cacheDir)
|
|
args := []string{"clone", "--depth", "1", "--branch", s.cfg.Git.Branch}
|
|
repoURL := s.buildRepoURL()
|
|
args = append(args, repoURL, s.cacheDir)
|
|
if s.isDebug() {
|
|
s.logger.Printf("[DEBUG] [SYNC] git clone URL: %s, branch: %s, cacheDir: %s", maskRepoURL(repoURL), s.cfg.Git.Branch, s.cacheDir)
|
|
}
|
|
return s.runGit(args...)
|
|
}
|
|
|
|
// Remove stale git lock files left behind by interrupted operations
|
|
s.removeGitLockFiles()
|
|
|
|
// Pull
|
|
s.logger.Printf("[SYNC] Pulling latest from %s (branch: %s)", s.cfg.Git.RepoURL, s.cfg.Git.Branch)
|
|
if s.isDebug() {
|
|
s.logger.Printf("[DEBUG] [SYNC] git fetch --depth 1 origin %s in %s", s.cfg.Git.Branch, s.cacheDir)
|
|
}
|
|
if err := s.runGitInDir(s.cacheDir, "fetch", "--depth", "1", "origin", s.cfg.Git.Branch); err != nil {
|
|
return fmt.Errorf("git fetch: %w", err)
|
|
}
|
|
if err := s.runGitInDir(s.cacheDir, "reset", "--hard", "origin/"+s.cfg.Git.Branch); err != nil {
|
|
return fmt.Errorf("git reset: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// removeGitLockFiles removes stale .git/*.lock files that may have been left
|
|
// behind if a previous git operation was interrupted (e.g. container restart).
|
|
// These lock files prevent all subsequent git operations from succeeding.
|
|
func (s *Syncer) removeGitLockFiles() {
|
|
gitDir := filepath.Join(s.cacheDir, ".git")
|
|
lockFiles := []string{"index.lock", "shallow.lock", "HEAD.lock"}
|
|
for _, name := range lockFiles {
|
|
lockPath := filepath.Join(gitDir, name)
|
|
if _, err := os.Stat(lockPath); err == nil {
|
|
s.logger.Printf("[SYNC] Removing stale lock file: %s", lockPath)
|
|
if err := os.Remove(lockPath); err != nil {
|
|
s.logger.Printf("[SYNC] Failed to remove lock file %s: %v", lockPath, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// buildRepoURL constructs the repo URL with optional auth credentials.
|
|
func (s *Syncer) buildRepoURL() string {
|
|
url := s.cfg.Git.RepoURL
|
|
if s.cfg.Git.Username != "" && s.cfg.Git.Token != "" {
|
|
// Inject credentials into HTTPS URL: https://user:token@host/path
|
|
url = strings.Replace(url, "https://", fmt.Sprintf("https://%s:%s@", s.cfg.Git.Username, s.cfg.Git.Token), 1)
|
|
}
|
|
return url
|
|
}
|
|
|
|
// copyTemplates copies docker-compose.yml and .felhom.yml from the catalog cache
|
|
// to the stacks directory. Never overwrites app.yaml or .env files.
|
|
func (s *Syncer) copyTemplates() (newApps []string, updated []string, err error) {
|
|
templatesDir := filepath.Join(s.cacheDir, "templates")
|
|
entries, err := os.ReadDir(templatesDir)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("reading templates dir: %w", err)
|
|
}
|
|
|
|
for _, entry := range entries {
|
|
if !entry.IsDir() {
|
|
continue
|
|
}
|
|
|
|
appName := entry.Name()
|
|
srcDir := filepath.Join(templatesDir, appName)
|
|
dstDir := filepath.Join(s.cfg.Paths.StacksDir, appName)
|
|
|
|
isNew := false
|
|
if _, err := os.Stat(dstDir); os.IsNotExist(err) {
|
|
isNew = true
|
|
if err := os.MkdirAll(dstDir, 0755); err != nil {
|
|
s.logger.Printf("[SYNC] Failed to create stack dir %s: %v", dstDir, err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Files to sync (only template files, never app.yaml or .env)
|
|
syncFiles := []string{"docker-compose.yml", ".felhom.yml"}
|
|
anyChanged := false
|
|
|
|
for _, filename := range syncFiles {
|
|
src := filepath.Join(srcDir, filename)
|
|
dst := filepath.Join(dstDir, filename)
|
|
|
|
if _, err := os.Stat(src); os.IsNotExist(err) {
|
|
if s.isDebug() {
|
|
s.logger.Printf("[DEBUG] [SYNC] %s/%s: source not found, skipping", appName, filename)
|
|
}
|
|
continue
|
|
}
|
|
|
|
changed, err := copyIfChanged(src, dst)
|
|
if err != nil {
|
|
s.logger.Printf("[SYNC] Failed to copy %s/%s: %v", appName, filename, err)
|
|
continue
|
|
}
|
|
if changed {
|
|
anyChanged = true
|
|
s.logger.Printf("[SYNC] Updated %s/%s", appName, filename)
|
|
if s.isDebug() {
|
|
s.logFileHashes(appName, filename, src, dst)
|
|
}
|
|
} else if s.isDebug() {
|
|
s.logger.Printf("[DEBUG] [SYNC] %s/%s: hash match, skipped", appName, filename)
|
|
}
|
|
}
|
|
|
|
if isNew {
|
|
newApps = append(newApps, appName)
|
|
} else if anyChanged {
|
|
updated = append(updated, appName)
|
|
}
|
|
}
|
|
|
|
return newApps, updated, nil
|
|
}
|
|
|
|
// logFileHashes logs the source and destination file hashes for debugging.
|
|
func (s *Syncer) logFileHashes(appName, filename, src, dst string) {
|
|
srcData, err := os.ReadFile(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
srcHash := sha256.Sum256(srcData)
|
|
dstData, err := os.ReadFile(dst)
|
|
if err != nil {
|
|
s.logger.Printf("[DEBUG] [SYNC] %s/%s: src=%s, dst=new file", appName, filename, hex.EncodeToString(srcHash[:8]))
|
|
return
|
|
}
|
|
dstHash := sha256.Sum256(dstData)
|
|
s.logger.Printf("[DEBUG] [SYNC] %s/%s: src=%s, dst=%s (changed)", appName, filename, hex.EncodeToString(srcHash[:8]), hex.EncodeToString(dstHash[:8]))
|
|
}
|
|
|
|
// copyIfChanged copies src to dst only if the content differs.
|
|
// Returns true if the file was actually written.
|
|
func copyIfChanged(src, dst string) (bool, error) {
|
|
srcData, err := os.ReadFile(src)
|
|
if err != nil {
|
|
return false, fmt.Errorf("reading %s: %w", src, err)
|
|
}
|
|
|
|
// Check if dst exists and has same content
|
|
dstData, err := os.ReadFile(dst)
|
|
if err == nil {
|
|
srcHash := sha256.Sum256(srcData)
|
|
dstHash := sha256.Sum256(dstData)
|
|
if srcHash == dstHash {
|
|
return false, nil // No change
|
|
}
|
|
}
|
|
|
|
if err := os.WriteFile(dst, srcData, 0644); err != nil {
|
|
return false, fmt.Errorf("writing %s: %w", dst, err)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// runGit executes a git command with the given args.
|
|
func (s *Syncer) runGit(args ...string) error {
|
|
return s.runGitInDir("", args...)
|
|
}
|
|
|
|
// runGitInDir executes a git command in the specified directory.
|
|
func (s *Syncer) runGitInDir(dir string, args ...string) error {
|
|
cmd := exec.Command("git", args...)
|
|
if dir != "" {
|
|
cmd.Dir = dir
|
|
}
|
|
|
|
var stderr bytes.Buffer
|
|
cmd.Stdout = io.Discard
|
|
cmd.Stderr = &stderr
|
|
|
|
s.logger.Printf("[SYNC] Running: git %s", strings.Join(args, " "))
|
|
|
|
if err := cmd.Run(); err != nil {
|
|
return fmt.Errorf("git %s: %w\nstderr: %s", strings.Join(args, " "), err, stderr.String())
|
|
}
|
|
return nil
|
|
}
|