8e61cd7ec4
Add structured operational logging at INFO, WARN, and ERROR levels to every controller module. Standardize custom prefixes ([GEO], [SCHED], [SYNC]) to use [INFO/WARN/ERROR] [module] format. Fix misleveled logs (WARN->ERROR for data loss scenarios, WARN->INFO for routine operations). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
341 lines
8.3 KiB
Go
341 lines
8.3 KiB
Go
package scheduler
|
|
|
|
import (
	"context"
	"fmt"
	"log"
	"sync"
	"sync/atomic"
	"time"
)
|
|
|
|
// budapestLoc caches the Europe/Budapest location; budapestLocOnce guards its
// one-time initialisation inside getBudapestLocation.
var (
	budapestLoc     *time.Location
	budapestLocOnce sync.Once
)

// getBudapestLocation returns the Europe/Budapest time zone. It is loaded on
// first use and cached for the life of the process.
//
// If the zone cannot be loaded (e.g. no tzdata available on the host), it logs
// an ERROR in the same "[LEVEL] [module]" format as the rest of the scheduler.
// It then falls back to UTC, which shifts every daily schedule by Budapest's
// UTC offset.
func getBudapestLocation() *time.Location {
	budapestLocOnce.Do(func() {
		loc, err := time.LoadLocation("Europe/Budapest")
		if err != nil {
			log.Printf("[ERROR] [scheduler] Cannot load Europe/Budapest timezone: %v — using UTC", err)
			loc = time.UTC
		}
		budapestLoc = loc
	})
	return budapestLoc
}
|
|
|
|
// JobFunc is the function signature for scheduler jobs. The context it
// receives is the scheduler's own context, which Stop cancels.
type JobFunc func(ctx context.Context) error

// Job represents a scheduled task. A job is either periodic (Interval > 0,
// registered via Every) or daily (Schedule set, registered via Daily).
type Job struct {
	Name     string
	Fn       JobFunc
	Interval time.Duration // for periodic jobs (every N)
	Schedule string        // for daily jobs ("02:30", "03:00") — mutually exclusive with Interval

	// Execution state, written by the scheduler while holding its mutex.
	LastRun time.Time // when the most recent run finished (zero if never run)
	LastErr error     // error, or recovered panic, from the most recent run
	Running bool      // true while Fn is executing
}

// Scheduler manages periodic and daily jobs.
type Scheduler struct {
	mu     sync.Mutex
	jobs   []*Job
	logger *log.Logger

	// debug is an atomic flag rather than a mu-guarded bool. dbg is called
	// both with mu held (Every, Daily, Start) and without it (job goroutines),
	// so it cannot take the lock itself. A plain bool would race with SetDebug.
	debug atomic.Bool

	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	started bool
}

// SetDebug enables or disables debug logging. It is safe to call at any time,
// including while job goroutines are running.
func (s *Scheduler) SetDebug(on bool) {
	s.debug.Store(on)
}

