Implementing a delayed job scheduler in Golang
Problem Statement
We need to implement a scheduler that can efficiently take jobs with a delay time associated with them and execute these jobs after the said amount of delay in seconds.
Solution
Let’s start with defining the required interfaces first. We will need a Job interface which will have a method called Execute()
type Job interface {
Execute()
}
We will also need a Scheduler interface that will contain a method called Schedule which takes a job and the duration after which it needs to be executed.
type Scheduler interface {
// Schedule takes a job and executes it after the given duration of time in sec
Schedule(job job.Job, duration time.Duration)
}
To start with our implementation let’s first implement the Job interface. Let’s just have a simple Summation job that takes 2 integers and calculates their sum.
type SumJob struct {
a int64
b int64
sum int64
}
func (p *SumJob) Execute() {
p.sum = p.a + p.b
}
func NewSumJob(a, b int64) *SumJob {
return &SumJob{
a: a,
b: b,
}
}
Now let’s try to implement the most basic delayed scheduler and work our optimisations from there. In my mind the most basic implementation is every time the scheduler receives a job it starts a new goroutine and puts it to sleep for the given amount of delay and once the sleep time lapses, we will call the Execute() function of that job.
type delayedScheduler struct {
}
func NewScheduler() *delayedScheduler {
return &delayedScheduler{}
}
func (d *delayedScheduler) Schedule(job job.Job, duration time.Duration) {
go func() {
time.Sleep(duration)
job.Execute()
}()
}
Now let’s try to benchmark our delayed scheduler. Golang provides support for benchmarking out of the box, we just need to write our benchmarking tests. To know more about Golang’s benchmarking you can refer to this benchmarking documentation.
func BenchmarkScheduler(b *testing.B) {
delayedScheduler := delayed_scheduler.NewScheduler()
sumJob1 := job.NewSumJob(2, 3)
sumJob2 := job.NewSumJob(3, 4)
sumJob3 := job.NewSumJob(4, 5)
for i := 0; i < b.N; i++ {
delayedScheduler.Schedule(sumJob1, 5*time.Second)
delayedScheduler.Schedule(sumJob2, 1*time.Second)
delayedScheduler.Schedule(sumJob3, 3*time.Second)
}
}
After running benchmark tests for 10k, 20k, 50k, 100k and 500k iterations following are the results —
goos: darwin
goarch: amd64
pkg: github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkScheduler-12 10000 5019 ns/op 1796 B/op 9 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.187s
BenchmarkScheduler-12 20000 4967 ns/op 1803 B/op 9 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.235s
BenchmarkScheduler-12 50000 5237 ns/op 1824 B/op 9 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.411s
BenchmarkScheduler-12 100000 5199 ns/op 1829 B/op 9 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.695s
BenchmarkScheduler-12 500000 5455 ns/op 1506 B/op 8 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 3.098s
These benchmarking numbers will later come in handy in comparison.
Let’s also take the CPU profile of this test to get some insights into what can be optimised here.
There are 2 things that stands out in this flame graph -
- time.Sleep() function call does a lot of internal things that take quite a big chunk of the CPU.
- Since we are spawning a lot of goroutines which are then put to sleep there is a lot of context-switching between these sleeping goroutines and a very big chunk of CPU is wasted on that (talking about the runtime.mcall() block here).
Optimising the solution with Priority Queue
What if instead of just spawning a goroutine and putting it to sleep we maintain a priority queue of the jobs and the order is defined by the execution time?
Here we will need a min priority queue based on the execution time of the job. In Golang we don’t have a priority queue data structure out of the box, but the “container/heap” package provides the heap interface that we can use here to implement our priority queue. First, we will create a new struct delayedJob, which will be a wrapper of the job interface and contain extra information such as priority which is just the execution time in epoch time representation and index which stores the position of this job in the priority queue. Below is the implementation of the priority queue based on the heap interface.
// delayedJob is the wrapper of Job which contains extra information required for the priority queue implementation
type delayedJob struct {
job job.Job
// priority is the epoch time at which this job needs to be executed
priority int64
// index is the position of this Job in the priorityQueue
index int
}
// priorityJobQueue is the priority queue implementation which will store the delayedJob in their order of execution
type priorityJobQueue []*delayedJob
func (pq priorityJobQueue) Len() int { return len(pq) }
func (pq priorityJobQueue) Less(i, j int) bool {
return pq[i].priority < pq[j].priority
}
func (pq priorityJobQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityJobQueue) Push(x interface{}) {
n := len(*pq)
delayedJob := x.(*delayedJob)
delayedJob.index = n
*pq = append(*pq, delayedJob)
}
func (pq *priorityJobQueue) Pop() interface{} {
old := *pq
n := len(old)
delayedJob := old[n-1]
old[n-1] = nil // avoid memory leak
delayedJob.index = -1 // for safety
*pq = old[0 : n-1]
return delayedJob
}
Now once we have our priority queue for the jobs, each time we call the Schedule() function of our scheduler we will just add that job to our priority queue and on a separate goroutine we will periodically check if the topmost job’s execution time has come or not and if it’s time for its execution, we will call it’s Execute() function in parallel.
type delayedScheduler struct {
ticker *time.Ticker
priorityJobQueue priorityJobQueue
sync.Mutex
done chan bool
}
func NewScheduler() *delayedScheduler {
delayedScheduler := &delayedScheduler{
priorityJobQueue: make(priorityJobQueue, 0),
done: make(chan bool),
ticker: time.NewTicker(1 * time.Second),
}
heap.Init(&delayedScheduler.priorityJobQueue)
go delayedScheduler.start()
return delayedScheduler
}
func (ds *delayedScheduler) start() {
go func() {
for {
select {
case <-ds.done:
ds.ticker.Stop()
return
case t := <-ds.ticker.C:
fmt.Println("Tick at ", t)
ds.Lock()
if len(ds.priorityJobQueue) != 0 {
job := heap.Pop(&ds.priorityJobQueue).(*delayedJob)
now := time.Now().Unix()
if job.priority-now > 0 {
heap.Push(&ds.priorityJobQueue, job)
} else {
go job.job.Execute()
}
}
ds.Unlock()
}
}
}()
}
func (ds *delayedScheduler) Stop() {
ds.done <- true
}
func (ds *delayedScheduler) Schedule(job job.Job, duration time.Duration) {
ds.Lock()
now := time.Now().Unix()
timeOfExecution := now + int64(duration.Seconds())
delayedJob := &delayedJob{
job: job,
priority: timeOfExecution,
}
heap.Push(&ds.priorityJobQueue, delayedJob)
ds.Unlock()
}
Additionally, you will notice I have also exposed a Stop() function which stops the scheduler.
With this optimisation, we have addressed the 2 problems seen in the flame graph above.
- We moved away from time.Sleep()
- We also don’t have any sleeping goroutines now.
Let’s get back to benchmarking again. We will use the same benchmarking test that we used earlier and will run the tests again for 10k, 20k, 50k, 100k and 500k iterations.
goos: darwin
goarch: amd64
pkg: github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkScheduler-12 10000 522.3 ns/op 212 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.134s
BenchmarkScheduler-12 20000 546.7 ns/op 222 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.140s
BenchmarkScheduler-12 50000 594.7 ns/op 228 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.157s
BenchmarkScheduler-12 100000 701.1 ns/op 229 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.202s
BenchmarkScheduler-12 500000 603.3 ns/op 226 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.433s
Looking at these numbers and comparing them to previous benchmarking we can see that we have made a considerable amount of improvement. Let’s take the CPU profile again and try to see if we can make anymore improvements to this.
We can see that time.Sleep() and runtime.mcall() have now disappeared from the profile but the GC is taking a lot of time. Let’s try to optimise the GC as well. In golang, we can play around with the GC value by using the debug.SetGCPercent() function. The default percentage is 100. If you increase the percentage the GC frequency decreases. Let’s try setting GC percentage as 200,500 and 1000 and see what works the best.
func BenchmarkScheduler(b *testing.B) {
debug.SetGCPercent(500)
delayedScheduler := delayed_scheduler.NewScheduler()
sumJob1 := job.NewSumJob(2, 3)
sumJob2 := job.NewSumJob(3, 4)
sumJob3 := job.NewSumJob(4, 5)
for i := 0; i < b.N; i++ {
delayedScheduler.Schedule(sumJob1, 5*time.Second)
delayedScheduler.Schedule(sumJob2, 1*time.Second)
delayedScheduler.Schedule(sumJob3, 3*time.Second)
}
}
For me, I got the optimal result at 500. Setting the GC more than that didn’t change anything.
We can see the GC is now under control in the above flame graph because of the GC optimisation that we did. Now let’s do our benchmarking test again for 10k, 20k, 50k, 100k and 500k iterations.
goos: darwin
goarch: amd64
pkg: github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkScheduler-12 10000 498.8 ns/op 212 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.134s
BenchmarkScheduler-12 20000 509.0 ns/op 221 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.144s
BenchmarkScheduler-12 50000 519.5 ns/op 228 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.158s
BenchmarkScheduler-12 100000 569.9 ns/op 229 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.220s
BenchmarkScheduler-12 500000 594.8 ns/op 226 B/op 3 allocs/op
PASS
ok github.com/NoobMaster-96/delayed-scheduler-golang/benchmarking 0.433s
We can see that the benchmarking numbers again show some improvements after the GC optimisations. Although It’s not that significant amount as compared to before but we have improved.
More scope for optimisation
One not-so-clear problem with our delayed scheduler will come at scale. Imagine a scenario where a lot of Jobs are scheduled to be executed at the same time. Our scheduler will then end up calling the Execute() function of these jobs. And all of these Execute() functions are being called in parallel this will lead to a sudden spike in the number of goroutines and the minimum memory required for a goroutine is 2KB. In an unbounded environment where the scheduler is allowed to spawn as many goroutines as it wants, this scenario can lead to the machine being out of memory and will crash the system. One solution for this problem is to limit the no. of goroutines that our scheduler can use by providing a bounded worker pool. You can try that on your own as a fun exercise😛.
GitHub links for all the code that we went through in this blog —
v0.1.0 Delayed scheduler with sleeping goroutines
v0.2.0 Delayed scheduler with priority queue and GC optimisations
Feel free to connect with me on LinkedIn if you want to discuss further on this, give any feedback or even otherwise!