Skip to content

Commit

Permalink
Serverless services (#3824)
Browse files Browse the repository at this point in the history
* Add serverless services

* Load serverless services

* Start serverless services on launch

* Codacy changes

* Changelog

* Example serverless config

* Add example serverless service

* Add service name to logger

Co-authored-by: Gianmaria Del Monte <g.macmount@gmail.com>

* Simplify function call

Co-authored-by: Gianmaria Del Monte <g.macmount@gmail.com>

* Exit with errors if initserverless fails

Co-authored-by: Gianmaria Del Monte <g.macmount@gmail.com>

* Add signal handling to serverless services

* Use context to pass timeout on service stop

---------

Co-authored-by: Gianmaria Del Monte <g.macmount@gmail.com>
  • Loading branch information
javfg and gmgigi96 authored May 5, 2023
1 parent 80606f0 commit 1c681a3
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 7 deletions.
7 changes: 7 additions & 0 deletions changelog/unreleased/serverless-services.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Enhancement: Serverless Services

New type of service (along with http and grpc)
which does not have a listening server. Useful for
the notifications service and others in the future.

https://github.com/cs3org/reva/pull/3824
26 changes: 26 additions & 0 deletions cmd/revad/internal/grace/grace.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Watcher struct {
ppid int
lns map[string]net.Listener
ss map[string]Server
SL Serverless
pidFile string
childPIDs []int
}
Expand Down Expand Up @@ -254,6 +255,12 @@ type Server interface {
Address() string
}

// Serverless is the interface that the serverless server implements.
type Serverless interface {
Stop() error
GracefulStop() error
}

// TrapSignals captures the OS signal.
func (w *Watcher) TrapSignals() {
signalCh := make(chan os.Signal, 1024)
Expand Down Expand Up @@ -293,6 +300,11 @@ func (w *Watcher) TrapSignals() {
}
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
}
err := w.SL.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping serverless server")
}
w.log.Info().Msg("serverless services abruptly closed")
w.Exit(1)
}
}
Expand All @@ -306,6 +318,14 @@ func (w *Watcher) TrapSignals() {
w.Exit(1)
}
}
if w.SL != nil {
err := w.SL.GracefulStop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
w.log.Info().Msg("exit with error code 1")
w.Exit(1)
}
}
w.log.Info().Msg("exit with error code 0")
w.Exit(0)
case syscall.SIGINT, syscall.SIGTERM:
Expand All @@ -317,6 +337,12 @@ func (w *Watcher) TrapSignals() {
w.log.Error().Err(err).Msg("error stopping server")
}
}
err := w.SL.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping serverless server")
}
w.log.Info().Msg("serverless services abruptly closed")

w.Exit(0)
}
}
Expand Down
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
_ "github.com/cs3org/reva/internal/http/interceptors/auth/tokenwriter/loader"
_ "github.com/cs3org/reva/internal/http/interceptors/loader"
_ "github.com/cs3org/reva/internal/http/services/loader"
_ "github.com/cs3org/reva/internal/serverless/services/loader"
_ "github.com/cs3org/reva/pkg/app/provider/loader"
_ "github.com/cs3org/reva/pkg/app/registry/loader"
_ "github.com/cs3org/reva/pkg/appauth/manager/loader"
Expand Down
48 changes: 42 additions & 6 deletions cmd/revad/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cs3org/reva/pkg/registry/memory"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/rhttp"
"github.com/cs3org/reva/pkg/rserverless"
"github.com/cs3org/reva/pkg/sharedconf"
rtrace "github.com/cs3org/reva/pkg/trace"
"github.com/cs3org/reva/pkg/utils"
Expand Down Expand Up @@ -93,13 +94,23 @@ func run(mainConf map[string]interface{}, coreConf *coreConf, logger *zerolog.Lo
initCPUCount(coreConf, logger)

servers := initServers(mainConf, logger)
serverless := initServerless(mainConf, logger)

if len(servers) == 0 && serverless == nil {
logger.Info().Msg("nothing to do, no grpc/http/serverless enabled_services declared in config")
os.Exit(1)
}

watcher, err := initWatcher(logger, filename)
if err != nil {
log.Panic(err)
}
listeners := initListeners(watcher, servers, logger)
if serverless != nil {
watcher.SL = serverless
}

start(mainConf, servers, listeners, logger, watcher)
start(mainConf, servers, serverless, listeners, logger, watcher)
}

func initListeners(watcher *grace.Watcher, servers map[string]grace.Server, log *zerolog.Logger) map[string]net.Listener {
Expand Down Expand Up @@ -141,13 +152,22 @@ func initServers(mainConf map[string]interface{}, log *zerolog.Logger) map[strin
servers["grpc"] = s
}

if len(servers) == 0 {
log.Info().Msg("nothing to do, no grpc/http enabled_services declared in config")
os.Exit(1)
}
return servers
}

func initServerless(mainConf map[string]interface{}, log *zerolog.Logger) *rserverless.Serverless {
if isEnabledServerless(mainConf) {
serverless, err := getServerless(mainConf["serverless"], log)
if err != nil {
log.Error().Err(err).Msg("error")
os.Exit(1)
}
return serverless
}

return nil
}

func initTracing(conf *coreConf) {
rtrace.SetTraceProvider(conf.TracingCollector, conf.TracingEndpoint, conf.TracingServiceName)
}
Expand Down Expand Up @@ -184,7 +204,7 @@ func handlePIDFlag(l *zerolog.Logger, pidFile string) (*grace.Watcher, error) {
return w, nil
}

