package stacks import ( "fmt" "io" "net" "net/http" "strings" "sync" "time" ) // probeTarget holds the info needed to probe a single stack. type probeTarget struct { stackName string containerName string checks []HealthCheckItem } // RunHealthProbes runs controller-side health probes for all running stacks // that have healthcheck configuration and whose interval has elapsed. // Called by the scheduler every minute. func (m *Manager) RunHealthProbes() error { // Phase 1: collect targets (under lock) m.mu.RLock() var targets []probeTarget skippedNotDue := 0 skippedNoContainer := 0 for name, stack := range m.stacks { if stack.State != StateRunning && stack.State != StateUnhealthy { continue } hc := stack.Meta.HealthCheck if hc == nil || len(hc.Checks) == 0 { continue } // Check if interval has elapsed since last probe. // Fast 10s probes until healthy, then normal interval (default 5m). // When HealthProbe is nil (just started/restarted), probe immediately. interval := parseInterval(hc.Interval) if stack.HealthProbe != nil { effectiveInterval := interval if !stack.HealthProbe.Healthy { effectiveInterval = 10 * time.Second } if time.Since(stack.HealthProbe.LastCheck) < effectiveInterval { skippedNotDue++ if m.isDebug() { sinceLastCheck := time.Since(stack.HealthProbe.LastCheck).Round(time.Second) m.logger.Printf("[DEBUG] [stacks] RunHealthProbes: skipping %s — last check %s ago, effective interval %s, healthy=%v", name, sinceLastCheck, effectiveInterval, stack.HealthProbe.Healthy) } continue } } // Find the main container to probe (matching stack name) containerName := findProbeContainer(name, stack.Containers) if containerName == "" { skippedNoContainer++ continue } targets = append(targets, probeTarget{ stackName: name, containerName: containerName, checks: hc.Checks, }) } m.mu.RUnlock() if m.isDebug() { m.logger.Printf("[DEBUG] [stacks] RunHealthProbes: collected %d targets (%d skipped not due, %d skipped no container)", len(targets), skippedNotDue, 
skippedNoContainer) } if len(targets) == 0 { return nil } // Phase 2: run all probes concurrently (no lock held) type probeResult struct { stackName string result *HealthProbeResult } results := make([]probeResult, len(targets)) var wg sync.WaitGroup for i, t := range targets { wg.Add(1) go func(idx int, t probeTarget) { defer wg.Done() result := m.runChecks(t) results[idx] = probeResult{stackName: t.stackName, result: result} }(i, t) } wg.Wait() // Phase 3: apply results and log (under lock) m.mu.Lock() okCount, failCount := 0, 0 for _, pr := range results { stack, ok := m.stacks[pr.stackName] if !ok { continue } stack.HealthProbe = pr.result if pr.result.Healthy { okCount++ // If Docker says running and probe is healthy, ensure state is running // (clears a previous unhealthy override) if stack.State == StateUnhealthy { stack.State = StateRunning } } else { failCount++ if stack.State == StateRunning { stack.State = StateUnhealthy } } } m.mu.Unlock() // Summary log if failCount > 0 { m.logger.Printf("[WARN] Health probes: %d ok, %d unhealthy (of %d probed)", okCount, failCount, len(targets)) } else { m.logger.Printf("[INFO] Health probes: %d ok (of %d probed)", okCount, len(targets)) } return nil } // runChecks executes all health check items for a single stack target. 
//
// The checks run sequentially in configuration order, and every check runs
// even after an earlier one fails, so Details holds one entry per check. The
// target is healthy only if all of its checks pass. Failures are logged at
// WARN; successes are logged only in debug mode.
func (m *Manager) runChecks(t probeTarget) *HealthProbeResult {
	result := &HealthProbeResult{
		LastCheck: time.Now(),
		Healthy:   true,
	}
	for _, check := range t.checks {
		detail := m.runSingleCheck(t.containerName, check)
		result.Details = append(result.Details, detail)

		// Log label such as "HTTP GET" or "TCP". Only HTTP-based checks carry a
		// method; previously TCP failures were mislabeled as "TCP GET".
		label := strings.ToUpper(check.Type)
		if check.Type == "http" || check.Type == "api" {
			label += " " + methodOrEmpty(check)
		}

		if !detail.Healthy {
			result.Healthy = false
			m.logger.Printf("[WARN] Health probe %s: %s :%d%s → %s",
				t.stackName, label, check.Port, check.Path, detail.Error)
			continue
		}
		if !m.isDebug() {
			continue
		}
		// Only HTTP probes set Status; TCP probes leave it at zero.
		if detail.Status > 0 {
			m.logger.Printf("[DEBUG] Health probe %s: %s :%d%s → %d (%s)",
				t.stackName, label, check.Port, check.Path, detail.Status, detail.Latency)
		} else {
			m.logger.Printf("[DEBUG] Health probe %s: TCP :%d → ok (%s)",
				t.stackName, check.Port, detail.Latency)
		}
	}
	return result
}

