Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-durable event streams #4411

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/non-durable-streams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Add option to configure streams non durable

Adds an option to disable persistence of event streams

https://github.com/cs3org/reva/pull/4411
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/gdexlab/go-render v1.0.1
github.com/go-chi/chi/v5 v5.0.8
github.com/go-ldap/ldap/v3 v3.4.6
github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7
github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9
github.com/go-micro/plugins/v4/server/http v1.2.1
github.com/go-micro/plugins/v4/store/nats-js v1.1.0
github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-00010101000000-000000000000
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7 h1:/RpJVLKmKT2OcEnKCPaS6n+zygNzYDzwoYgPQEgcEiQ=
github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20230807070816-bc05fb076ce7/go.mod h1:lYuiEYKQTpbE2LA8HEcC8D6kQ29M7ILfEak3dzeucEg=
github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9 h1:YOIavj+ZgO9HzukpdXZCvQv+AahjW/fTVFVF4QFRabw=
github.com/go-micro/plugins/v4/events/natsjs v1.2.2-0.20231215124540-f7f8d3274bf9/go.mod h1:cL0O63th39fZ+M/aRJvajz7Qnmv+UTXugOq1k3qrYiQ=
github.com/go-micro/plugins/v4/registry/consul v1.2.1 h1:3wctYMtstwQLCjoJ1HA6mKGGFF1hcdKDv5MzHakB1jE=
github.com/go-micro/plugins/v4/registry/consul v1.2.1/go.mod h1:wTat7/K9XQ+i64VbbcMYFcEwipYfSgJM51HcA/sgsM4=
github.com/go-micro/plugins/v4/registry/etcd v1.2.0 h1:tcHlU1GzvX3oZa8WQH8ylMCGie5qD5g98YWTESJjeqQ=
Expand Down
2 changes: 1 addition & 1 deletion internal/grpc/interceptors/eventsmiddleware/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func publisherFromConfig(m map[string]interface{}) (events.Publisher, error) {
if ok {
tlsCert = val.(string)
}
return stream.NatsFromConfig(m["name"].(string), stream.NatsConfig{
return stream.NatsFromConfig(m["name"].(string), false, stream.NatsConfig{
Endpoint: m["address"].(string),
Cluster: m["clusterID"].(string),
EnableTLS: m["enable-tls"].(bool),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1267,5 +1267,5 @@ func estreamFromConfig(c eventconfig) (events.Stream, error) {
return nil, nil
}

return stream.NatsFromConfig("storageprovider", stream.NatsConfig(c))
return stream.NatsFromConfig("storageprovider", false, stream.NatsConfig(c))
}
2 changes: 1 addition & 1 deletion internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
if conf.NatsAddress == "" || conf.NatsClusterID == "" {
log.Warn().Msg("missing or incomplete nats configuration. Events will not be published.")
} else {
s, err := stream.NatsFromConfig("dataprovider", stream.NatsConfig{
s, err := stream.NatsFromConfig("dataprovider", false, stream.NatsConfig{
Endpoint: conf.NatsAddress,
Cluster: conf.NatsClusterID,
EnableTLS: conf.NatsEnableTLS,
Expand Down
2 changes: 1 addition & 1 deletion pkg/events/example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Server() {

// Client builds a nats client
func Client() events.Stream {
c, err := stream.NatsFromConfig("name of stream", stream.NatsConfig{
c, err := stream.NatsFromConfig("name of stream", false, stream.NatsConfig{
Endpoint: "127.0.0.1:9233",
Cluster: "test-cluster",
})
Expand Down
13 changes: 10 additions & 3 deletions pkg/events/stream/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type NatsConfig struct {
}

// NatsFromConfig returns a nats stream from the given config
func NatsFromConfig(connName string, cfg NatsConfig) (events.Stream, error) {
func NatsFromConfig(connName string, disableDurability bool, cfg NatsConfig) (events.Stream, error) {
var tlsConf *tls.Config
if cfg.EnableTLS {
var rootCAPool *x509.CertPool
Expand All @@ -48,13 +48,20 @@ func NatsFromConfig(connName string, cfg NatsConfig) (events.Stream, error) {
RootCAs: rootCAPool,
}
}
return Nats(

opts := []natsjs.Option{
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Endpoint),
natsjs.ClusterID(cfg.Cluster),
natsjs.SynchronousPublish(true),
natsjs.Name(connName),
)
}

if disableDurability {
opts = append(opts, natsjs.DisableDurableStreams())
}

return Nats(opts...)

}

Expand Down
2 changes: 1 addition & 1 deletion pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {

var es events.Stream
if c.Events.Endpoint != "" {
es, err = stream.NatsFromConfig("jsoncs3-share-manager", stream.NatsConfig(c.Events))
es, err = stream.NatsFromConfig("jsoncs3-share-manager", false, stream.NatsConfig(c.Events))
if err != nil {
return nil, err
}
Expand Down
Loading