From 1c681a3509d4b05933fe8c4f1f283ce4e195a284 Mon Sep 17 00:00:00 2001 From: Javier Ferrer Date: Fri, 5 May 2023 10:52:17 +0200 Subject: [PATCH] Serverless services (#3824) * Add serverless services * Load serverless services * Start serverless services on launch * Codacy changes * Changelog * Example serverless config * Add example serverless service * Add service name to logger Co-authored-by: Gianmaria Del Monte * Simplify function call Co-authored-by: Gianmaria Del Monte * Exit with errors if initserverless fails Co-authored-by: Gianmaria Del Monte * Add signal handling to serverless services * Use context to pass timeout on service stop --------- Co-authored-by: Gianmaria Del Monte --- changelog/unreleased/serverless-services.md | 7 + cmd/revad/internal/grace/grace.go | 26 +++ cmd/revad/runtime/loader.go | 1 + cmd/revad/runtime/runtime.go | 48 +++++- .../serverless-example/notifications.toml | 32 ++++ .../services/helloworld/helloworld.go | 98 +++++++++++ internal/serverless/services/loader/loader.go | 25 +++ pkg/rhttp/rhttp.go | 3 +- pkg/rserverless/rserverless.go | 161 ++++++++++++++++++ 9 files changed, 394 insertions(+), 7 deletions(-) create mode 100644 changelog/unreleased/serverless-services.md create mode 100644 examples/serverless-example/notifications.toml create mode 100644 internal/serverless/services/helloworld/helloworld.go create mode 100644 internal/serverless/services/loader/loader.go create mode 100644 pkg/rserverless/rserverless.go diff --git a/changelog/unreleased/serverless-services.md b/changelog/unreleased/serverless-services.md new file mode 100644 index 0000000000..054999b47c --- /dev/null +++ b/changelog/unreleased/serverless-services.md @@ -0,0 +1,7 @@ +Enhancement: Serverless Services + +New type of service (along with http and grpc) +which does not have a listening server. Useful for +the notifications service and others in the future. + +https://github.com/cs3org/reva/pull/3824 diff --git a/cmd/revad/internal/grace/grace.go b/cmd/revad/internal/grace/grace.go index 16f43165a7..52cd3619a8 100644 --- a/cmd/revad/internal/grace/grace.go +++ b/cmd/revad/internal/grace/grace.go @@ -41,6 +41,7 @@ type Watcher struct { ppid int lns map[string]net.Listener ss map[string]Server + SL Serverless pidFile string childPIDs []int } @@ -254,6 +255,12 @@ type Server interface { Address() string } +// Serverless is the interface that the serverless server implements. +type Serverless interface { + Stop() error + GracefulStop() error +} + // TrapSignals captures the OS signal. func (w *Watcher) TrapSignals() { signalCh := make(chan os.Signal, 1024) @@ -293,6 +300,11 @@ func (w *Watcher) TrapSignals() { } w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address()) } + err := w.SL.Stop() + if err != nil { + w.log.Error().Err(err).Msg("error stopping serverless server") + } + w.log.Info().Msg("serverless services abruptly closed") w.Exit(1) } } @@ -306,6 +318,14 @@ func (w *Watcher) TrapSignals() { w.Exit(1) } } + if w.SL != nil { + err := w.SL.GracefulStop() + if err != nil { + w.log.Error().Err(err).Msg("error stopping server") + w.log.Info().Msg("exit with error code 1") + w.Exit(1) + } + } w.log.Info().Msg("exit with error code 0") w.Exit(0) case syscall.SIGINT, syscall.SIGTERM: @@ -317,6 +337,12 @@ func (w *Watcher) TrapSignals() { w.log.Error().Err(err).Msg("error stopping server") } } + err := w.SL.Stop() + if err != nil { + w.log.Error().Err(err).Msg("error stopping serverless server") + } + w.log.Info().Msg("serverless services abruptly closed") + w.Exit(0) } } diff --git a/cmd/revad/runtime/loader.go b/cmd/revad/runtime/loader.go index da63b369ce..52488936d3 100644 --- a/cmd/revad/runtime/loader.go +++ b/cmd/revad/runtime/loader.go @@ -27,6 +27,7 @@ import ( _ "github.com/cs3org/reva/internal/http/interceptors/auth/tokenwriter/loader" _ "github.com/cs3org/reva/internal/http/interceptors/loader" _ "github.com/cs3org/reva/internal/http/services/loader" + _ "github.com/cs3org/reva/internal/serverless/services/loader" _ "github.com/cs3org/reva/pkg/app/provider/loader" _ "github.com/cs3org/reva/pkg/app/registry/loader" _ "github.com/cs3org/reva/pkg/appauth/manager/loader" diff --git a/cmd/revad/runtime/runtime.go b/cmd/revad/runtime/runtime.go index c2b64aa001..255c55bdf8 100644 --- a/cmd/revad/runtime/runtime.go +++ b/cmd/revad/runtime/runtime.go @@ -33,6 +33,7 @@ import ( "github.com/cs3org/reva/pkg/registry/memory" "github.com/cs3org/reva/pkg/rgrpc" "github.com/cs3org/reva/pkg/rhttp" + "github.com/cs3org/reva/pkg/rserverless" "github.com/cs3org/reva/pkg/sharedconf" rtrace "github.com/cs3org/reva/pkg/trace" "github.com/cs3org/reva/pkg/utils" @@ -93,13 +94,23 @@ func run(mainConf map[string]interface{}, coreConf *coreConf, logger *zerolog.Lo initCPUCount(coreConf, logger) servers := initServers(mainConf, logger) + serverless := initServerless(mainConf, logger) + + if len(servers) == 0 && serverless == nil { + logger.Info().Msg("nothing to do, no grpc/http/serverless enabled_services declared in config") + os.Exit(1) + } + watcher, err := initWatcher(logger, filename) if err != nil { log.Panic(err) } listeners := initListeners(watcher, servers, logger) + if serverless != nil { + watcher.SL = serverless + } - start(mainConf, servers, listeners, logger, watcher) + start(mainConf, servers, serverless, listeners, logger, watcher) } func initListeners(watcher *grace.Watcher, servers map[string]grace.Server, log *zerolog.Logger) map[string]net.Listener { @@ -141,13 +152,22 @@ func initServers(mainConf map[string]interface{}, log *zerolog.Logger) map[strin servers["grpc"] = s } - if len(servers) == 0 { - log.Info().Msg("nothing to do, no grpc/http enabled_services declared in config") - os.Exit(1) - } return servers } +func initServerless(mainConf map[string]interface{}, log *zerolog.Logger) *rserverless.Serverless { + if isEnabledServerless(mainConf) { + serverless, err := getServerless(mainConf["serverless"], log) + if err != nil { + log.Error().Err(err).Msg("error") + os.Exit(1) + } + return serverless + } + + return nil +} + func initTracing(conf *coreConf) { rtrace.SetTraceProvider(conf.TracingCollector, conf.TracingEndpoint, conf.TracingServiceName) } @@ -184,7 +204,7 @@ func handlePIDFlag(l *zerolog.Logger, pidFile string) (*grace.Watcher, error) { return w, nil } -func start(mainConf map[string]interface{}, servers map[string]grace.Server, listeners map[string]net.Listener, log *zerolog.Logger, watcher *grace.Watcher) { +func start(mainConf map[string]interface{}, servers map[string]grace.Server, serverless *rserverless.Serverless, listeners map[string]net.Listener, log *zerolog.Logger, watcher *grace.Watcher) { if isEnabledHTTP(mainConf) { go func() { if err := servers["http"].(*rhttp.Server).Start(listeners["http"]); err != nil { @@ -201,6 +221,13 @@ func start(mainConf map[string]interface{}, servers map[string]grace.Server, lis } }() } + if isEnabledServerless(mainConf) { + if err := serverless.Start(); err != nil { + log.Error().Err(err).Msg("error starting serverless services") + watcher.Exit(1) + } + } + watcher.TrapSignals() } @@ -264,6 +291,11 @@ func getHTTPServer(conf interface{}, l *zerolog.Logger) (*rhttp.Server, error) { return s, nil } +func getServerless(conf interface{}, l *zerolog.Logger) (*rserverless.Serverless, error) { + sub := l.With().Str("pkg", "rserverless").Logger() + return rserverless.New(conf, sub) +} + // adjustCPU parses string cpu and sets GOMAXPROCS // // according to its value. It accepts either @@ -365,6 +397,10 @@ func isEnabledGRPC(conf map[string]interface{}) bool { return isEnabled("grpc", conf) } +func isEnabledServerless(conf map[string]interface{}) bool { + return isEnabled("serverless", conf) +} + func isEnabled(key string, conf map[string]interface{}) bool { if a, ok := conf[key]; ok { if b, ok := a.(map[string]interface{}); ok { diff --git a/examples/serverless-example/notifications.toml b/examples/serverless-example/notifications.toml new file mode 100644 index 0000000000..f1fee178f2 --- /dev/null +++ b/examples/serverless-example/notifications.toml @@ -0,0 +1,32 @@ +[log] +output = "/var/log/revad/revad-notifications.log" +mode = "json" + +[shared] +gatewaysvc = "localhost:19000" +jwt_secret = "Pive-Fumkiu4" +skip_user_groups_in_token = true + +[serverless.services.notifications] +nats_address = "nats-server-01.example.com" +nats_token = "secret-token-example" +nats_template_subject = "reva-notifications-template" +nats_notification_subject = "reva-notifications-notification" +nats_trigger_subject = "reva-notifications-trigger" +storage_driver = "sql" +grouping_interval = 60 +grouping_maxsize = 100 + +[serverless.services.notifications.storage_drivers.sql] +db_username = "username" +db_password = "password" +db_host = "database.example.com" +db_port = 3306 +db_name = "notifications" + +[serverless.services.notifications.handlers.email] +smtp_server = "mx.example.com:25" +disable_auth = true +default_sender = "noreply@cernbox.cern.ch" + +[tracing] diff --git a/internal/serverless/services/helloworld/helloworld.go b/internal/serverless/services/helloworld/helloworld.go new file mode 100644 index 0000000000..50120cda11 --- /dev/null +++ b/internal/serverless/services/helloworld/helloworld.go @@ -0,0 +1,98 @@ +// Copyright 2018-2023 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package helloworld + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/cs3org/reva/pkg/rserverless" + "github.com/mitchellh/mapstructure" + "github.com/rs/zerolog" +) + +type config struct { + Outfile string `mapstructure:"outfile"` +} + +func (c *config) init() { + if c.Outfile == "" { + c.Outfile = "/tmp/revad-helloworld-hello" + } +} + +type svc struct { + conf *config + file *os.File + log *zerolog.Logger +} + +func init() { + rserverless.Register("helloworld", New) +} + +// New returns a new helloworld service. +func New(m map[string]interface{}, log *zerolog.Logger) (rserverless.Service, error) { + conf := &config{} + conf.init() + + if err := mapstructure.Decode(m, conf); err != nil { + return nil, err + } + + file, err := os.OpenFile(conf.Outfile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Err(err) + return nil, err + } + + s := &svc{ + conf: conf, + log: log, + file: file, + } + + return s, nil +} + +// Start starts the helloworld service. +func (s *svc) Start() { + s.log.Debug().Msgf("helloworld server started, saying hello at %s", s.conf.Outfile) + go s.sayHello(s.conf.Outfile) +} + +// Close stops the helloworld service. +func (s *svc) Close(ctx context.Context) error { + return s.file.Close() +} + +func (s *svc) sayHello(filename string) { + for { + s.log.Info().Msg("saying hello") + h := fmt.Sprintf("%s - hello world!\n", time.Now().String()) + + _, err := s.file.Write([]byte(h)) + if err != nil { + s.log.Err(err) + } + time.Sleep(5 * time.Second) + } +} diff --git a/internal/serverless/services/loader/loader.go b/internal/serverless/services/loader/loader.go new file mode 100644 index 0000000000..1b466a144c --- /dev/null +++ b/internal/serverless/services/loader/loader.go @@ -0,0 +1,25 @@ +// Copyright 2018-2023 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package loader + +import ( + // Load core serverless services. + _ "github.com/cs3org/reva/internal/serverless/services/helloworld" + // Add your own service here. +) diff --git a/pkg/rhttp/rhttp.go b/pkg/rhttp/rhttp.go index 9bd689f405..de000aaac1 100644 --- a/pkg/rhttp/rhttp.go +++ b/pkg/rhttp/rhttp.go @@ -200,7 +200,8 @@ func (s *Server) registerServices() error { for svcName := range s.conf.Services { if s.isServiceEnabled(svcName) { newFunc := global.Services[svcName] - svc, err := newFunc(s.conf.Services[svcName], &s.log) + svcLogger := s.log.With().Str("service", svcName).Logger() + svc, err := newFunc(s.conf.Services[svcName], &svcLogger) if err != nil { err = errors.Wrapf(err, "http service %s could not be started,", svcName) return err diff --git a/pkg/rserverless/rserverless.go b/pkg/rserverless/rserverless.go new file mode 100644 index 0000000000..7af26640c5 --- /dev/null +++ b/pkg/rserverless/rserverless.go @@ -0,0 +1,161 @@ +// Copyright 2018-2023 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package rserverless + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/rs/zerolog" +) + +// Service represents a serverless service. +type Service interface { + Start() + Close(ctx context.Context) error +} + +// Services is a map of service name and its new function. +var Services = map[string]NewService{} + +// Register registers a new serverless service with name and new function. +func Register(name string, newFunc NewService) { + Services[name] = newFunc +} + +// NewService is the function that serverless services need to register at init time. +type NewService func(conf map[string]interface{}, log *zerolog.Logger) (Service, error) + +// Serverless contains the serveless collection of services. +type Serverless struct { + conf *config + log zerolog.Logger + services map[string]Service +} + +type config struct { + Services map[string]map[string]interface{} `mapstructure:"services"` +} + +// New returns a new serverless collection of services. +func New(m interface{}, l zerolog.Logger) (*Serverless, error) { + conf := &config{} + if err := mapstructure.Decode(m, conf); err != nil { + return nil, err + } + + n := &Serverless{ + conf: conf, + log: l, + services: map[string]Service{}, + } + return n, nil +} + +func (s *Serverless) isServiceEnabled(svcName string) bool { + _, ok := Services[svcName] + return ok +} + +// Start starts the serverless service collection. +func (s *Serverless) Start() error { + return s.registerAndStartServices() +} + +// GracefulStop gracefully stops the serverless services. +func (s *Serverless) GracefulStop() error { + var wg sync.WaitGroup + + for svcName, svc := range s.services { + wg.Add(1) + + go func(svcName string, svc Service) { + defer wg.Done() + + s.log.Info().Msgf("Sending stop request to service %s", svcName) + ctx := context.Background() + + err := svc.Close(ctx) + if err != nil { + s.log.Error().Err(err).Msgf("error stopping service %s", svcName) + } else { + s.log.Info().Msgf("service %s stopped", svcName) + } + }(svcName, svc) + } + + wg.Wait() + + return nil +} + +// Stop stops the serverless services with a one second deadline. +func (s *Serverless) Stop() error { + var wg sync.WaitGroup + + for svcName, svc := range s.services { + wg.Add(1) + + go func(svcName string, svc Service) { + defer wg.Done() + + s.log.Info().Msgf("Sending stop request to service %s", svcName) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err := svc.Close(ctx) + if err != nil { + s.log.Error().Err(err).Msgf("error stopping service %s", svcName) + } else { + s.log.Info().Msgf("service %s stopped", svcName) + } + }(svcName, svc) + } + + wg.Wait() + + return nil +} + +func (s *Serverless) registerAndStartServices() error { + for svcName := range s.conf.Services { + if s.isServiceEnabled(svcName) { + newFunc := Services[svcName] + svcLogger := s.log.With().Str("service", svcName).Logger() + svc, err := newFunc(s.conf.Services[svcName], &svcLogger) + if err != nil { + return errors.Wrapf(err, "serverless service %s could not be initialized", svcName) + } + + go svc.Start() + + s.services[svcName] = svc + + s.log.Info().Msgf("serverless service enabled: %s", svcName) + } else { + return fmt.Errorf("serverless service %s does not exist", svcName) + } + } + + return nil +}