// runSingleCheck executes one health check item and returns the result.
// "tcp" checks dial the port, "http" and "api" checks issue an HTTP request,
// and any other type is reported as unhealthy rather than skipped.
func (m *Manager) runSingleCheck(containerName string, check HealthCheckItem) HealthCheckDetail {
	target := fmt.Sprintf(":%d%s", check.Port, check.Path)
	switch check.Type {
	case "tcp":
		return m.probeTCP(containerName, check.Port, target)
	case "http", "api":
		return m.probeHTTP(containerName, check, target)
	default:
		return HealthCheckDetail{
			Type:    check.Type,
			Target:  target,
			Healthy: false,
			Error:   fmt.Sprintf("unknown check type: %s", check.Type),
		}
	}
}

// probeTCP tests if a TCP port is reachable on the container.
// containerName is dialed directly as a host name with a 5s timeout; on
// success the connection is closed immediately. Latency covers the dial only.
func (m *Manager) probeTCP(containerName string, port int, target string) HealthCheckDetail {
	start := time.Now()
	addr := net.JoinHostPort(containerName, fmt.Sprintf("%d", port))
	conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
	latency := time.Since(start)

	detail := HealthCheckDetail{
		Type:    "tcp",
		Target:  target,
		Latency: formatLatency(latency),
	}
	if err != nil {
		detail.Healthy = false
		detail.Error = err.Error()
		return detail
	}
	conn.Close()
	detail.Healthy = true
	return detail
}

// probeHTTP makes an HTTP request to the container and evaluates the result.
// For "http" type: any response = healthy. For "api" type: validates expect rules. func (m *Manager) probeHTTP(containerName string, check HealthCheckItem, target string) HealthCheckDetail { url := fmt.Sprintf("http://%s:%d%s", containerName, check.Port, check.Path) method := check.Method if method == "" { method = "GET" } start := time.Now() client := &http.Client{ Timeout: 5 * time.Second, CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse }, } req, err := http.NewRequest(method, url, nil) if err != nil { return HealthCheckDetail{ Type: check.Type, Target: target, Healthy: false, Error: fmt.Sprintf("bad request: %v", err), Latency: "0ms", } } resp, err := client.Do(req) latency := time.Since(start) detail := HealthCheckDetail{ Type: check.Type, Target: target, Latency: formatLatency(latency), } if err != nil { detail.Healthy = false detail.Error = err.Error() return detail } defer resp.Body.Close() detail.Status = resp.StatusCode // For "http" type, any response means the service is alive if check.Type == "http" { detail.Healthy = true return detail } // For "api" type, validate expectations if check.Expect == nil { // No expectations = just check for a response (same as http) detail.Healthy = true return detail } // Check expected status code if check.Expect.Status > 0 && resp.StatusCode != check.Expect.Status { detail.Healthy = false detail.Error = fmt.Sprintf("expected status %d, got %d", check.Expect.Status, resp.StatusCode) return detail } // Check expected body content if check.Expect.BodyContains != "" { body, err := io.ReadAll(io.LimitReader(resp.Body, 8192)) // read up to 8KB if err != nil { detail.Healthy = false detail.Error = fmt.Sprintf("reading body: %v", err) return detail } if !strings.Contains(string(body), check.Expect.BodyContains) { detail.Healthy = false detail.Error = fmt.Sprintf("body missing expected string %q", check.Expect.BodyContains) return detail } } detail.Healthy = true return detail 
} // findProbeContainer returns the container name to probe for a stack. // Prefers exact match with stack name, then prefix match (stack-service-N). func findProbeContainer(stackName string, containers []ContainerInfo) string { for _, c := range containers { if c.Name == stackName && (c.State == StateRunning || c.State == StateUnhealthy) { return c.Name } } // Fallback: first running container with matching prefix for _, c := range containers { if strings.HasPrefix(c.Name, stackName) && (c.State == StateRunning || c.State == StateUnhealthy) { return c.Name } } return "" } // parseInterval parses a duration string like "5m", "30s", "1h". // Returns 5 minutes as default if parsing fails. func parseInterval(s string) time.Duration { if s == "" { return 5 * time.Minute } d, err := time.ParseDuration(s) if err != nil { return 5 * time.Minute } return d } // formatLatency formats a duration as a human-readable latency string. func formatLatency(d time.Duration) string { if d < time.Millisecond { return fmt.Sprintf("%dµs", d.Microseconds()) } if d < time.Second { return fmt.Sprintf("%dms", d.Milliseconds()) } return fmt.Sprintf("%.1fs", d.Seconds()) } // methodOrEmpty returns the method string for logging, or empty for non-api checks. func methodOrEmpty(check HealthCheckItem) string { if check.Type == "api" && check.Method != "" { return check.Method } if check.Type == "api" { return "GET" } return "GET" }