Files
deploy-felhom-compose/controller/internal/scheduler/scheduler.go
T
admin 45f75a916c fix: P2+P3 bug fixes, hardening, and cleanup (18 files)
Bug fixes:
- Add applyEnvOverrides to LoadFromBytes (M05)
- Set state=failed on compose-up failure in selfupdate (M16)
- Clamp usableMB to min 0 in memory check (M22)
- Remove "manual" schedule from triggerAllCrossBackups (M23)
- Add mmcblk device handling for partition paths (M21)
- Fix stripPartition for mmcblk devices (L25)
- Fix TruncateStr for UTF-8 and negative maxLen (L05/L06)
- Fix AllDone to return false for empty restore plans (L14)
- Fix PushOnce to return actual errors (L39)
- Restore pending events on save failure in DrainPendingEvents (M03)
- Add duplicate check in AddStoragePath (M04)
- Call CleanupTempMounts after drive scan (H13)
- Log SetStep save errors (M25)

Hardening:
- Guard scheduler Start() against double-start (M14)
- Acquire mutex in scheduler Stop() before reading cancel (L24)
- Cap log lines parameter to 10000 (L31)
- Require POST for logout (L32)
- Use sync.Once for Server.Close() (L49)
- Panic on crypto/rand.Read failure in setup CSRF (L40)
- Validate Bearer token against Hub API key in CSRF (H16 fix)
- Replace custom hasPrefix with strings.HasPrefix (L13)
- Replace simpleHash with crc32.ChecksumIEEE (L48)

Cleanup:
- Remove dead imageName function (L02)
- Remove dead detectHostIPViaRoute function (L03)
- Rename shadowed copy variable to cp (L07)
- Copy DefaultEnabledEvents in GetNotificationPrefs early return (L09)
- Update BUGHUNT.md with comprehensive audit results

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 13:47:52 +01:00

285 lines
6.5 KiB
Go

package scheduler
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// Process-wide cache for the Europe/Budapest location. budapestLocOnce
// ensures budapestLoc is populated exactly once.
var (
	budapestLoc     *time.Location
	budapestLocOnce sync.Once
)