// dbg writes a "[DEBUG] [scheduler]" line when debug logging is enabled.
// The flag check is a lock-free atomic load, so dbg may be called whether or
// not s.mu is held.
func (s *Scheduler) dbg(format string, args ...interface{}) {
	if !s.debug.Load() {
		return
	}
	s.logger.Printf("[DEBUG] [scheduler] "+format, args...)
}
|
|
|
|
// New creates a new Scheduler.
|
|
func New(logger *log.Logger) *Scheduler {
|
|
return &Scheduler{
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Every registers a periodic job that runs every interval.
|
|
// If the scheduler is already started, the job's goroutine is launched immediately.
|
|
func (s *Scheduler) Every(name string, interval time.Duration, fn JobFunc) {
|
|
if interval <= 0 {
|
|
s.logger.Printf("[ERROR] [scheduler] Periodic job %s has invalid interval %s — job not registered", name, interval)
|
|
return
|
|
}
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
job := &Job{
|
|
Name: name,
|
|
Fn: fn,
|
|
Interval: interval,
|
|
}
|
|
s.jobs = append(s.jobs, job)
|
|
s.logger.Printf("[INFO] [scheduler] Registered periodic job: %s (every %s)", name, interval)
|
|
s.dbg("periodic job registered: name=%q interval=%s totalJobs=%d", name, interval, len(s.jobs))
|
|
|
|
if s.started {
|
|
s.wg.Add(1)
|
|
go s.runPeriodicJob(job)
|
|
}
|
|
}
|
|
|
|
// Daily registers a job that runs once per day at the specified time (HH:MM) in Europe/Budapest timezone.
|
|
// If the scheduler is already started, the job's goroutine is launched immediately.
|
|
func (s *Scheduler) Daily(name string, timeStr string, fn JobFunc) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
// Validate time format
|
|
if _, _, err := parseDailyTime(timeStr); err != nil {
|
|
s.logger.Printf("[ERROR] [scheduler] Daily job %s has invalid schedule %q: %v — job not started", name, timeStr, err)
|
|
return
|
|
}
|
|
|
|
job := &Job{
|
|
Name: name,
|
|
Fn: fn,
|
|
Schedule: timeStr,
|
|
}
|
|
s.jobs = append(s.jobs, job)
|
|
|
|
nextRun := nextDailyRun(timeStr)
|
|
s.logger.Printf("[INFO] [scheduler] Daily job %s scheduled for %s", name, nextRun.Format("2006-01-02 15:04 MST"))
|
|
s.dbg("daily job registered: name=%q schedule=%q nextRun=%s totalJobs=%d", name, timeStr, nextRun.Format(time.RFC3339), len(s.jobs))
|
|
|
|
if s.started {
|
|
s.wg.Add(1)
|
|
go s.runDailyJob(job)
|
|
}
|
|
}
|
|
|
|
// Start begins running all registered jobs. Safe to call only once.
|
|
func (s *Scheduler) Start(ctx context.Context) {
|
|
s.mu.Lock()
|
|
if s.cancel != nil {
|
|
s.mu.Unlock()
|
|
s.logger.Println("[WARN] [scheduler] Scheduler already started — ignoring duplicate Start()")
|
|
return
|
|
}
|
|
s.ctx, s.cancel = context.WithCancel(ctx)
|
|
s.started = true
|
|
|
|
for _, job := range s.jobs {
|
|
if job.Interval > 0 {
|
|
s.wg.Add(1)
|
|
go s.runPeriodicJob(job)
|
|
} else if job.Schedule != "" {
|
|
s.wg.Add(1)
|
|
go s.runDailyJob(job)
|
|
}
|
|
}
|
|
|
|
s.logger.Printf("[INFO] [scheduler] Starting scheduler with %d jobs", len(s.jobs))
|
|
s.dbg("scheduler started: periodic=%d daily=%d", func() int {
|
|
n := 0
|
|
for _, j := range s.jobs {
|
|
if j.Interval > 0 {
|
|
n++
|
|
}
|
|
}
|
|
return n
|
|
}(), func() int {
|
|
n := 0
|
|
for _, j := range s.jobs {
|
|
if j.Schedule != "" {
|
|
n++
|
|
}
|
|
}
|
|
return n
|
|
}())
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// Stop cancels all jobs and waits for them to finish (30s timeout).
|
|
func (s *Scheduler) Stop() {
|
|
s.mu.Lock()
|
|
cancel := s.cancel
|
|
s.mu.Unlock()
|
|
s.logger.Printf("[INFO] [scheduler] Stopping scheduler")
|
|
if cancel != nil {
|
|
cancel()
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.wg.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
s.logger.Println("[INFO] [scheduler] All jobs stopped")
|
|
case <-time.After(30 * time.Second):
|
|
s.logger.Println("[WARN] [scheduler] Scheduler stop timed out after 30s — some jobs may still be running")
|
|
}
|
|
}
|
|
|
|
// GetJobs returns a snapshot of all jobs (copies, not pointers).
|
|
func (s *Scheduler) GetJobs() []Job {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
result := make([]Job, len(s.jobs))
|
|
for i, j := range s.jobs {
|
|
result[i] = *j
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (s *Scheduler) runPeriodicJob(job *Job) {
|
|
defer s.wg.Done()
|
|
|
|
// Quiet mode: jobs with interval <= 30s only log failures
|
|
quiet := job.Interval <= 30*time.Second
|
|
|
|
ticker := time.NewTicker(job.Interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s.executeJob(job, quiet)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Scheduler) runDailyJob(job *Job) {
|
|
defer s.wg.Done()
|
|
|
|
for {
|
|
nextRun := nextDailyRun(job.Schedule)
|
|
waitDuration := time.Until(nextRun)
|
|
|
|
if waitDuration < 0 {
|
|
waitDuration = 0
|
|
}
|
|
|
|
s.dbg("daily job %s: next run at %s (waiting %s)", job.Name, nextRun.Format("2006-01-02 15:04:05 MST"), waitDuration.Round(time.Second))
|
|
|
|
timer := time.NewTimer(waitDuration)
|
|
select {
|
|
case <-s.ctx.Done():
|
|
timer.Stop()
|
|
s.dbg("daily job %s: context cancelled, stopping", job.Name)
|
|
return
|
|
case <-timer.C:
|
|
s.executeJob(job, false)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Scheduler) executeJob(job *Job, quiet bool) {
|
|
s.mu.Lock()
|
|
if job.Running {
|
|
s.mu.Unlock()
|
|
s.logger.Printf("[WARN] [scheduler] Job %s still running, skipping", job.Name)
|
|
return
|
|
}
|
|
job.Running = true
|
|
s.mu.Unlock()
|
|
|
|
defer func() {
|
|
s.mu.Lock()
|
|
job.Running = false
|
|
s.mu.Unlock()
|
|
}()
|
|
|
|
// Panic recovery
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
s.mu.Lock()
|
|
job.LastErr = fmt.Errorf("panic: %v", r)
|
|
job.LastRun = time.Now()
|
|
s.mu.Unlock()
|
|
s.logger.Printf("[ERROR] [scheduler] Job %s panicked: %v", job.Name, r)
|
|
}
|
|
}()
|
|
|
|
if !quiet {
|
|
s.logger.Printf("[INFO] [scheduler] Running job: %s", job.Name)
|
|
}
|
|
s.dbg("job %s: execution starting", job.Name)
|
|
|
|
start := time.Now()
|
|
err := job.Fn(s.ctx)
|
|
elapsed := time.Since(start)
|
|
|
|
s.mu.Lock()
|
|
job.LastRun = time.Now()
|
|
job.LastErr = err
|
|
s.mu.Unlock()
|
|
|
|
if err != nil {
|
|
s.logger.Printf("[ERROR] [scheduler] Job %s failed: %v (took %s)", job.Name, err, elapsed.Round(time.Millisecond))
|
|
s.dbg("job %s: failed after %s: %v", job.Name, elapsed.Round(time.Millisecond), err)
|
|
} else if !quiet {
|
|
s.logger.Printf("[INFO] [scheduler] Job %s completed (took %s)", job.Name, elapsed.Round(time.Millisecond))
|
|
}
|
|
s.dbg("job %s: finished in %s (err=%v)", job.Name, elapsed.Round(time.Millisecond), err)
|
|
}
|
|
|
|
// parseDailyTime parses a daily schedule of the form "HH:MM" (24-hour clock,
// 00:00–23:59) and returns the hour and minute.
//
// A single-digit hour ("2:30") is accepted, but the minute must have two
// digits. The whole string must match. Trailing or leading text such as
// "2:30pm", "02:30:00" or " 02:30" is rejected rather than silently truncated
// to a different time.
func parseDailyTime(timeStr string) (int, int, error) {
	t, err := time.Parse("15:04", timeStr)
	if err != nil {
		return 0, 0, fmt.Errorf("expected HH:MM format (00:00-23:59), got %q: %w", timeStr, err)
	}
	return t.Hour(), t.Minute(), nil
}
|
|
|
|
// NextDailyRun is the exported version of nextDailyRun for external callers.
|
|
func NextDailyRun(timeStr string) time.Time {
|
|
return nextDailyRun(timeStr)
|
|
}
|
|
|
|
// nextDailyRun calculates the next occurrence of the daily schedule in Europe/Budapest timezone.
|
|
func nextDailyRun(timeStr string) time.Time {
|
|
hour, min, err := parseDailyTime(timeStr)
|
|
if err != nil {
|
|
// Should not happen — validated at registration
|
|
return time.Now().Add(24 * time.Hour)
|
|
}
|
|
|
|
loc := getBudapestLocation()
|
|
|
|
now := time.Now().In(loc)
|
|
next := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, loc)
|
|
|
|
// If the time has already passed today, schedule for tomorrow.
|
|
// Use time.Date with day+1 instead of Add(24h) to correctly handle DST transitions
|
|
// (spring forward/fall back in Europe/Budapest would shift by 1 hour with Add(24h)).
|
|
if !next.After(now) {
|
|
next = time.Date(now.Year(), now.Month(), now.Day()+1, hour, min, 0, 0, loc)
|
|
}
|
|
|
|
return next
|
|
}
|