ActivityFetcher/internal/scheduler/scheduler.go
2025-10-14 10:45:43 +02:00

87 lines
1.6 KiB
Go

package scheduler
import (
"context"
"log"
"sync"
"time"
"fetcher/jobs"
)
// Runner drives a set of jobs, invoking each one repeatedly on the interval
// that the job itself reports.
type Runner struct {
	// logger receives scheduler diagnostics; when nil, nothing is logged.
	logger *log.Logger
}
// New returns a Runner whose diagnostics are written to logger. A nil logger
// is accepted; the Runner then emits no log output.
func New(logger *log.Logger) *Runner {
	runner := new(Runner)
	runner.logger = logger
	return runner
}
// Run launches one goroutine per entry in jobList and blocks until ctx is
// cancelled and every job goroutine has returned. When jobList is empty it
// logs a notice and simply waits for cancellation.
func (r *Runner) Run(ctx context.Context, jobList []jobs.Job) {
	if len(jobList) == 0 {
		r.log("no jobs registered; scheduler idle until context cancelled")
		<-ctx.Done()
		return
	}

	var workers sync.WaitGroup
	for i := range jobList {
		workers.Add(1)
		// Hand the job to the goroutine as an argument so each worker owns
		// its own copy regardless of loop-variable semantics.
		go func(j jobs.Job) {
			defer workers.Done()
			r.runJob(ctx, j)
		}(jobList[i])
	}

	<-ctx.Done()
	workers.Wait()
}
// runJob executes job once right away and then again on every tick of its
// interval, returning when ctx is cancelled. A non-positive interval is
// logged and replaced with one second.
func (r *Runner) runJob(ctx context.Context, job jobs.Job) {
	every := job.Interval()
	if every <= 0 {
		r.logf("job %q provided non-positive interval %s; defaulting to 1s", job.Name(), every)
		every = time.Second
	}

	tick := time.NewTicker(every)
	defer tick.Stop()

	for {
		// Run first, then wait: the initial execution does not wait for a
		// full interval to elapse.
		r.execute(ctx, job)

		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			// Next interval reached; loop around and execute again.
		}
	}
}
// execute performs a single run of job unless ctx has already ended, and
// logs any error the job returns. The error is not propagated to the caller.
func (r *Runner) execute(ctx context.Context, job jobs.Job) {
	if ctx.Err() != nil {
		// Context already cancelled or expired: skip this run.
		return
	}

	err := job.Run(ctx)
	if err == nil {
		return
	}
	r.logf("job %q failed: %v", job.Name(), err)
}
// log prints message through the Runner's logger; it does nothing when no
// logger was configured.
func (r *Runner) log(message string) {
	if r.logger == nil {
		return
	}
	r.logger.Print(message)
}
// logf formats its arguments Printf-style and writes the result through the
// Runner's logger; it does nothing when no logger was configured.
func (r *Runner) logf(format string, args ...interface{}) {
	if r.logger == nil {
		return
	}
	r.logger.Printf(format, args...)
}