Files
admin 8e61cd7ec4 feat: comprehensive INFO/WARN/ERROR logging across all controller modules
Add structured operational logging at INFO, WARN, and ERROR levels to
every controller module. Standardize custom prefixes ([GEO], [SCHED],
[SYNC]) to use [INFO/WARN/ERROR] [module] format. Fix misleveled logs
(WARN->ERROR for data loss scenarios, WARN->INFO for routine operations).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 19:58:27 +01:00

346 lines
9.1 KiB
Go

package stacks
import (
"fmt"
"io"
"net"
"net/http"
"strings"
"sync"
"time"
)
// probeTarget holds the info needed to probe a single stack.
// RunHealthProbes builds these while it holds the manager's read lock so
// that the probes themselves can run afterwards without the lock.
type probeTarget struct {
// stackName is the key of the stack in the manager's stack map; results
// are written back to the stack under this name.
stackName string
// containerName is the container selected by findProbeContainer. The
// probes use it as the network host to connect to.
containerName string
// checks are the health check items taken from the stack's healthcheck
// configuration; each one is executed in order by runChecks.
checks []HealthCheckItem
}
// RunHealthProbes performs one round of controller-side health probing.
//
// The scheduler calls it every minute. The work is split into three phases
// so that no lock is held while network probes are in flight:
//
//  1. Under the read lock, select every stack that is running or unhealthy,
//     has at least one configured check, is due for a probe, and has a
//     container that can be probed.
//  2. With no lock held, probe all selected stacks concurrently.
//  3. Under the write lock, store each result on its stack and switch the
//     stack state between running and unhealthy to match.
//
// A stack without a previous probe result is always due. A stack whose last
// probe failed is due again after 10 seconds; otherwise the configured
// interval applies (parseInterval supplies the default). The returned error
// is always nil.
func (m *Manager) RunHealthProbes() error {
	var (
		pending     []probeTarget
		notDue      int
		noContainer int
	)

	// Phase 1: select the stacks to probe (read lock held).
	m.mu.RLock()
	for stackName, st := range m.stacks {
		if !(st.State == StateRunning || st.State == StateUnhealthy) {
			continue
		}
		cfg := st.Meta.HealthCheck
		if cfg == nil || len(cfg.Checks) == 0 {
			continue
		}

		// A nil HealthProbe means the stack has not been probed since it
		// was (re)started, so it is probed right away.
		if last := st.HealthProbe; last != nil {
			wait := parseInterval(cfg.Interval)
			if !last.Healthy {
				// Retry quickly until the stack reports healthy.
				wait = 10 * time.Second
			}
			if time.Since(last.LastCheck) < wait {
				notDue++
				if m.isDebug() {
					age := time.Since(last.LastCheck).Round(time.Second)
					m.logger.Printf("[DEBUG] [stacks] RunHealthProbes: skipping %s — last check %s ago, effective interval %s, healthy=%v",
						stackName, age, wait, last.Healthy)
				}
				continue
			}
		}

		host := findProbeContainer(stackName, st.Containers)
		if host == "" {
			noContainer++
			continue
		}
		pending = append(pending, probeTarget{
			stackName:     stackName,
			containerName: host,
			checks:        cfg.Checks,
		})
	}
	m.mu.RUnlock()

	if m.isDebug() {
		m.logger.Printf("[DEBUG] [stacks] RunHealthProbes: collected %d targets (%d skipped not due, %d skipped no container)",
			len(pending), notDue, noContainer)
	}
	if len(pending) == 0 {
		return nil
	}

	// Phase 2: probe every target in its own goroutine (no lock held).
	// Each goroutine writes only to its own slot, so the slice needs no
	// extra synchronisation beyond the WaitGroup.
	type outcome struct {
		stackName string
		result    *HealthProbeResult
	}
	outcomes := make([]outcome, len(pending))
	var wg sync.WaitGroup
	wg.Add(len(pending))
	for i := range pending {
		go func(slot int, target probeTarget) {
			defer wg.Done()
			outcomes[slot] = outcome{
				stackName: target.stackName,
				result:    m.runChecks(target),
			}
		}(i, pending[i])
	}
	wg.Wait()

	// Phase 3: record the results (write lock held).
	var healthy, unhealthy int
	m.mu.Lock()
	for _, o := range outcomes {
		st, found := m.stacks[o.stackName]
		if !found {
			// The stack disappeared while it was being probed.
			continue
		}
		st.HealthProbe = o.result
		switch {
		case o.result.Healthy:
			healthy++
			// A passing probe clears an earlier unhealthy override.
			if st.State == StateUnhealthy {
				st.State = StateRunning
			}
		default:
			unhealthy++
			if st.State == StateRunning {
				st.State = StateUnhealthy
			}
		}
	}
	m.mu.Unlock()

	// Summary line for this round.
	if unhealthy == 0 {
		m.logger.Printf("[INFO] Health probes: %d ok (of %d probed)", healthy, len(pending))
		return nil
	}
	m.logger.Printf("[WARN] Health probes: %d ok, %d unhealthy (of %d probed)", healthy, unhealthy, len(pending))
	return nil
}
// runChecks executes every health check item configured for one stack target
// and aggregates the outcomes.
//
// The returned result is stamped with the time the run started, carries one
// detail entry per check in configuration order, and is Healthy only if every
// individual check passed (a target with no checks is therefore healthy).
//
// Failed checks are always logged at WARN; passing checks are logged only in
// debug mode. The HTTP method is included in a failure line only for "http"
// and "api" checks, because other check types never issue an HTTP request.
func (m *Manager) runChecks(t probeTarget) *HealthProbeResult {
	result := &HealthProbeResult{
		LastCheck: time.Now(),
		Healthy:   true,
	}
	for _, check := range t.checks {
		detail := m.runSingleCheck(t.containerName, check)
		result.Details = append(result.Details, detail)

		if !detail.Healthy {
			result.Healthy = false
			if check.Type == "http" || check.Type == "api" {
				m.logger.Printf("[WARN] Health probe %s: %s %s :%d%s → %s",
					t.stackName, strings.ToUpper(check.Type), methodOrEmpty(check), check.Port, check.Path, detail.Error)
			} else {
				// TCP and unrecognised check types have no HTTP method;
				// printing one here would misdescribe what was attempted.
				m.logger.Printf("[WARN] Health probe %s: %s :%d%s → %s",
					t.stackName, strings.ToUpper(check.Type), check.Port, check.Path, detail.Error)
			}
			continue
		}

		if !m.isDebug() {
			continue
		}
		// A non-zero status means an HTTP response was received; otherwise
		// the passing check was a TCP connect.
		if detail.Status > 0 {
			m.logger.Printf("[DEBUG] Health probe %s: %s %s :%d%s → %d (%s)",
				t.stackName, strings.ToUpper(check.Type), methodOrEmpty(check), check.Port, check.Path, detail.Status, detail.Latency)
		} else {
			m.logger.Printf("[DEBUG] Health probe %s: TCP :%d → ok (%s)",
				t.stackName, check.Port, detail.Latency)
		}
	}
	return result
}
// runSingleCheck dispatches one health check item to the probe that handles
// its type and returns that probe's result.
//
// "tcp" is handled by probeTCP; "http" and "api" are handled by probeHTTP.
// Any other type produces an unhealthy detail whose Error names the
// unrecognised type. The detail's Target is always ":<port><path>".
func (m *Manager) runSingleCheck(containerName string, check HealthCheckItem) HealthCheckDetail {
	target := fmt.Sprintf(":%d%s", check.Port, check.Path)

	if check.Type == "tcp" {
		return m.probeTCP(containerName, check.Port, target)
	}
	if check.Type == "http" || check.Type == "api" {
		return m.probeHTTP(containerName, check, target)
	}

	unknown := HealthCheckDetail{Type: check.Type, Target: target}
	unknown.Healthy = false
	unknown.Error = fmt.Sprintf("unknown check type: %s", check.Type)
	return unknown
}
// probeTCP reports whether a TCP connection to the given port on the
// container can be established within 5 seconds.
//
// The connection is closed as soon as it opens. The returned detail always
// carries the measured dial latency; on failure Error holds the dial error
// text and Healthy is false.
func (m *Manager) probeTCP(containerName string, port int, target string) HealthCheckDetail {
	began := time.Now()
	conn, dialErr := net.DialTimeout("tcp", net.JoinHostPort(containerName, fmt.Sprint(port)), 5*time.Second)
	elapsed := time.Since(began)

	detail := HealthCheckDetail{
		Type:    "tcp",
		Target:  target,
		Latency: formatLatency(elapsed),
	}
	detail.Healthy = dialErr == nil
	if dialErr != nil {
		detail.Error = dialErr.Error()
		return detail
	}
	conn.Close()
	return detail
}
// probeHTTP sends one HTTP request to the container and judges the response.
//
// The request goes to http://<container>:<port><path> using check.Method
// (GET when unset), with a 5 second timeout. Redirects are not followed, so
// a 3xx response is evaluated as-is.
//
// Evaluation:
//   - "http" checks, and "api" checks without an Expect block, are healthy
//     as soon as any response arrives, whatever its status.
//   - "api" checks with Expect must match Expect.Status (when > 0) and must
//     contain Expect.BodyContains (when non-empty) within the first 8 KiB
//     of the body.
//
// Status is filled in whenever a response was received. Latency covers the
// time up to the response headers; it is "0ms" if the request could not even
// be constructed.
func (m *Manager) probeHTTP(containerName string, check HealthCheckItem, target string) HealthCheckDetail {
	endpoint := fmt.Sprintf("http://%s:%d%s", containerName, check.Port, check.Path)
	verb := check.Method
	if verb == "" {
		verb = "GET"
	}

	began := time.Now()
	// Returning ErrUseLastResponse makes the client hand back the redirect
	// response itself instead of following it.
	keepRedirect := func(*http.Request, []*http.Request) error {
		return http.ErrUseLastResponse
	}
	client := &http.Client{
		Timeout:       5 * time.Second,
		CheckRedirect: keepRedirect,
	}

	req, err := http.NewRequest(verb, endpoint, nil)
	if err != nil {
		return HealthCheckDetail{
			Type:    check.Type,
			Target:  target,
			Healthy: false,
			Error:   fmt.Sprintf("bad request: %v", err),
			Latency: "0ms",
		}
	}

	resp, err := client.Do(req)
	detail := HealthCheckDetail{
		Type:    check.Type,
		Target:  target,
		Latency: formatLatency(time.Since(began)),
	}
	if err != nil {
		detail.Healthy = false
		detail.Error = err.Error()
		return detail
	}
	defer resp.Body.Close()
	detail.Status = resp.StatusCode

	// Liveness-only cases: any response counts as healthy.
	expect := check.Expect
	if check.Type == "http" || expect == nil {
		detail.Healthy = true
		return detail
	}

	// Expected status code, if one was configured.
	if want := expect.Status; want > 0 && resp.StatusCode != want {
		detail.Healthy = false
		detail.Error = fmt.Sprintf("expected status %d, got %d", want, resp.StatusCode)
		return detail
	}

	// Expected body fragment, searched in at most the first 8 KiB.
	if needle := expect.BodyContains; needle != "" {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, 8192))
		if readErr != nil {
			detail.Healthy = false
			detail.Error = fmt.Sprintf("reading body: %v", readErr)
			return detail
		}
		if !strings.Contains(string(body), needle) {
			detail.Healthy = false
			detail.Error = fmt.Sprintf("body missing expected string %q", needle)
			return detail
		}
	}

	detail.Healthy = true
	return detail
}
// findProbeContainer picks the container of a stack that should be probed.
//
// Only containers whose state is running or unhealthy are considered. A
// container named exactly like the stack wins; failing that, the first
// eligible container whose name starts with the stack name (for example
// stack-service-N) is used. The empty string means nothing is probeable.
func findProbeContainer(stackName string, containers []ContainerInfo) string {
	fallback := ""
	for i := range containers {
		c := &containers[i]
		if c.State != StateRunning && c.State != StateUnhealthy {
			continue
		}
		if c.Name == stackName {
			// An exact match beats any prefix match seen so far.
			return c.Name
		}
		if fallback == "" && strings.HasPrefix(c.Name, stackName) {
			fallback = c.Name
		}
	}
	return fallback
}
// parseInterval converts a duration string such as "5m", "30s" or "1h" into
// a time.Duration. An empty or otherwise unparseable string yields the
// 5 minute default.
func parseInterval(s string) time.Duration {
	const fallback = 5 * time.Minute

	// ParseDuration rejects the empty string, so it needs no special case.
	if d, err := time.ParseDuration(s); err == nil {
		return d
	}
	return fallback
}
// formatLatency renders a duration as a short latency string: whole
// microseconds below one millisecond, whole milliseconds below one second,
// and seconds with one decimal place from there on.
func formatLatency(d time.Duration) string {
	switch {
	case d < time.Millisecond:
		return fmt.Sprintf("%dµs", d.Microseconds())
	case d < time.Second:
		return fmt.Sprintf("%dms", d.Milliseconds())
	default:
		return fmt.Sprintf("%.1fs", d.Seconds())
	}
}
// methodOrEmpty returns the HTTP method to show in log lines for a check.
//
// For "http" and "api" checks this is the configured Method, or "GET" when
// none is set, which is the same defaulting probeHTTP applies when it sends
// the request. For any other check type (for example "tcp") no HTTP request
// is made, so the empty string is returned.
func methodOrEmpty(check HealthCheckItem) string {
	switch check.Type {
	case "http", "api":
		if check.Method != "" {
			return check.Method
		}
		return "GET"
	default:
		return ""
	}
}