Files
deploy-felhom-compose/controller/internal/stacks/healthprobe.go
T
admin 8b8c04a487 fix: P0+P1 critical bug fixes across controller (24 files)
Concurrency fixes:
- Deep-copy stacks in GetStack/GetStacks to prevent shared state mutation (C04)
- Add per-state mutex to watchdog pathProbeState (C05)
- Guard MetricsCollector.Start() with sync.Once against double-start (C06)
- Hold diskJobMu across entire raw mount operation (C07)
- Add mutex to SetEncryptionKey (C08), MigrateEncryption write lock (H03)
- Use sync.Once for sync.Stop() channel close (H08)
- Set syncing=true before releasing lock in TriggerSync (H09)
- Deep-copy lastDBDump/lastBackup in GetFullStatus (H11)
- Add WaitGroup for stderr goroutine in MigrateDrive (H19)
- Add mutex to SetBackupRunningCheck (M18)

Security fixes:
- Validate Bearer token against Hub API key in CSRF middleware (H16)
- Validate backup paths start with expected prefix in RemoveStack (M12)
- Guard uuid[:8] slice with length check (H20)
- Parse fstab fields exactly for mount target matching (H21)

Bug fixes:
- Use decrypted env vars for compose deploy (C01)
- Log decrypt failures in DecryptMap instead of swallowing (C02)
- Move Deployed=false inside lock in runComposeDeploy (C03)
- Fix activeDrives() to skip disconnected drives (H02)
- Fix Snapshot() stderr extraction from exec.ExitError (H01)
- Check unlockCmd.Run() error in restic (H01)
- Buffer template rendering via bytes.Buffer (H07)
- Thread context.Context through cloudflare client (H10)
- Fix leaf-name collision detection in cross-drive backup (H15)
- Add nil check for crossDriveRunner (H17)
- Use strings.TrimSpace instead of slice on command output (H18)
- Make SaveAppConfig atomic with write-to-tmp+rename (H04)
- Pass encKey on deploy failure SaveAppConfig (H05)
- Fix IPv6 address format in TCP health probe

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 13:39:45 +01:00

324 lines
8.2 KiB
Go

