package scheduler import ( "context" "fmt" "log" "sync" "time" ) var ( budapestLoc *time.Location budapestLocOnce sync.Once ) func getBudapestLocation() *time.Location { budapestLocOnce.Do(func() { loc, err := time.LoadLocation("Europe/Budapest") if err != nil { log.Printf("[ERROR] Cannot load Europe/Budapest timezone: %v — using UTC", err) loc = time.UTC } budapestLoc = loc }) return budapestLoc } // JobFunc is the function signature for scheduler jobs. type JobFunc func(ctx context.Context) error // Job represents a scheduled task. type Job struct { Name string Fn JobFunc Interval time.Duration // for periodic jobs (every N) Schedule string // for daily jobs ("02:30", "03:00") — mutually exclusive with Interval LastRun time.Time LastErr error Running bool } // Scheduler manages periodic and daily jobs. type Scheduler struct { mu sync.Mutex jobs []*Job logger *log.Logger ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } // New creates a new Scheduler. func New(logger *log.Logger) *Scheduler { return &Scheduler{ logger: logger, } } // Every registers a periodic job that runs every interval. func (s *Scheduler) Every(name string, interval time.Duration, fn JobFunc) { if interval <= 0 { s.logger.Printf("[ERROR] Periodic job %s has invalid interval %s — job not registered", name, interval) return } s.mu.Lock() defer s.mu.Unlock() s.jobs = append(s.jobs, &Job{ Name: name, Fn: fn, Interval: interval, }) s.logger.Printf("[SCHED] Registered periodic job: %s (every %s)", name, interval) } // Daily registers a job that runs once per day at the specified time (HH:MM) in Europe/Budapest timezone. func (s *Scheduler) Daily(name string, timeStr string, fn JobFunc) { s.mu.Lock() defer s.mu.Unlock() // Validate time format if _, _, err := parseDailyTime(timeStr); err != nil { s.logger.Printf("[ERROR] Daily job %s has invalid schedule %q: %v — job not started", name, timeStr, err) return } s.jobs = append(s.jobs, &Job{ Name: name, Fn: fn, Schedule: timeStr, }) nextRun := nextDailyRun(timeStr) s.logger.Printf("[SCHED] Daily job %s scheduled for %s", name, nextRun.Format("2006-01-02 15:04 MST")) } // Start begins running all registered jobs. Safe to call only once. func (s *Scheduler) Start(ctx context.Context) { s.mu.Lock() if s.cancel != nil { s.mu.Unlock() s.logger.Println("[WARN] Scheduler already started — ignoring duplicate Start()") return } s.ctx, s.cancel = context.WithCancel(ctx) for _, job := range s.jobs { if job.Interval > 0 { s.wg.Add(1) go s.runPeriodicJob(job) } else if job.Schedule != "" { s.wg.Add(1) go s.runDailyJob(job) } } s.logger.Printf("[SCHED] Scheduler started with %d jobs", len(s.jobs)) s.mu.Unlock() } // Stop cancels all jobs and waits for them to finish (30s timeout). func (s *Scheduler) Stop() { s.mu.Lock() cancel := s.cancel s.mu.Unlock() if cancel != nil { cancel() } done := make(chan struct{}) go func() { s.wg.Wait() close(done) }() select { case <-done: s.logger.Println("[SCHED] All jobs stopped") case <-time.After(30 * time.Second): s.logger.Println("[WARN] Scheduler stop timed out after 30s — some jobs may still be running") } } // GetJobs returns a snapshot of all jobs (copies, not pointers). func (s *Scheduler) GetJobs() []Job { s.mu.Lock() defer s.mu.Unlock() result := make([]Job, len(s.jobs)) for i, j := range s.jobs { result[i] = *j } return result } func (s *Scheduler) runPeriodicJob(job *Job) { defer s.wg.Done() // Quiet mode: jobs with interval <= 30s only log failures quiet := job.Interval <= 30*time.Second ticker := time.NewTicker(job.Interval) defer ticker.Stop() for { select { case <-s.ctx.Done(): return case <-ticker.C: s.executeJob(job, quiet) } } } func (s *Scheduler) runDailyJob(job *Job) { defer s.wg.Done() for { nextRun := nextDailyRun(job.Schedule) waitDuration := time.Until(nextRun) if waitDuration < 0 { waitDuration = 0 } timer := time.NewTimer(waitDuration) select { case <-s.ctx.Done(): timer.Stop() return case <-timer.C: s.executeJob(job, false) } } } func (s *Scheduler) executeJob(job *Job, quiet bool) { s.mu.Lock() if job.Running { s.mu.Unlock() s.logger.Printf("[WARN] Job %s still running, skipping", job.Name) return } job.Running = true s.mu.Unlock() defer func() { s.mu.Lock() job.Running = false s.mu.Unlock() }() // Panic recovery defer func() { if r := recover(); r != nil { s.mu.Lock() job.LastErr = fmt.Errorf("panic: %v", r) job.LastRun = time.Now() s.mu.Unlock() s.logger.Printf("[ERROR] Job %s panicked: %v", job.Name, r) } }() if !quiet { s.logger.Printf("[SCHED] Running job: %s", job.Name) } start := time.Now() err := job.Fn(s.ctx) elapsed := time.Since(start) s.mu.Lock() job.LastRun = time.Now() job.LastErr = err s.mu.Unlock() if err != nil { s.logger.Printf("[WARN] Job %s failed: %v (took %s)", job.Name, err, elapsed.Round(time.Millisecond)) } else if !quiet { s.logger.Printf("[SCHED] Job %s completed (took %s)", job.Name, elapsed.Round(time.Millisecond)) } } // parseDailyTime parses "HH:MM" and returns hour and minute. func parseDailyTime(timeStr string) (int, int, error) { var hour, min int n, err := fmt.Sscanf(timeStr, "%d:%d", &hour, &min) if err != nil || n != 2 { return 0, 0, fmt.Errorf("expected HH:MM format, got %q", timeStr) } if hour < 0 || hour > 23 || min < 0 || min > 59 { return 0, 0, fmt.Errorf("invalid time %q: hour must be 0-23, minute 0-59", timeStr) } return hour, min, nil } // NextDailyRun is the exported version of nextDailyRun for external callers. func NextDailyRun(timeStr string) time.Time { return nextDailyRun(timeStr) } // nextDailyRun calculates the next occurrence of the daily schedule in Europe/Budapest timezone. func nextDailyRun(timeStr string) time.Time { hour, min, err := parseDailyTime(timeStr) if err != nil { // Should not happen — validated at registration return time.Now().Add(24 * time.Hour) } loc := getBudapestLocation() now := time.Now().In(loc) next := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, loc) // If the time has already passed today, schedule for tomorrow. // Use time.Date with day+1 instead of Add(24h) to correctly handle DST transitions // (spring forward/fall back in Europe/Budapest would shift by 1 hour with Add(24h)). if !next.After(now) { next = time.Date(now.Year(), now.Month(), now.Day()+1, hour, min, 0, 0, loc) } return next }