Files
deploy-felhom-compose/controller/internal/assets/syncer.go
T
admin db83db383c fix: deep bug hunt II — concurrency, security & optimization (25 files)
Critical: watchdog mutex panic safety, SetGeoAppOverride nil guard,
SSD-only app DB restore fallback.

High: double deploy race (atomic Deploying flag), delete/remove during
deploy guard, ScanStacks overwrite protection, FileBrowser mount mutex,
PushEvent history, PushOnce error handling, DB dump sync+close before
rename, restic retry fresh context, encrypt failure logging, cross-backup
path traversal validation, deepCopyStack completeness.

Security: constant-time API key comparison, login rate limiting (5/min),
git credential masking in logs, storage path prefix traversal fix.

Concurrency: MigrateEncryption lock ordering, SubdomainInUse I/O outside
lock, scheduler late-registered jobs, SQLite WAL verification, metrics
shutdown context, telemetry scan error logging, asset sync lock scope.

Optimization: streaming file copy for DB dumps, restic stats dedup,
atomic infra config copy.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 14:21:09 +01:00

329 lines
8.0 KiB
Go

package assets
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
// ManifestEntry is one asset file as listed in the Hub manifest.
type ManifestEntry struct {
	// Filename is the asset's name as published by the Hub.
	Filename string `json:"filename"`
	// Size is the byte size reported by the Hub; Sync sums it into SyncStatus.TotalBytes.
	Size int64 `json:"size"`
	// SHA256 is the checksum Sync compares against the hex SHA-256 of the local copy.
	SHA256 string `json:"sha256"`
}
// HubManifest is the JSON document returned by GET /api/v1/assets/manifest.
type HubManifest struct {
	// Version is decoded from the Hub response; this file does not interpret it.
	Version int `json:"version"`
	// Generated is kept verbatim as sent by the Hub.
	Generated string `json:"generated"`
	// Files lists every asset the Hub currently offers.
	Files []ManifestEntry `json:"files"`
}
// SyncStatus is a snapshot describing the outcome of the most recent sync.
type SyncStatus struct {
	// LastSync is the RFC3339 (UTC) time at which the status was recorded.
	LastSync string `json:"last_sync"`
	// LastStatus is one of "ok", "error" or "never".
	LastStatus string `json:"last_status"`
	// LastError holds the failure message; it is omitted from JSON when empty.
	LastError string `json:"last_error,omitempty"`
	// FileCount is the number of files in the last successfully processed manifest.
	FileCount int `json:"file_count"`
	// TotalBytes is the sum of the manifest entry sizes from that run.
	TotalBytes int64 `json:"total_bytes"`
}
// Syncer keeps a local cache of app assets in step with the Hub API.
type Syncer struct {
	// hubURL is the Hub base URL without a trailing slash.
	hubURL string
	// apiKey is sent as a Bearer token on every Hub request.
	apiKey string
	// assetsDir is <dataDir>/assets — the downloaded cache.
	assetsDir string
	// fallbackDir is /usr/share/felhom/assets — the baked-in fallback.
	fallbackDir string
	httpClient  *http.Client
	logger      *log.Logger
	// debug enables the [DEBUG] log lines.
	debug bool

	// mu guards running and status.
	mu sync.Mutex
	// running is true while a Sync call is in progress.
	running bool
	// status is the outcome of the most recent sync attempt.
	status SyncStatus
}
// New builds a Syncer that pulls assets from the Hub at hubURL into assetsDir,
// with fallbackDir as the secondary lookup location used by Resolve.
//
// One trailing "/" is stripped from hubURL, Hub requests use a 60-second
// client timeout, and the initial status is "never".
func New(hubURL, apiKey, assetsDir, fallbackDir string, logger *log.Logger, debug bool) *Syncer {
	base := strings.TrimSuffix(hubURL, "/")
	client := &http.Client{Timeout: 60 * time.Second}
	initial := SyncStatus{LastStatus: "never"}

	return &Syncer{
		hubURL:      base,
		apiKey:      apiKey,
		assetsDir:   assetsDir,
		fallbackDir: fallbackDir,
		httpClient:  client,
		logger:      logger,
		debug:       debug,
		status:      initial,
	}
}
// Sync fetches the manifest from the Hub, compares checksums, and downloads
// changed/new files. It also removes local files not in the Hub manifest.
//
// Only one sync runs at a time; a concurrent call returns an error right away.
// Failures that abort the run (creating the assets directory, fetching the
// manifest, scanning local files, context cancellation) are recorded through
// setError and returned wrapped with the name of the failing step. A failed
// download or stale-file removal is only logged as a warning: the run
// continues and still finishes with status "ok".
func (s *Syncer) Sync(ctx context.Context) error {
	s.mu.Lock()
	if s.running {
		s.mu.Unlock()
		return fmt.Errorf("asset sync already in progress")
	}
	s.running = true
	s.mu.Unlock()
	defer func() {
		s.mu.Lock()
		s.running = false
		s.mu.Unlock()
	}()

	// fail records err in the status and hands the same (wrapped) error back,
	// so the caller sees the context that was logged.
	fail := func(err error) error {
		s.setError(err)
		return err
	}

	s.logger.Println("[INFO] Asset sync starting...")
	if err := os.MkdirAll(s.assetsDir, 0755); err != nil {
		return fail(fmt.Errorf("create assets dir: %w", err))
	}

	// 1. Fetch Hub manifest
	if s.debug {
		s.logger.Printf("[DEBUG] Asset sync: fetching manifest from %s/api/v1/assets/manifest", s.hubURL)
	}
	manifest, err := s.fetchManifest(ctx)
	if err != nil {
		return fail(fmt.Errorf("fetch manifest: %w", err))
	}
	if s.debug {
		s.logger.Printf("[DEBUG] Asset sync: manifest has %d files", len(manifest.Files))
	}

	// 2. Build local hash map
	localHashes, err := s.buildLocalHashes()
	if err != nil {
		return fail(fmt.Errorf("scan local assets: %w", err))
	}
	if s.debug {
		s.logger.Printf("[DEBUG] Asset sync: %d local files found", len(localHashes))
	}

	// 3. Download changed/new files
	hubFiles := make(map[string]bool, len(manifest.Files))
	var downloaded, skipped int
	var totalBytes int64
	for _, entry := range manifest.Files {
		// Stop early on cancellation instead of failing every remaining download.
		if err := ctx.Err(); err != nil {
			return fail(fmt.Errorf("sync interrupted: %w", err))
		}

		// Downloads are stored under the base name, so compare and track by
		// that name; otherwise an entry containing a path would be fetched
		// and then deleted as stale on every run.
		name := filepath.Base(entry.Filename)
		if name == "." || name == ".." || name == string(filepath.Separator) {
			s.logger.Printf("[WARN] Skipping asset with invalid filename %q", entry.Filename)
			continue
		}
		hubFiles[name] = true
		totalBytes += entry.Size

		// Hex digests are case-insensitive; local hashes are lower-case.
		if localHash, ok := localHashes[name]; ok && strings.EqualFold(localHash, entry.SHA256) {
			skipped++
			continue
		}
		if s.debug {
			// Truncate for readability, tolerating short or empty checksums.
			sum := entry.SHA256
			if len(sum) > 12 {
				sum = sum[:12]
			}
			s.logger.Printf("[DEBUG] Asset sync: downloading %s (remote sha256=%s)", entry.Filename, sum+"...")
		}
		if err := s.downloadFile(ctx, entry.Filename); err != nil {
			s.logger.Printf("[WARN] Failed to download asset %s: %v", entry.Filename, err)
			continue
		}
		downloaded++
	}
	// A cancellation during the last download must not be reported as "ok".
	if err := ctx.Err(); err != nil {
		return fail(fmt.Errorf("sync interrupted: %w", err))
	}

	// 4. Remove local files not in Hub manifest
	var removed int
	for name := range localHashes {
		if hubFiles[name] {
			continue
		}
		if s.debug {
			s.logger.Printf("[DEBUG] Asset sync: removing stale file %s", name)
		}
		if err := os.Remove(filepath.Join(s.assetsDir, name)); err != nil {
			s.logger.Printf("[WARN] Failed to remove stale asset %s: %v", name, err)
			continue
		}
		removed++
	}

	// 5. Save local manifest copy
	s.saveLocalManifest(manifest)

	// 6. Update status (under lock)
	s.mu.Lock()
	s.status = SyncStatus{
		LastSync:   time.Now().UTC().Format(time.RFC3339),
		LastStatus: "ok",
		FileCount:  len(manifest.Files),
		TotalBytes: totalBytes,
	}
	s.mu.Unlock()

	s.logger.Printf("[INFO] Asset sync complete: %d downloaded, %d unchanged, %d removed (%d total files)",
		downloaded, skipped, removed, len(manifest.Files))
	return nil
}
// Resolve returns the full path for an asset: checks assetsDir first, fallbackDir second.
//
// Only the final path element of filename is used, so directory components in
// the request cannot reach outside the two asset directories. A candidate
// counts as found only when it is a regular file (symlinks are followed).
// When neither directory has the asset, the path inside assetsDir is returned
// so the caller gets a clean 404. Names that cannot denote a file ("", ".",
// ".." or a bare separator) are mapped to a placeholder name, so the result is
// never an asset directory itself or its parent.
func (s *Syncer) Resolve(filename string) string {
	const placeholder = ".invalid-asset-name"

	name := filepath.Base(filename) // sanitize: drop directory components
	switch name {
	case ".", "..", string(filepath.Separator):
		// filepath.Base yields these for empty, dot-only or root inputs;
		// joining them would resolve to a directory.
		name = placeholder
	}

	isFile := func(path string) bool {
		info, err := os.Stat(path)
		return err == nil && info.Mode().IsRegular()
	}

	// Check synced cache first
	cached := filepath.Join(s.assetsDir, name)
	if isFile(cached) {
		return cached
	}

	// Fallback to baked-in assets
	if fallback := filepath.Join(s.fallbackDir, name); isFile(fallback) {
		return fallback
	}

	// Not found — return cached path so caller gets a clean 404
	return cached
}
// Status returns a copy of the most recent sync status, read under the mutex.
func (s *Syncer) Status() SyncStatus {
	s.mu.Lock()
	snapshot := s.status
	s.mu.Unlock()
	return snapshot
}
// setError records a failed sync attempt and logs it as a warning.
// The file count and byte total of the previous status are carried over;
// the timestamp, status and error message are replaced.
func (s *Syncer) setError(err error) {
	stamp := time.Now().UTC().Format(time.RFC3339)
	msg := err.Error()

	s.mu.Lock()
	prev := s.status
	s.status = SyncStatus{
		LastSync:   stamp,
		LastStatus: "error",
		LastError:  msg,
		FileCount:  prev.FileCount,
		TotalBytes: prev.TotalBytes,
	}
	s.mu.Unlock()

	s.logger.Printf("[WARN] Asset sync failed: %v", err)
}
// fetchManifest requests the asset manifest from the Hub and decodes it.
// The API key is sent as a Bearer token. Any status other than 200 yields an
// error carrying the status code and up to 512 bytes of the response body.
func (s *Syncer) fetchManifest(ctx context.Context) (*HubManifest, error) {
	endpoint := s.hubURL + "/api/v1/assets/manifest"
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+s.apiKey)

	resp, err := s.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("HTTP request: %w", err)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code != http.StatusOK {
		// Keep only a short excerpt of the body for the error message.
		excerpt, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
		return nil, fmt.Errorf("HTTP %d: %s", code, excerpt)
	}

	manifest := new(HubManifest)
	if err := json.NewDecoder(resp.Body).Decode(manifest); err != nil {
		return nil, fmt.Errorf("decode manifest: %w", err)
	}
	return manifest, nil
}
// buildLocalHashes maps each file name in assetsDir to the hex SHA-256 of its
// contents. Subdirectories and manifest.json are ignored, and a file that
// cannot be hashed is left out of the result rather than failing the scan.
// An error is returned only when the directory itself cannot be listed.
func (s *Syncer) buildLocalHashes() (map[string]string, error) {
	listing, err := os.ReadDir(s.assetsDir)
	if err != nil {
		return nil, err
	}

	sums := make(map[string]string)
	for _, item := range listing {
		name := item.Name()
		if !item.IsDir() && name != "manifest.json" {
			if sum, hashErr := fileSHA256(filepath.Join(s.assetsDir, name)); hashErr == nil {
				sums[name] = sum
			}
		}
	}
	return sums, nil
}
// downloadFile fetches one asset from the Hub and stores it in assetsDir under
// the base name of filename.
//
// The body is streamed to "<name>.tmp" and renamed into place, so a partially
// written file never replaces an existing asset. The temporary file is removed
// on every failure path. A filename whose base is ".", ".." or a bare
// separator is rejected before any request is made, because the destination
// would be a directory and the temporary file would land outside assetsDir.
func (s *Syncer) downloadFile(ctx context.Context, filename string) error {
	name := filepath.Base(filename)
	if name == "." || name == ".." || name == string(filepath.Separator) {
		return fmt.Errorf("invalid asset filename %q", filename)
	}

	url := s.hubURL + "/api/v1/assets/file/" + filename
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+s.apiKey)

	resp, err := s.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("HTTP %d for %s", resp.StatusCode, filename)
	}

	// Atomic write: write to .tmp, rename
	dst := filepath.Join(s.assetsDir, name)
	tmp := dst + ".tmp"
	out, err := os.Create(tmp)
	if err != nil {
		return err
	}
	if _, err := io.Copy(out, resp.Body); err != nil {
		// Cleanup is best-effort; the copy error is the one worth reporting.
		out.Close()
		os.Remove(tmp)
		return err
	}
	if err := out.Close(); err != nil {
		os.Remove(tmp)
		return err
	}
	if err := os.Rename(tmp, dst); err != nil {
		os.Remove(tmp)
		return err
	}
	return nil
}
// saveLocalManifest writes an indented JSON copy of the Hub manifest to
// manifest.json inside assetsDir. The data goes to a ".tmp" file first and is
// then renamed, so readers never see a half-written manifest.
//
// Saving is best-effort: a failure is logged as a warning and the temporary
// file is removed, but nothing is returned to the caller.
func (s *Syncer) saveLocalManifest(manifest *HubManifest) {
	data, err := json.MarshalIndent(manifest, "", " ")
	if err != nil {
		s.logger.Printf("[WARN] Failed to encode local asset manifest: %v", err)
		return
	}

	path := filepath.Join(s.assetsDir, "manifest.json")
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		s.logger.Printf("[WARN] Failed to write local asset manifest: %v", err)
		os.Remove(tmp) // drop a possibly partial file; nothing more to do if this fails
		return
	}
	if err := os.Rename(tmp, path); err != nil {
		s.logger.Printf("[WARN] Failed to save local asset manifest: %v", err)
		os.Remove(tmp)
	}
}
// fileSHA256 streams the file at path through SHA-256 and returns the digest
// as a lower-case hex string. Open and read errors are returned unchanged.
func fileSHA256(path string) (string, error) {
	src, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer src.Close()

	digest := sha256.New()
	if _, err = io.Copy(digest, src); err != nil {
		return "", err
	}
	sum := digest.Sum(nil)
	return hex.EncodeToString(sum), nil
}