2e9634e50f
- Clear HealthProbe on StartStack/RestartStack so stale unhealthy state isn't re-applied by RefreshStatus - Use 10s probe interval for unhealthy/new stacks (nil HealthProbe probes immediately on next tick), switch to normal 5m interval once healthy - Scheduler frequency 1m → 10s to support fast probing Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
332 lines
8.5 KiB
Go
332 lines
8.5 KiB
Go
package stacks
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// probeTarget holds the info needed to probe a single stack.
|
|
type probeTarget struct {
|
|
stackName string
|
|
containerName string
|
|
checks []HealthCheckItem
|
|
}
|
|
|
|
// RunHealthProbes runs controller-side health probes for all running stacks
|
|
// that have healthcheck configuration and whose interval has elapsed.
|
|
// Called by the scheduler every minute.
|
|
func (m *Manager) RunHealthProbes() error {
|
|
// Phase 1: collect targets (under lock)
|
|
m.mu.RLock()
|
|
var targets []probeTarget
|
|
for name, stack := range m.stacks {
|
|
if stack.State != StateRunning && stack.State != StateUnhealthy {
|
|
continue
|
|
}
|
|
hc := stack.Meta.HealthCheck
|
|
if hc == nil || len(hc.Checks) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Check if interval has elapsed since last probe.
|
|
// Fast 10s probes until healthy, then normal interval (default 5m).
|
|
// When HealthProbe is nil (just started/restarted), probe immediately.
|
|
interval := parseInterval(hc.Interval)
|
|
if stack.HealthProbe != nil {
|
|
effectiveInterval := interval
|
|
if !stack.HealthProbe.Healthy {
|
|
effectiveInterval = 10 * time.Second
|
|
}
|
|
if time.Since(stack.HealthProbe.LastCheck) < effectiveInterval {
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Find the main container to probe (matching stack name)
|
|
containerName := findProbeContainer(name, stack.Containers)
|
|
if containerName == "" {
|
|
continue
|
|
}
|
|
|
|
targets = append(targets, probeTarget{
|
|
stackName: name,
|
|
containerName: containerName,
|
|
checks: hc.Checks,
|
|
})
|
|
}
|
|
m.mu.RUnlock()
|
|
|
|
if len(targets) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Phase 2: run all probes concurrently (no lock held)
|
|
type probeResult struct {
|
|
stackName string
|
|
result *HealthProbeResult
|
|
}
|
|
|
|
results := make([]probeResult, len(targets))
|
|
var wg sync.WaitGroup
|
|
|
|
for i, t := range targets {
|
|
wg.Add(1)
|
|
go func(idx int, t probeTarget) {
|
|
defer wg.Done()
|
|
result := m.runChecks(t)
|
|
results[idx] = probeResult{stackName: t.stackName, result: result}
|
|
}(i, t)
|
|
}
|
|
wg.Wait()
|
|
|
|
// Phase 3: apply results and log (under lock)
|
|
m.mu.Lock()
|
|
okCount, failCount := 0, 0
|
|
for _, pr := range results {
|
|
stack, ok := m.stacks[pr.stackName]
|
|
if !ok {
|
|
continue
|
|
}
|
|
stack.HealthProbe = pr.result
|
|
|
|
if pr.result.Healthy {
|
|
okCount++
|
|
// If Docker says running and probe is healthy, ensure state is running
|
|
// (clears a previous unhealthy override)
|
|
if stack.State == StateUnhealthy {
|
|
stack.State = StateRunning
|
|
}
|
|
} else {
|
|
failCount++
|
|
if stack.State == StateRunning {
|
|
stack.State = StateUnhealthy
|
|
}
|
|
}
|
|
}
|
|
m.mu.Unlock()
|
|
|
|
// Summary log
|
|
if failCount > 0 {
|
|
m.logger.Printf("[INFO] Health probes: %d ok, %d unhealthy (of %d probed)", okCount, failCount, len(targets))
|
|
} else if m.isDebug() {
|
|
m.logger.Printf("[DEBUG] Health probes: %d ok (of %d probed)", okCount, len(targets))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// runChecks executes all health check items for a single stack target.
|
|
func (m *Manager) runChecks(t probeTarget) *HealthProbeResult {
|
|
result := &HealthProbeResult{
|
|
LastCheck: time.Now(),
|
|
Healthy: true,
|
|
}
|
|
|
|
for _, check := range t.checks {
|
|
detail := m.runSingleCheck(t.containerName, check)
|
|
result.Details = append(result.Details, detail)
|
|
|
|
if detail.Healthy {
|
|
if m.isDebug() {
|
|
if detail.Status > 0 {
|
|
m.logger.Printf("[DEBUG] Health probe %s: %s %s :%d%s → %d (%s)",
|
|
t.stackName, strings.ToUpper(check.Type), methodOrEmpty(check), check.Port, check.Path, detail.Status, detail.Latency)
|
|
} else {
|
|
m.logger.Printf("[DEBUG] Health probe %s: TCP :%d → ok (%s)",
|
|
t.stackName, check.Port, detail.Latency)
|
|
}
|
|
}
|
|
} else {
|
|
result.Healthy = false
|
|
m.logger.Printf("[WARN] Health probe %s: %s %s :%d%s → %s",
|
|
t.stackName, strings.ToUpper(check.Type), methodOrEmpty(check), check.Port, check.Path, detail.Error)
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// runSingleCheck executes one health check item and returns the result.
|
|
func (m *Manager) runSingleCheck(containerName string, check HealthCheckItem) HealthCheckDetail {
|
|
target := fmt.Sprintf(":%d%s", check.Port, check.Path)
|
|
|
|
switch check.Type {
|
|
case "tcp":
|
|
return m.probeTCP(containerName, check.Port, target)
|
|
case "http", "api":
|
|
return m.probeHTTP(containerName, check, target)
|
|
default:
|
|
return HealthCheckDetail{
|
|
Type: check.Type,
|
|
Target: target,
|
|
Healthy: false,
|
|
Error: fmt.Sprintf("unknown check type: %s", check.Type),
|
|
}
|
|
}
|
|
}
|
|
|
|
// probeTCP tests if a TCP port is reachable on the container.
|
|
func (m *Manager) probeTCP(containerName string, port int, target string) HealthCheckDetail {
|
|
start := time.Now()
|
|
addr := net.JoinHostPort(containerName, fmt.Sprintf("%d", port))
|
|
conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
|
|
latency := time.Since(start)
|
|
|
|
detail := HealthCheckDetail{
|
|
Type: "tcp",
|
|
Target: target,
|
|
Latency: formatLatency(latency),
|
|
}
|
|
|
|
if err != nil {
|
|
detail.Healthy = false
|
|
detail.Error = err.Error()
|
|
} else {
|
|
conn.Close()
|
|
detail.Healthy = true
|
|
}
|
|
return detail
|
|
}
|
|
|
|
// probeHTTP makes an HTTP request to the container and evaluates the result.
|
|
// For "http" type: any response = healthy. For "api" type: validates expect rules.
|
|
func (m *Manager) probeHTTP(containerName string, check HealthCheckItem, target string) HealthCheckDetail {
|
|
url := fmt.Sprintf("http://%s:%d%s", containerName, check.Port, check.Path)
|
|
method := check.Method
|
|
if method == "" {
|
|
method = "GET"
|
|
}
|
|
|
|
start := time.Now()
|
|
|
|
client := &http.Client{
|
|
Timeout: 5 * time.Second,
|
|
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
|
return http.ErrUseLastResponse
|
|
},
|
|
}
|
|
|
|
req, err := http.NewRequest(method, url, nil)
|
|
if err != nil {
|
|
return HealthCheckDetail{
|
|
Type: check.Type,
|
|
Target: target,
|
|
Healthy: false,
|
|
Error: fmt.Sprintf("bad request: %v", err),
|
|
Latency: "0ms",
|
|
}
|
|
}
|
|
|
|
resp, err := client.Do(req)
|
|
latency := time.Since(start)
|
|
|
|
detail := HealthCheckDetail{
|
|
Type: check.Type,
|
|
Target: target,
|
|
Latency: formatLatency(latency),
|
|
}
|
|
|
|
if err != nil {
|
|
detail.Healthy = false
|
|
detail.Error = err.Error()
|
|
return detail
|
|
}
|
|
defer resp.Body.Close()
|
|
detail.Status = resp.StatusCode
|
|
|
|
// For "http" type, any response means the service is alive
|
|
if check.Type == "http" {
|
|
detail.Healthy = true
|
|
return detail
|
|
}
|
|
|
|
// For "api" type, validate expectations
|
|
if check.Expect == nil {
|
|
// No expectations = just check for a response (same as http)
|
|
detail.Healthy = true
|
|
return detail
|
|
}
|
|
|
|
// Check expected status code
|
|
if check.Expect.Status > 0 && resp.StatusCode != check.Expect.Status {
|
|
detail.Healthy = false
|
|
detail.Error = fmt.Sprintf("expected status %d, got %d", check.Expect.Status, resp.StatusCode)
|
|
return detail
|
|
}
|
|
|
|
// Check expected body content
|
|
if check.Expect.BodyContains != "" {
|
|
body, err := io.ReadAll(io.LimitReader(resp.Body, 8192)) // read up to 8KB
|
|
if err != nil {
|
|
detail.Healthy = false
|
|
detail.Error = fmt.Sprintf("reading body: %v", err)
|
|
return detail
|
|
}
|
|
if !strings.Contains(string(body), check.Expect.BodyContains) {
|
|
detail.Healthy = false
|
|
detail.Error = fmt.Sprintf("body missing expected string %q", check.Expect.BodyContains)
|
|
return detail
|
|
}
|
|
}
|
|
|
|
detail.Healthy = true
|
|
return detail
|
|
}
|
|
|
|
// findProbeContainer returns the container name to probe for a stack.
|
|
// Prefers exact match with stack name, then prefix match (stack-service-N).
|
|
func findProbeContainer(stackName string, containers []ContainerInfo) string {
|
|
for _, c := range containers {
|
|
if c.Name == stackName && (c.State == StateRunning || c.State == StateUnhealthy) {
|
|
return c.Name
|
|
}
|
|
}
|
|
// Fallback: first running container with matching prefix
|
|
for _, c := range containers {
|
|
if strings.HasPrefix(c.Name, stackName) && (c.State == StateRunning || c.State == StateUnhealthy) {
|
|
return c.Name
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// parseInterval parses a duration string such as "5m", "30s" or "1h".
//
// It returns a 5 minute default when s is empty, cannot be parsed, or parses
// to a zero or negative duration. A non-positive probe interval has no
// sensible meaning and would make the interval gate always pass, so it is
// treated the same as "not configured".
func parseInterval(s string) time.Duration {
	const fallback = 5 * time.Minute

	// time.ParseDuration rejects the empty string, so that case needs no
	// separate branch.
	d, err := time.ParseDuration(s)
	if err != nil || d <= 0 {
		return fallback
	}
	return d
}
|
|
|
|
// formatLatency renders a duration as a short latency string.
//
// Durations under one millisecond are shown in whole microseconds ("250µs"),
// durations under one second in whole milliseconds ("42ms"), and everything
// else in seconds with one decimal place ("1.5s").
func formatLatency(d time.Duration) string {
	switch {
	case d < time.Millisecond:
		return fmt.Sprintf("%dµs", d.Microseconds())
	case d < time.Second:
		return fmt.Sprintf("%dms", d.Milliseconds())
	default:
		return fmt.Sprintf("%.1fs", d.Seconds())
	}
}
|
|
|
|
// methodOrEmpty returns the method string for logging, or empty for non-api checks.
|
|
func methodOrEmpty(check HealthCheckItem) string {
|
|
if check.Type == "api" && check.Method != "" {
|
|
return check.Method
|
|
}
|
|
if check.Type == "api" {
|
|
return "GET"
|
|
}
|
|
return "GET"
|
|
}
|