package stacks
import (
"fmt"
"io"
"net"
"net/http"
"strings"
"sync"
"time"
)
// probeTarget holds the info needed to probe a single stack.
// Targets are collected by RunHealthProbes while the manager's read lock is
// held, so the probes themselves can run after the lock has been released.
type probeTarget struct {
// stackName is the key of the stack in the manager's stack map; it is used
// to write the probe result back and to label log lines.
stackName string
// containerName is the container selected by findProbeContainer; the TCP and
// HTTP probes use it as the network host to connect to.
containerName string
// checks are the health check items taken from the stack's healthcheck
// configuration. The slice is shared with that configuration, not copied.
checks []HealthCheckItem
}
// RunHealthProbes executes the controller-side health checks for every stack
// that is running (or already flagged unhealthy), has at least one configured
// check, has a probeable container, and whose probe interval has elapsed.
// The scheduler calls it once a minute.
//
// Work is split into three phases so no lock is held during network I/O:
// targets are gathered under the read lock, probed concurrently without any
// lock, and the outcomes are written back under the write lock. A stack that
// disappeared between phases is skipped. The returned error is always nil.
func (m *Manager) RunHealthProbes() error {
	// Phase 1: pick the stacks that are due for a probe (read lock held).
	var due []probeTarget
	m.mu.RLock()
	for stackName, st := range m.stacks {
		probeable := st.State == StateRunning || st.State == StateUnhealthy
		if !probeable {
			continue
		}

		cfg := st.Meta.HealthCheck
		if cfg == nil || len(cfg.Checks) == 0 {
			continue
		}

		// Skip stacks that were probed more recently than their interval.
		every := parseInterval(cfg.Interval)
		if last := st.HealthProbe; last != nil && time.Since(last.LastCheck) < every {
			continue
		}

		// Without a live container there is nothing to connect to.
		host := findProbeContainer(stackName, st.Containers)
		if host == "" {
			continue
		}

		due = append(due, probeTarget{
			stackName:     stackName,
			containerName: host,
			checks:        cfg.Checks,
		})
	}
	m.mu.RUnlock()

	if len(due) == 0 {
		return nil
	}

	// Phase 2: probe every target in its own goroutine (no lock held).
	// Each goroutine writes only to its own slot, so no extra locking is needed.
	type outcome struct {
		stackName string
		result    *HealthProbeResult
	}
	outcomes := make([]outcome, len(due))

	var wg sync.WaitGroup
	wg.Add(len(due))
	for i := range due {
		go func(slot int) {
			defer wg.Done()
			tgt := due[slot]
			outcomes[slot] = outcome{stackName: tgt.stackName, result: m.runChecks(tgt)}
		}(i)
	}
	wg.Wait()

	// Phase 3: store the results and adjust stack state (write lock held).
	var healthy, unhealthy int
	m.mu.Lock()
	for _, o := range outcomes {
		st, found := m.stacks[o.stackName]
		if !found {
			continue
		}
		st.HealthProbe = o.result

		switch {
		case o.result.Healthy:
			healthy++
			// A passing probe clears a previous unhealthy override.
			if st.State == StateUnhealthy {
				st.State = StateRunning
			}
		default:
			unhealthy++
			// Only a running stack is downgraded; other states are left alone.
			if st.State == StateRunning {
				st.State = StateUnhealthy
			}
		}
	}
	m.mu.Unlock()

	// Summary: failures are always logged, all-ok runs only in debug mode.
	switch {
	case unhealthy > 0:
		m.logger.Printf("[INFO] Health probes: %d ok, %d unhealthy (of %d probed)", healthy, unhealthy, len(due))
	case m.isDebug():
		m.logger.Printf("[DEBUG] Health probes: %d ok (of %d probed)", healthy, len(due))
	}
	return nil
}
// runChecks runs every health check item of one target, in order, and
// aggregates them into a single result. The result starts out healthy and
// becomes unhealthy as soon as any individual check fails; all checks are
// still executed and recorded in Details. Failures are logged at WARN level,
// successes only when debug logging is enabled.
func (m *Manager) runChecks(t probeTarget) *HealthProbeResult {
	summary := &HealthProbeResult{LastCheck: time.Now(), Healthy: true}

	for _, item := range t.checks {
		outcome := m.runSingleCheck(t.containerName, item)
		summary.Details = append(summary.Details, outcome)

		if !outcome.Healthy {
			summary.Healthy = false
			m.logger.Printf("[WARN] Health probe %s: %s %s :%d%s → %s",
				t.stackName, strings.ToUpper(item.Type), methodOrEmpty(item), item.Port, item.Path, outcome.Error)
			continue
		}

		if !m.isDebug() {
			continue
		}
		// A non-zero status means an HTTP response was received; otherwise
		// the success is reported in the TCP format.
		if outcome.Status > 0 {
			m.logger.Printf("[DEBUG] Health probe %s: %s %s :%d%s → %d (%s)",
				t.stackName, strings.ToUpper(item.Type), methodOrEmpty(item), item.Port, item.Path, outcome.Status, outcome.Latency)
		} else {
			m.logger.Printf("[DEBUG] Health probe %s: TCP :%d → ok (%s)",
				t.stackName, item.Port, outcome.Latency)
		}
	}

	return summary
}
// runSingleCheck dispatches one health check item to the matching prober.
// "tcp" checks go to probeTCP, "http" and "api" checks go to probeHTTP. Any
// other type yields an unhealthy detail carrying an "unknown check type"
// error. The detail's Target is always ":<port><path>".
func (m *Manager) runSingleCheck(containerName string, check HealthCheckItem) HealthCheckDetail {
	target := fmt.Sprintf(":%d%s", check.Port, check.Path)

	if check.Type == "tcp" {
		return m.probeTCP(containerName, check.Port, target)
	}
	if check.Type == "http" || check.Type == "api" {
		return m.probeHTTP(containerName, check, target)
	}

	// Unrecognised type: report it as a failed check rather than skipping it.
	unknown := HealthCheckDetail{Type: check.Type, Target: target}
	unknown.Healthy = false
	unknown.Error = fmt.Sprintf("unknown check type: %s", check.Type)
	return unknown
}
// probeTCP reports whether a TCP connection to containerName:port can be
// established within 5 seconds. The connection is closed immediately after a
// successful dial. The measured latency covers the dial attempt whether it
// succeeds or fails; on failure the dial error text is stored in Error.
func (m *Manager) probeTCP(containerName string, port int, target string) HealthCheckDetail {
	began := time.Now()
	// JoinHostPort brackets IPv6 literals so the address stays parseable.
	hostPort := net.JoinHostPort(containerName, fmt.Sprintf("%d", port))
	conn, dialErr := net.DialTimeout("tcp", hostPort, 5*time.Second)
	elapsed := time.Since(began)

	if dialErr != nil {
		return HealthCheckDetail{
			Type:    "tcp",
			Target:  target,
			Latency: formatLatency(elapsed),
			Healthy: false,
			Error:   dialErr.Error(),
		}
	}

	conn.Close()
	return HealthCheckDetail{
		Type:    "tcp",
		Target:  target,
		Latency: formatLatency(elapsed),
		Healthy: true,
	}
}
// probeHTTP sends one HTTP request to the container and evaluates the reply.
//
// The request goes to http://<containerName>:<port><path> using check.Method
// (GET when empty) with a 5 second overall timeout. Redirects are not
// followed: the redirect response itself is what gets evaluated.
//
// Evaluation rules:
//   - transport error (connection refused, timeout, ...): unhealthy.
//   - type "http": any response at all counts as healthy.
//   - type "api" with no Expect block: same as "http".
//   - type "api" with Expect: Status (when > 0) must match exactly, and
//     BodyContains (when non-empty) must occur in the first 8 KiB of the body.
//
// The returned detail carries the response status code whenever a response
// was received, and the measured latency of the request.
func (m *Manager) probeHTTP(containerName string, check HealthCheckItem, target string) HealthCheckDetail {
	// Build the authority with JoinHostPort so IPv6 literal hosts are
	// bracketed ("[::1]:8080"); plain "%s:%d" formatting would produce an
	// unparseable URL for them. This matches how probeTCP builds its address.
	hostPort := net.JoinHostPort(containerName, fmt.Sprintf("%d", check.Port))
	endpoint := fmt.Sprintf("http://%s%s", hostPort, check.Path)

	method := check.Method
	if method == "" {
		method = "GET"
	}

	start := time.Now()
	client := &http.Client{
		Timeout: 5 * time.Second,
		// Evaluate the first response as-is instead of following redirects.
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}

	req, err := http.NewRequest(method, endpoint, nil)
	if err != nil {
		// Invalid method or URL: nothing was sent, so there is no latency.
		return HealthCheckDetail{
			Type:    check.Type,
			Target:  target,
			Healthy: false,
			Error:   fmt.Sprintf("bad request: %v", err),
			Latency: "0ms",
		}
	}

	resp, err := client.Do(req)
	latency := time.Since(start)
	detail := HealthCheckDetail{
		Type:    check.Type,
		Target:  target,
		Latency: formatLatency(latency),
	}
	if err != nil {
		detail.Healthy = false
		detail.Error = err.Error()
		return detail
	}
	defer resp.Body.Close()
	detail.Status = resp.StatusCode

	// For "http" checks, and "api" checks without expectations, any response
	// means the service is alive.
	if check.Type == "http" || check.Expect == nil {
		detail.Healthy = true
		return detail
	}

	// Expected status code (0 means "don't care").
	if check.Expect.Status > 0 && resp.StatusCode != check.Expect.Status {
		detail.Healthy = false
		detail.Error = fmt.Sprintf("expected status %d, got %d", check.Expect.Status, resp.StatusCode)
		return detail
	}

	// Expected body content; only the first 8 KiB of the body is inspected.
	if check.Expect.BodyContains != "" {
		body, err := io.ReadAll(io.LimitReader(resp.Body, 8192))
		if err != nil {
			detail.Healthy = false
			detail.Error = fmt.Sprintf("reading body: %v", err)
			return detail
		}
		if !strings.Contains(string(body), check.Expect.BodyContains) {
			detail.Healthy = false
			detail.Error = fmt.Sprintf("body missing expected string %q", check.Expect.BodyContains)
			return detail
		}
	}

	detail.Healthy = true
	return detail
}
// findProbeContainer picks the container that health probes should connect to.
// Only containers whose state is running or unhealthy are considered. A
// container named exactly like the stack wins wherever it appears in the
// list; otherwise the first container whose name starts with the stack name
// is used. It returns "" when no container qualifies.
func findProbeContainer(stackName string, containers []ContainerInfo) string {
	firstPrefixMatch := ""
	havePrefixMatch := false

	for i := range containers {
		candidate := containers[i]
		if candidate.State != StateRunning && candidate.State != StateUnhealthy {
			continue
		}
		if candidate.Name == stackName {
			// An exact match always takes precedence over a prefix match.
			return candidate.Name
		}
		if !havePrefixMatch && strings.HasPrefix(candidate.Name, stackName) {
			firstPrefixMatch = candidate.Name
			havePrefixMatch = true
		}
	}

	return firstPrefixMatch
}
// parseInterval converts a duration string such as "30s", "5m" or "1h" into
// a time.Duration. An empty or otherwise unparseable string yields the
// default of 5 minutes. Any value time.ParseDuration accepts is returned
// unchanged, including zero and negative durations.
func parseInterval(s string) time.Duration {
	const fallback = 5 * time.Minute

	// ParseDuration rejects the empty string, so it needs no special case.
	if parsed, err := time.ParseDuration(s); err == nil {
		return parsed
	}
	return fallback
}
// formatLatency renders a duration as a short latency string, choosing the
// unit by magnitude: whole microseconds below 1ms ("250µs"), whole
// milliseconds below 1s ("42ms"), and seconds with one decimal otherwise
// ("1.5s").
func formatLatency(d time.Duration) string {
	switch {
	case d < time.Millisecond:
		return fmt.Sprintf("%dµs", d.Microseconds())
	case d < time.Second:
		return fmt.Sprintf("%dms", d.Milliseconds())
	default:
		return fmt.Sprintf("%.1fs", d.Seconds())
	}
}
// methodOrEmpty returns the HTTP method to show in probe log lines.
//
// For "http" and "api" checks it reports the method the HTTP probe actually
// sends: check.Method when set, otherwise "GET". For every other check type
// (for example "tcp") no HTTP request is made, so it returns "".
//
// Previously every path that did not return check.Method returned "GET",
// which logged TCP failures as "TCP GET" and hid a custom method configured
// on an "http" check.
func methodOrEmpty(check HealthCheckItem) string {
	switch check.Type {
	case "http", "api":
		if check.Method != "" {
			return check.Method
		}
		return "GET"
	default:
		return ""
	}
}