// getBudapestLocation returns the Europe/Budapest time zone, resolving it on
// the first call and handing back the cached value afterwards. If the zone
// cannot be loaded, the failure is logged and UTC is cached instead.
func getBudapestLocation() *time.Location {
	budapestLocOnce.Do(func() {
		resolved, loadErr := time.LoadLocation("Europe/Budapest")
		if loadErr == nil {
			budapestLoc = resolved
			return
		}
		log.Printf("[ERROR] Cannot load Europe/Budapest timezone: %v — using UTC", loadErr)
		budapestLoc = time.UTC
	})
	return budapestLoc
}
// JobFunc is the function signature for scheduler jobs. The scheduler invokes
// it with its own context, which is cancelled when Stop is called. A non-nil
// return value is stored as the job's LastErr and logged as a failure.
type JobFunc func(ctx context.Context) error
// Job represents a scheduled task together with the outcome of its most
// recent run. Every registers jobs with Interval set; Daily registers jobs
// with Schedule set.
type Job struct {
	Name     string        // identifier used in log messages
	Fn       JobFunc       // function invoked on each run
	Interval time.Duration // for periodic jobs (every N)
	Schedule string        // for daily jobs ("02:30", "03:00") — mutually exclusive with Interval
	LastRun  time.Time     // time the most recent run returned or panicked
	LastErr  error         // error from the most recent run; nil if it succeeded
	Running  bool          // true while Fn is executing
}
// Scheduler manages periodic and daily jobs. Start launches one goroutine per
// job registered so far; Stop cancels them and waits for them to exit.
type Scheduler struct {
	mu     sync.Mutex         // held when accessing jobs, cancel, and each Job's Running/LastRun/LastErr
	jobs   []*Job             // registered jobs, in registration order
	logger *log.Logger        // destination for the scheduler's log output
	ctx    context.Context    // created by Start; passed to every job function
	cancel context.CancelFunc // cancels ctx; non-nil once Start has run
	wg     sync.WaitGroup     // counts the job goroutines so Stop can wait for them
}
// New creates a new Scheduler that writes its log output to logger.
//
// A nil logger is replaced with the standard library's default logger, so
// that later calls which log (Every, Daily, Start, Stop) cannot fail with a
// nil-pointer dereference.
func New(logger *log.Logger) *Scheduler {
	if logger == nil {
		logger = log.Default()
	}
	return &Scheduler{
		logger: logger,
	}
}
// Every adds a periodic job named name that runs fn once per interval.
// A zero or negative interval is logged as an error and the job is dropped.
func (s *Scheduler) Every(name string, interval time.Duration, fn JobFunc) {
	if interval <= 0 {
		s.logger.Printf("[ERROR] Periodic job %s has invalid interval %s — job not registered", name, interval)
		return
	}

	periodic := &Job{Name: name, Fn: fn, Interval: interval}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs = append(s.jobs, periodic)
	s.logger.Printf("[SCHED] Registered periodic job: %s (every %s)", name, interval)
}
// Daily adds a job named name that runs fn once per day at timeStr ("HH:MM"),
// interpreted in the Europe/Budapest timezone. A schedule that fails
// validation is logged as an error and the job is dropped.
func (s *Scheduler) Daily(name string, timeStr string, fn JobFunc) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Reject malformed schedules before the job is added.
	_, _, parseErr := parseDailyTime(timeStr)
	if parseErr != nil {
		s.logger.Printf("[ERROR] Daily job %s has invalid schedule %q: %v — job not started", name, timeStr, parseErr)
		return
	}

	daily := &Job{Name: name, Fn: fn, Schedule: timeStr}
	s.jobs = append(s.jobs, daily)

	firstRun := nextDailyRun(timeStr).Format("2006-01-02 15:04 MST")
	s.logger.Printf("[SCHED] Daily job %s scheduled for %s", name, firstRun)
}
// Start launches one goroutine per registered job, deriving a cancellable
// context from ctx for them to share. Only the first call has any effect;
// later calls log a warning and return.
func (s *Scheduler) Start(ctx context.Context) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// A non-nil cancel func means an earlier Start already ran.
	if s.cancel != nil {
		s.logger.Println("[WARN] Scheduler already started — ignoring duplicate Start()")
		return
	}

	s.ctx, s.cancel = context.WithCancel(ctx)
	for _, j := range s.jobs {
		switch {
		case j.Interval > 0:
			s.wg.Add(1)
			go s.runPeriodicJob(j)
		case j.Schedule != "":
			s.wg.Add(1)
			go s.runDailyJob(j)
		}
	}
	s.logger.Printf("[SCHED] Scheduler started with %d jobs", len(s.jobs))
}
// Stop cancels the scheduler's context and then waits up to 30 seconds for
// the job goroutines to exit, logging whether they finished or timed out.
func (s *Scheduler) Stop() {
	// Read cancel under the mutex, but invoke it after releasing the lock.
	s.mu.Lock()
	stop := s.cancel
	s.mu.Unlock()

	if stop != nil {
		stop()
	}

	// Bridge the WaitGroup to a channel so it can be raced against a timer.
	finished := make(chan struct{})
	go func() {
		s.wg.Wait()
		close(finished)
	}()

	deadline := time.NewTimer(30 * time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		s.logger.Println("[WARN] Scheduler stop timed out after 30s — some jobs may still be running")
	case <-finished:
		s.logger.Println("[SCHED] All jobs stopped")
	}
}
// GetJobs returns a point-in-time copy of every registered job. The returned
// slice holds Job values rather than the scheduler's internal pointers.
func (s *Scheduler) GetJobs() []Job {
	s.mu.Lock()
	defer s.mu.Unlock()

	snapshot := make([]Job, 0, len(s.jobs))
	for _, registered := range s.jobs {
		snapshot = append(snapshot, *registered)
	}
	return snapshot
}
// runPeriodicJob executes job on every tick of its interval until the
// scheduler's context is cancelled. It marks the WaitGroup done on exit.
func (s *Scheduler) runPeriodicJob(job *Job) {
	defer s.wg.Done()

	tick := time.NewTicker(job.Interval)
	defer tick.Stop()

	// Jobs that fire every 30s or more often only log their failures.
	onlyLogFailures := job.Interval <= 30*time.Second
	stopped := s.ctx.Done()

	for {
		select {
		case <-tick.C:
			s.executeJob(job, onlyLogFailures)
		case <-stopped:
			return
		}
	}
}
// runDailyJob sleeps until the job's next scheduled time, executes it, and
// repeats until the scheduler's context is cancelled. It marks the WaitGroup
// done on exit.
func (s *Scheduler) runDailyJob(job *Job) {
	defer s.wg.Done()

	for {
		// Recompute the target on every pass.
		wait := time.Until(nextDailyRun(job.Schedule))
		if wait < 0 {
			wait = 0
		}

		alarm := time.NewTimer(wait)
		select {
		case <-alarm.C:
			s.executeJob(job, false)
		case <-s.ctx.Done():
			alarm.Stop()
			return
		}
	}
}
// executeJob runs job.Fn once and records the outcome on the job. A job that
// is already marked Running is skipped with a warning. A panic inside Fn is
// recovered and stored as LastErr. When quiet is true, only failures are
// logged.
func (s *Scheduler) executeJob(job *Job, quiet bool) {
	// Claim the job under the lock, or bail out if a run is still in flight.
	s.mu.Lock()
	alreadyRunning := job.Running
	if !alreadyRunning {
		job.Running = true
	}
	s.mu.Unlock()
	if alreadyRunning {
		s.logger.Printf("[WARN] Job %s still running, skipping", job.Name)
		return
	}

	// Registered first, so it runs last: the Running flag is cleared only
	// after the panic handler below has recorded its result.
	defer func() {
		s.mu.Lock()
		job.Running = false
		s.mu.Unlock()
	}()

	// Turn a panic in the job function into a recorded error.
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		s.mu.Lock()
		job.LastErr = fmt.Errorf("panic: %v", cause)
		job.LastRun = time.Now()
		s.mu.Unlock()
		s.logger.Printf("[ERROR] Job %s panicked: %v", job.Name, cause)
	}()

	if !quiet {
		s.logger.Printf("[SCHED] Running job: %s", job.Name)
	}

	begin := time.Now()
	runErr := job.Fn(s.ctx)
	took := time.Since(begin).Round(time.Millisecond)

	s.mu.Lock()
	job.LastRun, job.LastErr = time.Now(), runErr
	s.mu.Unlock()

	switch {
	case runErr != nil:
		s.logger.Printf("[WARN] Job %s failed: %v (took %s)", job.Name, runErr, took)
	case !quiet:
		s.logger.Printf("[SCHED] Job %s completed (took %s)", job.Name, took)
	}
}
// parseDailyTime parses a daily schedule of the form "HH:MM" and returns its
// hour (0-23) and minute (0-59).
//
// Single-digit fields such as "2:5" are accepted. Text left over after the
// minute field is rejected, so inputs like "2:30pm" or "02:30:45" return an
// error instead of being silently read as 02:30.
func parseDailyTime(timeStr string) (int, int, error) {
	var (
		hour, minute int
		trailing     string
	)
	// The final %s verb exists only to detect leftovers. On well-formed input
	// the scan runs out of data after the minute field and stops at exactly
	// two items, returning an end-of-input error that is deliberately
	// ignored. Extra non-space text makes it report three items.
	if n, _ := fmt.Sscanf(timeStr, "%d:%d%s", &hour, &minute, &trailing); n != 2 {
		return 0, 0, fmt.Errorf("expected HH:MM format, got %q", timeStr)
	}
	if hour < 0 || hour > 23 || minute < 0 || minute > 59 {
		return 0, 0, fmt.Errorf("invalid time %q: hour must be 0-23, minute 0-59", timeStr)
	}
	return hour, minute, nil
}
// NextDailyRun exposes nextDailyRun to callers outside this package.
func NextDailyRun(timeStr string) time.Time {
	return nextDailyRun(timeStr)
}

// nextDailyRun returns the next instant at which the "HH:MM" schedule timeStr
// occurs in the Europe/Budapest timezone: today if that time is still ahead,
// otherwise tomorrow. If timeStr fails to parse, it returns 24 hours from now.
func nextDailyRun(timeStr string) time.Time {
	hour, minute, err := parseDailyTime(timeStr)
	if err != nil {
		// Not expected in practice: Daily validates schedules when they are registered.
		return time.Now().Add(24 * time.Hour)
	}

	zone := getBudapestLocation()
	now := time.Now().In(zone)
	year, month, day := now.Date()

	if today := time.Date(year, month, day, hour, minute, 0, 0, zone); today.After(now) {
		return today
	}

	// Today's slot has passed. Build tomorrow's wall-clock time with
	// time.Date(day+1) rather than adding 24h, so a DST change in
	// Europe/Budapest does not shift the run by an hour.
	return time.Date(year, month, day+1, hour, minute, 0, 0, zone)
}