Use Case: Service Lifecycle Management#
When I was designing a new data pipeline for Adentro (Zenreach at the time), the design called for several microservices, most of which would read messages from a Kafka topic, do some simple transformation, and write back out to another topic. As I was planning out the work, I realized that each of those microservices would share a lot of the same boilerplate code: reading config, setting up a logger, setting up a Kafka consumer and producer, and handling graceful shutdown. I further considered that other microservices in our stack shared similar boilerplate, with Kafka consumers replaced by an HTTP server or a polling job.
Because we had a monorepo setup, and all config and logging (should) share the same setup, it made sense to create the appropriate common libraries in order to factor out and minimize the boilerplate. Thus, the task package was born.
Problem Description#
A microservice ideally does just one thing. That thing can be thought of as a Task, which needs to start and stop. Beyond that, how the lifecycle of a Task is managed is independent of the Task itself and can be factored out into common code once a Task interface is well-defined. The problem is one of determining what that interface should be, and then writing the boilerplate code into a common reusable library.
Defining the Task interface#
What are common tasks?#
Although my original use-case was ostensibly about Kafka consumers, there are a myriad of other possibilities, the most obvious and common of which is an HTTP server. Ironically, even the Kafka consumer services had need of an HTTP server in order to expose Prometheus metrics, so the services were actually doing a minimum of two things.
Another use case that existed in our design was a polling job that periodically would purge older database records. There are many other ways to accomplish this, but for the sake of brevity let’s just roll with it.
How does graceful shutdown work?#
This is a fun topic: a quick Google search will reveal that this is a super common question as it isn’t exactly obvious. In the end, it generally comes down to catching signals in a channel:
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// wait for a signal
<-sigChan
// do the shutdownHowever, this is insufficient: what if the Task fails and produces an unrecoverable error? A panic would just crash, but some other error condition might cause the service to hang here forever waiting for a signal but otherwise not perform. The typical solution is to use a select to wait on another channel.
select {
case <-sigChan:
// handle signal
case <-doneChan:
// handle task termination
}This pattern works well enough for two items, but starts to become a pain quickly if there are others. In the case of our Kafka consumer, there would be three: the consumer, the metrics server, and the signal channel. What I took away from this however is that waiting on a signal is itself a Task.
The Task interface#
A Task needs to be told when to start, and when to stop. It also needs to communicate when it has stopped prematurely due to an error condition.
A naive interface might look like:
type Task interface {
Run(errChan chan<- error)
Stop()
}Where Run is non-blocking, causing the Task to start. The provided channel allows for async communication back to the caller in the event of an error. Stop tells the Task to stop running.
This should throw up a few red flags:
- Managing multiple error channels will become a pain point.
Stopis blocking, and itself has no way to communicate back to the caller.
Here’s a better solution:
type Task interface {
Run(context.Context) error
}Where Run is a blocking call which returns nil on a clean completion or an error otherwise. It must respect the provided context, and gracefully terminate when the context is cancelled.
What does it mean when a Task terminates returning nil? Yes, it means that that Task completed without error, but if the Task is complete, should we tell other Tasks begin to shutdown? This is a matter of philosophy about what a Task actually is. This could go in either direction, but has a significant impact on how we implement the management of our Tasks, even though it doesn’t necessarily change the interface itself.
When I wrote the task package originally, I chose to view a Task as something that should never terminate until explicitly told to (in the happy path). We’ll continue with this approach, but I’ll touch on this again in an alternative solution.
Managing Tasks#
If you haven’t guessed by now, the answer is to use errgroup
. This is similar to waitgroup
, except that instead of running a set of func()s you can instead run a set of func() errors and the final Wait func will return the first error that was received. A short example:
var group errgroup.Group
// non-blocking
group.Go(func() error {
return doStuff(ctx)
})
// non-blocking
group.Go(func() error {
return doOtherStuff(ctx)
})
// Wait for all goroutines to finish.
err := group.Wait()It should be obvious from this example that doStuff can be directly replaced with our Task.Run func.
The task package#
We now have enough design context that we can begin to build out this library. I’ve now done this three times: originally at Adentro, again at Zircuit (from scratch), and once more for this blog (a fork of the Zircuit implementation). Each time has been a learning experience, so I encourage you dear reader to try it yourself before just accepting my solution.
Introducing the Manager#
Let’s start with the bare-bones implementation. We need a struct to hold the errgroup.Group as well as the Context and CancelFunc that will be used to coordinate shutdown.
type Manager struct {
ctx context.Context
cancel context.CancelFunc
group errgroup.Group
}Creating a Manager is straightforward:
func NewManager() *Manager {
ctx, cancel := context.WithCancel(context.Background())
return &Manager{
ctx: ctx,
cancel: cancel,
}
}Waiting for all Tasks to complete#
Wait blocks until all goroutines have finished and returns the first error encountered:
func (tm *Manager) Wait() error {
return tm.group.Wait()
}Manually stopping all Tasks#
Stop just needs to cancel the context and then Wait:
func (tm *Manager) Stop() error {
tm.cancel()
return tm.Wait()
}Running a Task#
To be useful, the Manager needs to Run one or more Tasks:
func (tm *Manager) Run(task Task) {
tm.group.Go(func() error {
defer tm.cancel()
return task.Run(tm.ctx)
})
}The defer tm.cancel() line is the implementation of the key design choice we made earlier: if any Task completes, the shared context is cancelled prompting any remaining tasks to begin their own graceful shutdown. See An alternative approach
for a different take on this.
The ossignal package#
Before we can test and improve the Manager, we’ll need a simple Task to work with. Since virtually every service will need to listen for an os signal, we’ll start with that.
type Task struct {
sigCh chan os.Signal
}
func NewTask() *Task {
t := &Task{sigCh: make(chan os.Signal, 1)}
signal.Notify(t.sigCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
return t
}
func (t *Task) Run(ctx context.Context) error {
select {
case <-t.sigCh:
case <-ctx.Done():
}
signal.Stop(t.sigCh)
return nil
}The channel is buffered so a signal arriving before Run is called isn’t dropped. signal.Notify is called at construction time rather than in Run, so signals are captured from the moment the task is created. signal.Stop deregisters the channel on exit so the runtime stops sending to it.
Try it out#
func main () {
tm := task.NewManager()
tm.Run(ossignal.NewTask())
err := tm.Wait()
fmt.Println(err)
}If you run this, nothing will happen. The program will (correctly) hang until a signal is received. A Ctrl-C from the terminal will send a SIGINT, or you could be more creative and send a signal using the kill command. Once the signal is received, the program will terminate.
Improvements#
The code presented here so far is extremely bare-bones. A few things that should be added:
- A
Cleanup(f func() error)func on theManagerto allow registration of clean up work that should execute only afterTasks have completed. For example, multipleTasks might share a database connection that should be closed only when all of them are done. - Optional logging: allow for an
slog.Loggerto be provided to both theManagerand theossignal.Taskto provide for logging. - A guard in
Manager.Runthat returnsErrManagerStoppedif called after the manager has already stopped. sync.OnceinManager.Waitto make it safe to call concurrently or repeatedly, always returning the same result.- A shutdown timeout on
Manager.Wait: after the context is cancelled, wait a bounded amount of time for tasks to finish before giving up withErrShutdownTimeout. RunEphemeralon theManagerfor tasks that are expected to complete without error while others continue running — for example, a one-time initialization step.- Use my version of errgroup
in order to recover from a
panicinside aTask.
An alternative approach#
In the design, we made a choice: When any Task terminates for any reason, we tell all other tasks to gracefully shutdown. Let’s suppose that instead we only do this if a Task terminates with an error. With this subtle shift in philosophy we can rewrite our Manager to better leverage errgroup:
type Manager struct {
ctx context.Context
cancel context.CancelFunc
group *errgroup.Group
}
func NewManager() *Manager {
ctxA, cancel := context.WithCancel(context.Background())
group, ctxB := errgroup.WithContext(ctxA)
return &Manager{
ctx: ctxB,
cancel: cancel,
group: group,
}
}The context returned from WithContext
will be automatically cancelled if any of the tasks return an error. This allows for us to simplify the Run func to just:
func (tm *Manager) Run(task Task) {
tm.group.Go(func() error {
return task.Run(tm.ctx)
})
}The Stop and Wait funcs remain unchanged.
Benefits of this approach#
With this in place, the bare-bones Manager really is just a thin wrapper around errgroup.Group. This is a good thing, as it means there is less surface area available for bugs.
In addition, there’s no longer a need to implement a RunEphemeral func for tasks that are expected to complete, although note that if they fail this still results in a shutdown.
Consequences for ossignal#
We do need to reconsider how the ossignal.Task should function. As-is, the Task returns nil when a signal is received, which now no longer results in other Tasks being told to shutdown.
An obvious solution is to have ossignal return an error. This will absolutely work, but it feels wrong to return an error in the happy path. Moreover, this would result in Manager.Wait returning that same error. We could add to the hack by filtering out that error in Wait, but even if we do that, we will have masked any error returned by other Tasks after the shutdown has begun. This is an unacceptable trade-off.
A better solution is to improve the flexibility of our ossignal.Task: Allow users to choose the signal(s) to listen for, and allow them to set a callback func:
func DefaultSignals() []os.Signal {
return []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT}
}
type Task struct {
sigCh chan os.Signal
onSignal func(os.Signal)
}
type options struct {
signals []os.Signal
onSignal func(os.Signal)
}
func WithSignals(signals ...os.Signal) Option {
return func(options *options) {
options.signals = signals
}
}
func WithOnSignal(fn func(os.Signal)) Option {
return func(options *options) {
options.onSignal = fn
}
}And then:
func NewTask(opts ...Option) *Task {
options := options{signals: DefaultSignals()}
for _, opt := range opts {
opt(&options)
}
task := &Task{
sigCh: make(chan os.Signal, 1),
onSignal: options.onSignal,
}
signal.Notify(task.sigCh, options.signals...)
return task
}
func (t *Task) Run(ctx context.Context) error {
select {
case sig := <-t.sigCh:
if t.onSignal != nil {
t.onSignal(sig)
}
case <-ctx.Done():
}
signal.Stop(t.sigCh)
return nil
}Finally, let’s update our main to make use of these changes:
func main () {
tm := task.NewManager()
tm.Run(ossignal.NewTask(ossignal.WithOnSignal(
func(_ os.Signal) {
tm.cancel() // Oops! This would now need to be exposed
},
)))
err := tm.Wait()
fmt.Println(err)
}Now we have a different problem: Manager does not expose the cancel func. If we chose to do that, then we just have to accept that shutdown can be initiated by other parties. That’s probably okay, but we do lose some of the encapsulation we were aiming for.
Which approach is best?#
This is a matter of opinion TBH. Given this blog is named “Jim’s Opinion” I’ll go ahead and give you mine: stick with the original. We can (and will) certainly take the improvements we made to ossignal here, but
- I don’t like the loss of encapsulation.
- The original version has seen production use for many years now at two different companies.
- There’s a risk of the degenerate case: all the other tasks complete without error except the
ossignal. Now the service is sitting idle forever.
Next Steps#
The complete code for the task package can be found on GitHub: task . Note that the code will likely differ from what was presented here, as it is expanded and improved over time.
Related Reading#
- Go By Example: WaitGroups
- Go Docs for golang.org/x/sync/errgroup
- Previous article Extended Error Use Case: calm
, which introduces a panic-recovering wrapper around
errgroup
Some code in this article is based on work from zkr-go-common , licensed under the MIT License, originally authored by wood-jp at Zircuit
