diff --git a/changelog/unreleased/non-durable-streams.md b/changelog/unreleased/non-durable-streams.md new file mode 100644 index 0000000000..e1b6eb372a --- /dev/null +++ b/changelog/unreleased/non-durable-streams.md @@ -0,0 +1,5 @@ +Enhancement: Add option to configure streams non durable + +Adds an option to disable persistence of event streams + +https://github.com/cs3org/reva/pull/4411 diff --git a/go.mod b/go.mod index 4a1338ce10..4430a570b5 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/gdexlab/go-render v1.0.1 github.com/go-chi/chi/v5 v5.0.8 github.com/go-ldap/ldap/v3 v3.4.6 - github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7 + github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9 github.com/go-micro/plugins/v4/server/http v1.2.1 github.com/go-micro/plugins/v4/store/nats-js v1.1.0 github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-00010101000000-000000000000 diff --git a/go.sum b/go.sum index eba6267bab..189a8bd9f0 100644 --- a/go.sum +++ b/go.sum @@ -618,6 +618,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7 h1:/RpJVLKmKT2OcEnKCPaS6n+zygNzYDzwoYgPQEgcEiQ= github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7/go.mod h1:lYuiEYKQTpbE2LA8HEcC8D6kQ29M7ILfEak3dzeucEg= +github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9 h1:YOIavj+ZgO9HzukpdXZCvQv+AahjW/fTVFVF4QFRabw= +github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9/go.mod h1:cL0O63th39fZ+M/aRJvajz7Qnmv+UTXugOq1k3qrYiQ= github.com/go-micro/plugins/v4/registry/consul v1.2.1 h1:3wctYMtstwQLCjoJ1HA6mKGGFF1hcdKDv5MzHakB1jE= github.com/go-micro/plugins/v4/registry/consul v1.2.1/go.mod h1:wTat7/K9XQ+i64VbbcMYFcEwipYfSgJM51HcA/sgsM4= github.com/go-micro/plugins/v4/registry/etcd v1.2.0 h1:tcHlU1GzvX3oZa8WQH8ylMCGie5qD5g98YWTESJjeqQ= diff --git a/internal/grpc/interceptors/eventsmiddleware/events.go b/internal/grpc/interceptors/eventsmiddleware/events.go index 35c4045d42..4162503d1d 100644 --- a/internal/grpc/interceptors/eventsmiddleware/events.go +++ b/internal/grpc/interceptors/eventsmiddleware/events.go @@ -228,7 +228,7 @@ func publisherFromConfig(m map[string]interface{}) (events.Publisher, error) { if ok { tlsCert = val.(string) } - return stream.NatsFromConfig(m["name"].(string), stream.NatsConfig{ + return stream.NatsFromConfig(m["name"].(string), false, stream.NatsConfig{ Endpoint: m["address"].(string), Cluster: m["clusterID"].(string), EnableTLS: m["enable-tls"].(bool), diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index f7812783cf..5b1274c0e4 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -1267,5 +1267,5 @@ func estreamFromConfig(c eventconfig) (events.Stream, error) { return nil, nil } - return stream.NatsFromConfig("storageprovider", stream.NatsConfig(c)) + return stream.NatsFromConfig("storageprovider", false, stream.NatsConfig(c)) } diff --git a/internal/http/services/dataprovider/dataprovider.go b/internal/http/services/dataprovider/dataprovider.go index 8c3a31195f..cb930b95a7 100644 --- a/internal/http/services/dataprovider/dataprovider.go +++ b/internal/http/services/dataprovider/dataprovider.go @@ -80,7 +80,7 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) if conf.NatsAddress == "" || conf.NatsClusterID == "" { log.Warn().Msg("missing or incomplete nats configuration. Events will not be published.") } else { - s, err := stream.NatsFromConfig("dataprovider", stream.NatsConfig{ + s, err := stream.NatsFromConfig("dataprovider", false, stream.NatsConfig{ Endpoint: conf.NatsAddress, Cluster: conf.NatsClusterID, EnableTLS: conf.NatsEnableTLS, diff --git a/pkg/events/example/example.go b/pkg/events/example/example.go index 37f1568a30..3caf62a78b 100644 --- a/pkg/events/example/example.go +++ b/pkg/events/example/example.go @@ -67,7 +67,7 @@ func Server() { // Client builds a nats client func Client() events.Stream { - c, err := stream.NatsFromConfig("name of stream", stream.NatsConfig{ + c, err := stream.NatsFromConfig("name of stream", false, stream.NatsConfig{ Endpoint: "127.0.0.1:9233", Cluster: "test-cluster", }) diff --git a/pkg/events/stream/nats.go b/pkg/events/stream/nats.go index f302822fc7..4910d694e2 100644 --- a/pkg/events/stream/nats.go +++ b/pkg/events/stream/nats.go @@ -25,7 +25,7 @@ type NatsConfig struct { } // NatsFromConfig returns a nats stream from the given config -func NatsFromConfig(connName string, cfg NatsConfig) (events.Stream, error) { +func NatsFromConfig(connName string, disableDurability bool, cfg NatsConfig) (events.Stream, error) { var tlsConf *tls.Config if cfg.EnableTLS { var rootCAPool *x509.CertPool @@ -48,13 +48,20 @@ func NatsFromConfig(connName string, cfg NatsConfig) (events.Stream, error) { RootCAs: rootCAPool, } } - return Nats( + + opts := []natsjs.Option{ natsjs.TLSConfig(tlsConf), natsjs.Address(cfg.Endpoint), natsjs.ClusterID(cfg.Cluster), natsjs.SynchronousPublish(true), natsjs.Name(connName), - ) + } + + if disableDurability { + opts = append(opts, natsjs.DisableDurableStreams()) + } + + return Nats(opts...) } diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 220c34b3f6..f0fa2cbd51 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -175,7 +175,7 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) { var es events.Stream if c.Events.Endpoint != "" { - es, err = stream.NatsFromConfig("jsoncs3-share-manager", stream.NatsConfig(c.Events)) + es, err = stream.NatsFromConfig("jsoncs3-share-manager", false, stream.NatsConfig(c.Events)) if err != nil { return nil, err }