Files
felhom-controller/controller/internal/scheduler/scheduler.go
T
admin 95c821deb2 feat: comprehensive debug logging across all controller modules
Add detailed [DEBUG] logging to every controller module when
logging.level is set to "debug". Each module with stateful debug
uses SetDebug(bool) wired from main.go. Covers stacks, backup,
cloudflare, integrations, system, monitor, settings, scheduler,
web handlers, storage, metrics, API, selfupdate, and assets.

Also includes the app export/import (.fab bundles) feature from
v0.32.0 and its debug page integration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 18:14:43 +01:00

340 lines
8.1 KiB
Go

package scheduler
import (
"context"
"fmt"
"log"
"sync"
"time"
)
var (
budapestLoc *time.Location
budapestLocOnce sync.Once
)
func getBudapestLocation() *time.Location {
budapestLocOnce.Do(func() {
loc, err := time.LoadLocation("Europe/Budapest")
if err != nil {
log.Printf("[ERROR] Cannot load Europe/Budapest timezone: %v — using UTC", err)
loc = time.UTC
}
budapestLoc = loc
})
return budapestLoc
}
// JobFunc is the function signature for scheduler jobs.
type JobFunc func(ctx context.Context) error
// Job represents a scheduled task.
type Job struct {
Name string
Fn JobFunc
Interval time.Duration // for periodic jobs (every N)
Schedule string // for daily jobs ("02:30", "03:00") — mutually exclusive with Interval
LastRun time.Time
LastErr error
Running bool
}
// Scheduler manages periodic and daily jobs.
type Scheduler struct {
mu sync.Mutex
jobs []*Job
logger *log.Logger
debug bool
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
started bool
}
// SetDebug enables or disables debug logging.
func (s *Scheduler) SetDebug(on bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.debug = on
}
func (s *Scheduler) dbg(format string, args ...interface{}) {
if s.debug {
s.logger.Printf("[DEBUG] [sched] "+format, args...)
}
}
// New creates a new Scheduler.
func New(logger *log.Logger) *Scheduler {
return &Scheduler{
logger: logger,
}
}
// Every registers a periodic job that runs every interval.
// If the scheduler is already started, the job's goroutine is launched immediately.
func (s *Scheduler) Every(name string, interval time.Duration, fn JobFunc) {
if interval <= 0 {
s.logger.Printf("[ERROR] Periodic job %s has invalid interval %s — job not registered", name, interval)
return
}
s.mu.Lock()
defer s.mu.Unlock()
job := &Job{
Name: name,
Fn: fn,
Interval: interval,
}
s.jobs = append(s.jobs, job)
s.logger.Printf("[SCHED] Registered periodic job: %s (every %s)", name, interval)
s.dbg("periodic job registered: name=%q interval=%s totalJobs=%d", name, interval, len(s.jobs))
if s.started {
s.wg.Add(1)
go s.runPeriodicJob(job)
}
}
// Daily registers a job that runs once per day at the specified time (HH:MM) in Europe/Budapest timezone.
// If the scheduler is already started, the job's goroutine is launched immediately.
func (s *Scheduler) Daily(name string, timeStr string, fn JobFunc) {
s.mu.Lock()
defer s.mu.Unlock()
// Validate time format
if _, _, err := parseDailyTime(timeStr); err != nil {
s.logger.Printf("[ERROR] Daily job %s has invalid schedule %q: %v — job not started", name, timeStr, err)
return
}
job := &Job{
Name: name,
Fn: fn,
Schedule: timeStr,
}
s.jobs = append(s.jobs, job)
nextRun := nextDailyRun(timeStr)
s.logger.Printf("[SCHED] Daily job %s scheduled for %s", name, nextRun.Format("2006-01-02 15:04 MST"))
s.dbg("daily job registered: name=%q schedule=%q nextRun=%s totalJobs=%d", name, timeStr, nextRun.Format(time.RFC3339), len(s.jobs))
if s.started {
s.wg.Add(1)
go s.runDailyJob(job)
}
}
// Start begins running all registered jobs. Safe to call only once.
func (s *Scheduler) Start(ctx context.Context) {
s.mu.Lock()
if s.cancel != nil {
s.mu.Unlock()
s.logger.Println("[WARN] Scheduler already started — ignoring duplicate Start()")
return
}
s.ctx, s.cancel = context.WithCancel(ctx)
s.started = true
for _, job := range s.jobs {
if job.Interval > 0 {
s.wg.Add(1)
go s.runPeriodicJob(job)
} else if job.Schedule != "" {
s.wg.Add(1)
go s.runDailyJob(job)
}
}
s.logger.Printf("[SCHED] Scheduler started with %d jobs", len(s.jobs))
s.dbg("scheduler started: periodic=%d daily=%d", func() int {
n := 0
for _, j := range s.jobs {
if j.Interval > 0 {
n++
}
}
return n
}(), func() int {
n := 0
for _, j := range s.jobs {
if j.Schedule != "" {
n++
}
}
return n
}())
s.mu.Unlock()
}
// Stop cancels all jobs and waits for them to finish (30s timeout).
func (s *Scheduler) Stop() {
s.mu.Lock()
cancel := s.cancel
s.mu.Unlock()
if cancel != nil {
cancel()
}
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
s.logger.Println("[SCHED] All jobs stopped")
case <-time.After(30 * time.Second):
s.logger.Println("[WARN] Scheduler stop timed out after 30s — some jobs may still be running")
}
}
// GetJobs returns a snapshot of all jobs (copies, not pointers).
func (s *Scheduler) GetJobs() []Job {
s.mu.Lock()
defer s.mu.Unlock()
result := make([]Job, len(s.jobs))
for i, j := range s.jobs {
result[i] = *j
}
return result
}
func (s *Scheduler) runPeriodicJob(job *Job) {
defer s.wg.Done()
// Quiet mode: jobs with interval <= 30s only log failures
quiet := job.Interval <= 30*time.Second
ticker := time.NewTicker(job.Interval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.executeJob(job, quiet)
}
}
}
func (s *Scheduler) runDailyJob(job *Job) {
defer s.wg.Done()
for {
nextRun := nextDailyRun(job.Schedule)
waitDuration := time.Until(nextRun)
if waitDuration < 0 {
waitDuration = 0
}
s.dbg("daily job %s: next run at %s (waiting %s)", job.Name, nextRun.Format("2006-01-02 15:04:05 MST"), waitDuration.Round(time.Second))
timer := time.NewTimer(waitDuration)
select {
case <-s.ctx.Done():
timer.Stop()
s.dbg("daily job %s: context cancelled, stopping", job.Name)
return
case <-timer.C:
s.executeJob(job, false)
}
}
}
func (s *Scheduler) executeJob(job *Job, quiet bool) {
s.mu.Lock()
if job.Running {
s.mu.Unlock()
s.logger.Printf("[WARN] Job %s still running, skipping", job.Name)
return
}
job.Running = true
s.mu.Unlock()
defer func() {
s.mu.Lock()
job.Running = false
s.mu.Unlock()
}()
// Panic recovery
defer func() {
if r := recover(); r != nil {
s.mu.Lock()
job.LastErr = fmt.Errorf("panic: %v", r)
job.LastRun = time.Now()
s.mu.Unlock()
s.logger.Printf("[ERROR] Job %s panicked: %v", job.Name, r)
}
}()
if !quiet {
s.logger.Printf("[SCHED] Running job: %s", job.Name)
}
s.dbg("job %s: execution starting", job.Name)
start := time.Now()
err := job.Fn(s.ctx)
elapsed := time.Since(start)
s.mu.Lock()
job.LastRun = time.Now()
job.LastErr = err
s.mu.Unlock()
if err != nil {
s.logger.Printf("[WARN] Job %s failed: %v (took %s)", job.Name, err, elapsed.Round(time.Millisecond))
s.dbg("job %s: failed after %s: %v", job.Name, elapsed.Round(time.Millisecond), err)
} else if !quiet {
s.logger.Printf("[SCHED] Job %s completed (took %s)", job.Name, elapsed.Round(time.Millisecond))
}
s.dbg("job %s: finished in %s (err=%v)", job.Name, elapsed.Round(time.Millisecond), err)
}
// parseDailyTime parses "HH:MM" and returns hour and minute.
func parseDailyTime(timeStr string) (int, int, error) {
var hour, min int
n, err := fmt.Sscanf(timeStr, "%d:%d", &hour, &min)
if err != nil || n != 2 {
return 0, 0, fmt.Errorf("expected HH:MM format, got %q", timeStr)
}
if hour < 0 || hour > 23 || min < 0 || min > 59 {
return 0, 0, fmt.Errorf("invalid time %q: hour must be 0-23, minute 0-59", timeStr)
}
return hour, min, nil
}
// NextDailyRun is the exported version of nextDailyRun for external callers.
func NextDailyRun(timeStr string) time.Time {
return nextDailyRun(timeStr)
}
// nextDailyRun calculates the next occurrence of the daily schedule in Europe/Budapest timezone.
func nextDailyRun(timeStr string) time.Time {
hour, min, err := parseDailyTime(timeStr)
if err != nil {
// Should not happen — validated at registration
return time.Now().Add(24 * time.Hour)
}
loc := getBudapestLocation()
now := time.Now().In(loc)
next := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, loc)
// If the time has already passed today, schedule for tomorrow.
// Use time.Date with day+1 instead of Add(24h) to correctly handle DST transitions
// (spring forward/fall back in Europe/Budapest would shift by 1 hour with Add(24h)).
if !next.After(now) {
next = time.Date(now.Year(), now.Month(), now.Day()+1, hour, min, 0, 0, loc)
}
return next
}