func start(mainConf map[string]interface{}, servers map[string]grace.Server, listeners map[string]net.Listener, log *zerolog.Logger, watcher *grace.Watcher) {
func start(mainConf map[string]interface{}, servers map[string]grace.Server, serverless *rserverless.Serverless, listeners map[string]net.Listener, log *zerolog.Logger, watcher *grace.Watcher) {
if isEnabledHTTP(mainConf) {
go func() {
if err := servers["http"].(*rhttp.Server).Start(listeners["http"]); err != nil {
Expand All @@ -201,6 +221,13 @@ func start(mainConf map[string]interface{}, servers map[string]grace.Server, lis
}
}()
}
if isEnabledServerless(mainConf) {
if err := serverless.Start(); err != nil {
log.Error().Err(err).Msg("error starting serverless services")
watcher.Exit(1)
}
}

watcher.TrapSignals()
}

Expand Down Expand Up @@ -264,6 +291,11 @@ func getHTTPServer(conf interface{}, l *zerolog.Logger) (*rhttp.Server, error) {
return s, nil
}

func getServerless(conf interface{}, l *zerolog.Logger) (*rserverless.Serverless, error) {
sub := l.With().Str("pkg", "rserverless").Logger()
return rserverless.New(conf, sub)
}

// adjustCPU parses string cpu and sets GOMAXPROCS
//
// according to its value. It accepts either
Expand Down Expand Up @@ -365,6 +397,10 @@ func isEnabledGRPC(conf map[string]interface{}) bool {
return isEnabled("grpc", conf)
}

func isEnabledServerless(conf map[string]interface{}) bool {
return isEnabled("serverless", conf)
}

func isEnabled(key string, conf map[string]interface{}) bool {
if a, ok := conf[key]; ok {
if b, ok := a.(map[string]interface{}); ok {
Expand Down
32 changes: 32 additions & 0 deletions examples/serverless-example/notifications.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[log]
output = "/var/log/revad/revad-notifications.log"
mode = "json"

[shared]
gatewaysvc = "localhost:19000"
jwt_secret = "Pive-Fumkiu4"
skip_user_groups_in_token = true

[serverless.services.notifications]
nats_address = "nats-server-01.example.com"
nats_token = "secret-token-example"
nats_template_subject = "reva-notifications-template"
nats_notification_subject = "reva-notifications-notification"
nats_trigger_subject = "reva-notifications-trigger"
storage_driver = "sql"
grouping_interval = 60
grouping_maxsize = 100

[serverless.services.notifications.storage_drivers.sql]
db_username = "username"
db_password = "password"
db_host = "database.example.com"
db_port = 3306
db_name = "notifications"

[serverless.services.notifications.handlers.email]
smtp_server = "mx.example.com:25"
disable_auth = true
default_sender = "noreply@cernbox.cern.ch"

[tracing]
98 changes: 98 additions & 0 deletions internal/serverless/services/helloworld/helloworld.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2018-2023 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package helloworld

import (
"context"
"fmt"
"os"
"time"

"github.com/cs3org/reva/pkg/rserverless"
"github.com/mitchellh/mapstructure"
"github.com/rs/zerolog"
)

type config struct {
Outfile string `mapstructure:"outfile"`
}

func (c *config) init() {
if c.Outfile == "" {
c.Outfile = "/tmp/revad-helloworld-hello"
}
}

type svc struct {
conf *config
file *os.File
log *zerolog.Logger
}

func init() {
rserverless.Register("helloworld", New)
}

// New returns a new helloworld service.
func New(m map[string]interface{}, log *zerolog.Logger) (rserverless.Service, error) {
conf := &config{}
conf.init()

if err := mapstructure.Decode(m, conf); err != nil {
return nil, err
}

file, err := os.OpenFile(conf.Outfile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Err(err)
return nil, err
}

s := &svc{
conf: conf,
log: log,
file: file,
}

return s, nil
}

// Start starts the helloworld service.
func (s *svc) Start() {
s.log.Debug().Msgf("helloworld server started, saying hello at %s", s.conf.Outfile)
go s.sayHello(s.conf.Outfile)
}

// Close stops the helloworld service.
func (s *svc) Close(ctx context.Context) error {
return s.file.Close()
}

func (s *svc) sayHello(filename string) {
for {
s.log.Info().Msg("saying hello")
h := fmt.Sprintf("%s - hello world!\n", time.Now().String())

_, err := s.file.Write([]byte(h))
if err != nil {
s.log.Err(err)
}
time.Sleep(5 * time.Second)
}
}
25 changes: 25 additions & 0 deletions internal/serverless/services/loader/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2018-2023 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package loader

import (
// Load core serverless services.
_ "github.com/cs3org/reva/internal/serverless/services/helloworld"
// Add your own service here.
)
3 changes: 2 additions & 1 deletion pkg/rhttp/rhttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ func (s *Server) registerServices() error {
for svcName := range s.conf.Services {
if s.isServiceEnabled(svcName) {
newFunc := global.Services[svcName]
svc, err := newFunc(s.conf.Services[svcName], &s.log)
svcLogger := s.log.With().Str("service", svcName).Logger()
svc, err := newFunc(s.conf.Services[svcName], &svcLogger)
if err != nil {
err = errors.Wrapf(err, "http service %s could not be started,", svcName)
return err
Expand Down
Loading

0 comments on commit 1c681a3

Please sign in to comment.