Skip to content

Commit

Permalink
Add signal handling to serverless services
Browse files Browse the repository at this point in the history
  • Loading branch information
javfg committed May 3, 2023
1 parent fc7bdcf commit b10ff76
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 0 deletions.
26 changes: 26 additions & 0 deletions cmd/revad/internal/grace/grace.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Watcher struct {
ppid int
lns map[string]net.Listener
ss map[string]Server
SL Serverless
pidFile string
childPIDs []int
}
Expand Down Expand Up @@ -254,6 +255,12 @@ type Server interface {
Address() string
}

// Serverless is the interface that the serverless server implements.
type Serverless interface {
Stop() error
GracefulStop() error
}

// TrapSignals captures the OS signal.
func (w *Watcher) TrapSignals() {
signalCh := make(chan os.Signal, 1024)
Expand Down Expand Up @@ -293,6 +300,11 @@ func (w *Watcher) TrapSignals() {
}
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
}
err := w.SL.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping serverless server")
}
w.log.Info().Msg("serverless services abruptly closed")
w.Exit(1)
}
}
Expand All @@ -306,6 +318,14 @@ func (w *Watcher) TrapSignals() {
w.Exit(1)
}
}
if w.SL != nil {
err := w.SL.GracefulStop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
w.log.Info().Msg("exit with error code 1")
w.Exit(1)
}
}
w.log.Info().Msg("exit with error code 0")
w.Exit(0)
case syscall.SIGINT, syscall.SIGTERM:
Expand All @@ -317,6 +337,12 @@ func (w *Watcher) TrapSignals() {
w.log.Error().Err(err).Msg("error stopping server")
}
}
err := w.SL.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping serverless server")
}
w.log.Info().Msg("serverless services abruptly closed")

w.Exit(0)
}
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/revad/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func run(mainConf map[string]interface{}, coreConf *coreConf, logger *zerolog.Lo
log.Panic(err)
}
listeners := initListeners(watcher, servers, logger)
if serverless != nil {
watcher.SL = serverless
}

start(mainConf, servers, serverless, listeners, logger, watcher)
}
Expand Down
71 changes: 71 additions & 0 deletions pkg/rserverless/rserverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package rserverless

import (
"fmt"
"time"

"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
Expand All @@ -29,6 +30,8 @@ import (
// Service represents a serverless service.
type Service interface {
Start()
GracefulStop() error
Stop() error
}

// Services is a map of service name and its new function.
Expand Down Expand Up @@ -78,6 +81,74 @@ func (s *Serverless) Start() error {
return s.registerServices()
}

func stillRunning(stoppedServices map[string]bool) []string {
stillRunning := []string{}

for svcName, stopped := range stoppedServices {
if !stopped {
stillRunning = append(stillRunning, svcName)
}
}

return stillRunning
}

// GracefulStop gracefully stops the serverless services.
func (s *Serverless) GracefulStop() error {
stoppedServices := make(map[string]bool, len(s.services))

for svcName, svc := range s.services {
stoppedServices[svcName] = false

go func(svcName string, svc Service) {
s.log.Info().Msgf("trying to stop serverless service %s", svcName)
if err := svc.GracefulStop(); err != nil {
s.log.Error().Err(err).Msgf("error gracefully stopping service %s, trying hard stop", svcName)
if err := svc.Stop(); err != nil {
s.log.Error().Err(err).Msgf("error hard stopping service %s", svcName)
} else {
stoppedServices[svcName] = true
}
} else {
s.log.Info().Msgf("service %s stopped", svcName)
stoppedServices[svcName] = true
}
}(svcName, svc)
}

count := 9 // one second less than the grace watcher deadlne
ticker := time.NewTicker(time.Second)
for ; true; <-ticker.C {
count--
stillRunningServices := stillRunning(stoppedServices)

if len(stillRunningServices) == 0 {
s.log.Info().Msg("all services are stopped")
return nil
}

if count <= 0 {
s.log.Info().Msg("deadline reached before stopping all services")
return errors.Errorf("the services %v will stop abruptly", stillRunningServices)
}
}

return nil
}

// Stop stop the serverless services without waiting.
func (s *Serverless) Stop() error {
for svcName, svc := range s.services {
if err := svc.Stop(); err != nil {
s.log.Error().Err(err).Msgf("error stopping service %s", svcName)
} else {
s.log.Info().Msgf("service %s stopped", svcName)
}
}

return nil
}

func (s *Serverless) registerServices() error {
for svcName := range s.conf.Services {
if s.isServiceEnabled(svcName) {
Expand Down

0 comments on commit b10ff76

Please sign in to comment.