Skip to content

Commit

Permalink
[FIXED] Add discard policy repair logic in CreateKeyValue (#1617)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored Apr 18, 2024
1 parent fa996fc commit 9d4b227
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
21 changes: 20 additions & 1 deletion jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -488,8 +489,26 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke
// errors are joined so that backwards compatibility is retained
// and previous checks for ErrStreamNameAlreadyInUse will still work.
err = errors.Join(fmt.Errorf("%w: %s", ErrBucketExists, cfg.Bucket), err)

// If we have a failure to add, it could be because we have
// a config change if the KV was created against before a bug fix
// that changed the value of discard policy.
// We will check if the stream exists and if the only difference
// is the discard policy, we will update the stream.
// The same logic applies for KVs created pre 2.9.x and
// the AllowDirect setting.
if stream, _ = js.Stream(ctx, scfg.Name); stream != nil {
cfg := stream.CachedInfo().Config
cfg.Discard = scfg.Discard
cfg.AllowDirect = scfg.AllowDirect
if reflect.DeepEqual(cfg, scfg) {
stream, err = js.UpdateStream(ctx, scfg)
}
}
}
if err != nil {
return nil, err
}
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
Expand Down
55 changes: 55 additions & 0 deletions jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,3 +1634,58 @@ func TestKeyValueCompression(t *testing.T) {
t.Fatalf("Expected stream to be compressed with S2")
}
}

func TestKeyValueCreateRepairOldKV(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

// create a standard kv
_, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "A",
})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

// get stream config and set discard policy to old and AllowDirect to false
stream, err := js.Stream(ctx, "KV_A")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}
streamCfg := stream.CachedInfo().Config
streamCfg.Discard = jetstream.DiscardOld
streamCfg.AllowDirect = false

// create a new kv with the same name - client should fix the config
_, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "A",
})
if err != nil {
t.Fatalf("Error creating kv: %v", err)
}

// get stream config again and check if the discard policy is set to new
stream, err = js.Stream(ctx, "KV_A")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}
if stream.CachedInfo().Config.Discard != jetstream.DiscardNew {
t.Fatalf("Expected stream to have discard policy set to new")
}
if !stream.CachedInfo().Config.AllowDirect {
t.Fatalf("Expected stream to have AllowDirect set to true")
}

// attempting to create a new kv with the same name and different settings should fail
_, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "A",
Description: "New KV",
})
if !errors.Is(err, jetstream.ErrBucketExists) {
t.Fatalf("Expected error to be ErrBucketExists, got: %v", err)
}
}

0 comments on commit 9d4b227

Please sign in to comment.