diff --git a/tests/robustness/model/describe.go b/tests/robustness/model/describe.go index 18d2d77ddb7..e46c5276b68 100644 --- a/tests/robustness/model/describe.go +++ b/tests/robustness/model/describe.go @@ -42,11 +42,15 @@ func describeEtcdResponse(request EtcdRequest, response EtcdResponse) string { func describeEtcdRequest(request EtcdRequest) string { switch request.Type { case Txn: - describeOperations := describeEtcdOperations(request.Txn.OperationsOnSuccess) + onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess) if len(request.Txn.Conditions) != 0 { - return fmt.Sprintf("if(%s).then(%s)", describeEtcdConditions(request.Txn.Conditions), describeOperations) + if len(request.Txn.OperationsOnFailure) == 0 { + return fmt.Sprintf("if(%s).then(%s)", describeEtcdConditions(request.Txn.Conditions), onSuccess) + } + onFailure := describeEtcdOperations(request.Txn.OperationsOnFailure) + return fmt.Sprintf("if(%s).then(%s).else(%s)", describeEtcdConditions(request.Txn.Conditions), onSuccess, onFailure) } - return describeOperations + return onSuccess case LeaseGrant: return fmt.Sprintf("leaseGrant(%d)", request.LeaseGrant.LeaseID) case LeaseRevoke: @@ -75,14 +79,23 @@ func describeEtcdOperations(ops []EtcdOperation) string { } func describeTxnResponse(request *TxnRequest, response *TxnResponse) string { - if response.Failure { - return fmt.Sprintf("txn failed") - } respDescription := make([]string, len(response.Results)) - for i := range response.Results { - respDescription[i] = describeEtcdOperationResponse(request.OperationsOnSuccess[i], response.Results[i]) + for i, result := range response.Results { + if response.Failure { + respDescription[i] = describeEtcdOperationResponse(request.OperationsOnFailure[i], result) + } else { + respDescription[i] = describeEtcdOperationResponse(request.OperationsOnSuccess[i], result) + } + } + description := strings.Join(respDescription, ", ") + if len(request.Conditions) == 0 { + return description + } + if response.Failure { + return fmt.Sprintf("failure(%s)", description) + } else { + return fmt.Sprintf("success(%s)", description) } - return strings.Join(respDescription, ", ") } func describeEtcdOperation(op EtcdOperation) string { diff --git a/tests/robustness/model/describe_test.go b/tests/robustness/model/describe_test.go index 447e41c62a5..17540e3ce5a 100644 --- a/tests/robustness/model/describe_test.go +++ b/tests/robustness/model/describe_test.go @@ -81,13 +81,13 @@ func TestModelDescribe(t *testing.T) { }, { req: compareRevisionAndPutRequest("key7", 7, "77"), - resp: compareRevisionAndPutResponse(false, 7), - expectDescribe: `if(mod_rev(key7)==7).then(put("key7", "77")) -> txn failed, rev: 7`, + resp: txnEmptyResponse(false, 7), + expectDescribe: `if(mod_rev(key7)==7).then(put("key7", "77")) -> failure(), rev: 7`, }, { req: compareRevisionAndPutRequest("key8", 8, "88"), - resp: compareRevisionAndPutResponse(true, 8), - expectDescribe: `if(mod_rev(key8)==8).then(put("key8", "88")) -> ok, rev: 8`, + resp: txnPutResponse(true, 8), + expectDescribe: `if(mod_rev(key8)==8).then(put("key8", "88")) -> success(ok), rev: 8`, }, { req: compareRevisionAndPutRequest("key9", 9, "99"), @@ -95,7 +95,17 @@ func TestModelDescribe(t *testing.T) { expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`, }, { - req: txnRequest(nil, []EtcdOperation{{Type: Range, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}), + req: txnRequest([]EtcdCondition{{Key: "key9b", ExpectedRevision: 9}}, []EtcdOperation{{Type: Put, Key: "key9b", Value: ValueOrHash{Value: "991"}}}, []EtcdOperation{{Type: Range, Key: "key9b"}}), + resp: txnResponse([]EtcdOperationResult{{}}, true, 10), + expectDescribe: `if(mod_rev(key9b)==9).then(put("key9b", "991")).else(get("key9b")) -> success(ok), rev: 10`, + }, + { + req: txnRequest([]EtcdCondition{{Key: "key9c", ExpectedRevision: 9}}, []EtcdOperation{{Type: Put, Key: "key9c", Value: ValueOrHash{Value: "992"}}}, []EtcdOperation{{Type: Range, Key: "key9c"}}), + resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{Key: "key9c", ValueRevision: ValueRevision{Value: ValueOrHash{Value: "993"}, ModRevision: 10}}}}}, false, 10), + expectDescribe: `if(mod_rev(key9c)==9).then(put("key9c", "992")).else(get("key9c")) -> failure("993"), rev: 10`, + }, + { + req: txnRequest(nil, []EtcdOperation{{Type: Range, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}, nil), resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{ValueRevision: ValueRevision{Value: ValueOrHash{Value: "110"}}}}}, {}, {Deleted: 1}}, true, 10), expectDescribe: `get("10"), put("11", "111"), delete("12") -> "110", ok, deleted: 1, rev: 10`, }, diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index 2c0b88f56a8..e01293853cc 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -127,19 +127,20 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) { s.KeyValues = newKVs switch request.Type { case Txn: - success := true + failure := false for _, cond := range request.Txn.Conditions { if val := s.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision { - success = false + failure = true break } } - if !success { - return s, EtcdResponse{Revision: s.Revision, Txn: &TxnResponse{Failure: true}} + operations := request.Txn.OperationsOnSuccess + if failure { + operations = request.Txn.OperationsOnFailure } - opResp := make([]EtcdOperationResult, len(request.Txn.OperationsOnSuccess)) + opResp := make([]EtcdOperationResult, len(operations)) increaseRevision := false - for i, op := range request.Txn.OperationsOnSuccess { + for i, op := range operations { switch op.Type { case Range: opResp[i] = EtcdOperationResult{ @@ -198,7 +199,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) { if increaseRevision { s.Revision += 1 } - return s, EtcdResponse{Txn: &TxnResponse{Results: opResp}, Revision: s.Revision} + return s, EtcdResponse{Txn: &TxnResponse{Failure: failure, Results: opResp}, Revision: s.Revision} case LeaseGrant: lease := EtcdLease{ LeaseID: request.LeaseGrant.LeaseID, @@ -266,6 +267,7 @@ type EtcdRequest struct { type TxnRequest struct { Conditions []EtcdCondition OperationsOnSuccess []EtcdOperation + OperationsOnFailure []EtcdOperation } type EtcdCondition struct { diff --git a/tests/robustness/model/deterministic_test.go b/tests/robustness/model/deterministic_test.go index d0ecdf3bad4..f2b11f5f4be 100644 --- a/tests/robustness/model/deterministic_test.go +++ b/tests/robustness/model/deterministic_test.go @@ -191,7 +191,7 @@ func TestModelBase(t *testing.T) { }, }, { - name: "Txn sets new value if value matches expected", + name: "Txn executes onSuccess if revision matches expected", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse}, {req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(true, 1).EtcdResponse, failure: true}, @@ -206,28 +206,26 @@ func TestModelBase(t *testing.T) { }, }, { - name: "Txn can expect on empty key", + name: "Txn can expect on key not existing", operations: []testOperation{ {req: getRequest("key1"), resp: emptyGetResponse(1).EtcdResponse}, {req: compareRevisionAndPutRequest("key1", 0, "2"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse}, - {req: compareRevisionAndPutRequest("key2", 0, "3"), resp: compareRevisionAndPutResponse(true, 3).EtcdResponse}, - {req: compareRevisionAndPutRequest("key3", 4, "4"), resp: compareRevisionAndPutResponse(false, 4).EtcdResponse, failure: true}, + {req: compareRevisionAndPutRequest("key1", 0, "3"), resp: compareRevisionAndPutResponse(true, 3).EtcdResponse, failure: true}, + {req: txnRequestSingleOperation(compareRevision("key1", 0), putOperation("key1", "4"), putOperation("key1", "5")), resp: txnPutResponse(false, 3).EtcdResponse}, + {req: getRequest("key1"), resp: getResponse("key1", "5", 3, 3).EtcdResponse}, + {req: compareRevisionAndPutRequest("key2", 0, "6"), resp: compareRevisionAndPutResponse(true, 4).EtcdResponse}, }, }, { - name: "Txn doesn't do anything if value doesn't match expected", + name: "Txn executes onFailure if revision doesn't match expected", operations: []testOperation{ {req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse}, - {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse, failure: true}, - {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 1).EtcdResponse, failure: true}, - {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 2).EtcdResponse, failure: true}, - {req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 1).EtcdResponse}, - {req: getRequest("key"), resp: getResponse("key", "2", 1, 1).EtcdResponse, failure: true}, - {req: getRequest("key"), resp: getResponse("key", "2", 2, 2).EtcdResponse, failure: true}, - {req: getRequest("key"), resp: getResponse("key", "3", 1, 1).EtcdResponse, failure: true}, - {req: getRequest("key"), resp: getResponse("key", "3", 1, 2).EtcdResponse, failure: true}, - {req: getRequest("key"), resp: getResponse("key", "3", 2, 2).EtcdResponse, failure: true}, - {req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse}, + {req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnPutResponse(false, 2).EtcdResponse, failure: true}, + {req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnEmptyResponse(false, 2).EtcdResponse, failure: true}, + {req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnEmptyResponse(true, 2).EtcdResponse, failure: true}, + {req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnPutResponse(true, 1).EtcdResponse, failure: true}, + {req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnEmptyResponse(true, 1).EtcdResponse}, + {req: txnRequestSingleOperation(compareRevision("key", 2), nil, putOperation("key", "2")), resp: txnPutResponse(false, 2).EtcdResponse}, }, }, { diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index 4198dd86a92..10a09468c34 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -189,58 +189,20 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r }) } -func (h *AppendableHistory) AppendCompareRevisionAndDelete(key string, expectedRevision int64, start, end time.Duration, resp *clientv3.TxnResponse, err error) { - request := compareRevisionAndDeleteRequest(key, expectedRevision) - if err != nil { - h.appendFailed(request, start.Nanoseconds(), err) - return - } - var revision int64 - if resp != nil && resp.Header != nil { - revision = resp.Header.Revision - } - var deleted int64 - if resp != nil && len(resp.Responses) > 0 { - deleted = resp.Responses[0].GetResponseDeleteRange().Deleted - } - h.appendSuccessful(porcupine.Operation{ - ClientId: h.streamId, - Input: request, - Call: start.Nanoseconds(), - Output: compareRevisionAndDeleteResponse(resp.Succeeded, deleted, revision), - Return: end.Nanoseconds(), - }) - -} -func (h *AppendableHistory) AppendCompareRevisionAndPut(key string, expectedRevision int64, value string, start, end time.Duration, resp *clientv3.TxnResponse, err error) { - request := compareRevisionAndPutRequest(key, expectedRevision, value) - if err != nil { - h.appendFailed(request, start.Nanoseconds(), err) - return - } - var revision int64 - if resp != nil && resp.Header != nil { - revision = resp.Header.Revision - } - h.appendSuccessful(porcupine.Operation{ - ClientId: h.streamId, - Input: request, - Call: start.Nanoseconds(), - Output: compareRevisionAndPutResponse(resp.Succeeded, revision), - Return: end.Nanoseconds(), - }) -} - -func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) { +func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) { conds := []EtcdCondition{} for _, cmp := range cmp { conds = append(conds, toEtcdCondition(cmp)) } - ops := []EtcdOperation{} - for _, op := range onSuccess { - ops = append(ops, toEtcdOperation(op)) + modelOnSuccess := []EtcdOperation{} + for _, op := range clientOnSuccessOps { + modelOnSuccess = append(modelOnSuccess, toEtcdOperation(op)) + } + modelOnFailure := []EtcdOperation{} + for _, op := range clientOnFailure { + modelOnFailure = append(modelOnFailure, toEtcdOperation(op)) } - request := txnRequest(conds, ops) + request := txnRequest(conds, modelOnSuccess, modelOnFailure) if err != nil { h.appendFailed(request, start.Nanoseconds(), err) return @@ -293,6 +255,7 @@ func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) { default: panic(fmt.Sprintf("Compare not supported, target: %q, result: %q", cmp.Target, cmp.Result)) } + cond.ExpectedRevision = cmp.TargetUnion.(*etcdserverpb.Compare_ModRevision).ModRevision return cond } @@ -447,32 +410,51 @@ func deleteResponse(deleted int64, revision int64) EtcdNonDeterministicResponse return EtcdNonDeterministicResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}} } -func compareRevisionAndDeleteRequest(key string, expectedRevision int64) EtcdRequest { - return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Delete, Key: key}}) -} - func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest { - return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}}) + return txnRequestSingleOperation(compareRevision(key, expectedRevision), putOperation(key, value), nil) } func compareRevisionAndPutResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse { - var result []EtcdOperationResult if succeeded { - result = []EtcdOperationResult{{}} + return txnPutResponse(succeeded, revision) } - return txnResponse(result, succeeded, revision) + return txnEmptyResponse(succeeded, revision) } -func compareRevisionAndDeleteResponse(succeeded bool, deleted, revision int64) EtcdNonDeterministicResponse { - var result []EtcdOperationResult - if succeeded { - result = []EtcdOperationResult{{Deleted: deleted}} +func compareRevision(key string, expectedRevision int64) *EtcdCondition { + return &EtcdCondition{Key: key, ExpectedRevision: expectedRevision} +} + +func putOperation(key, value string) *EtcdOperation { + return &EtcdOperation{Type: Put, Key: key, Value: ToValueOrHash(value)} +} + +func txnRequestSingleOperation(cond *EtcdCondition, onSuccess, onFailure *EtcdOperation) EtcdRequest { + var conds []EtcdCondition + if cond != nil { + conds = []EtcdCondition{*cond} } - return txnResponse(result, succeeded, revision) + var onSuccess2 []EtcdOperation + if onSuccess != nil { + onSuccess2 = []EtcdOperation{*onSuccess} + } + var onFailure2 []EtcdOperation + if onFailure != nil { + onFailure2 = []EtcdOperation{*onFailure} + } + return txnRequest(conds, onSuccess2, onFailure2) +} + +func txnRequest(conds []EtcdCondition, onSuccess, onFailure []EtcdOperation) EtcdRequest { + return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conditions: conds, OperationsOnSuccess: onSuccess, OperationsOnFailure: onFailure}} +} + +func txnPutResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse { + return txnResponse([]EtcdOperationResult{{}}, succeeded, revision) } -func txnRequest(conds []EtcdCondition, onSuccess []EtcdOperation) EtcdRequest { - return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conditions: conds, OperationsOnSuccess: onSuccess}} +func txnEmptyResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse { + return txnResponse([]EtcdOperationResult{}, succeeded, revision) } func txnResponse(result []EtcdOperationResult, succeeded bool, revision int64) EtcdNonDeterministicResponse { diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index ec47dc08cc8..c36e97c2119 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -131,56 +131,21 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) error { return nil } -func (c *RecordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error { - txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key)) - c.opMux.Lock() - defer c.opMux.Unlock() - callTime := time.Since(c.baseTime) - resp, err := txn.Commit() - returnTime := time.Since(c.baseTime) - c.operations.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err) - return err -} - -func (c *RecordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error { - txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value)) - c.opMux.Lock() - defer c.opMux.Unlock() - callTime := time.Since(c.baseTime) - resp, err := txn.Commit() - returnTime := time.Since(c.baseTime) - c.operations.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err) - return err -} - -func (c *RecordingClient) compareRevisionTxn(ctx context.Context, key string, expectedRevision int64, op clientv3.Op) clientv3.Txn { - txn := c.client.Txn(ctx) - var cmp clientv3.Cmp - if expectedRevision == 0 { - cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0) - } else { - cmp = clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision) - } - return txn.If( - cmp, - ).Then( - op, - ) -} - -func (c *RecordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error { +func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) { txn := c.client.Txn(ctx).If( - cmp..., + conditions..., ).Then( - ops..., + onSuccess..., + ).Else( + onFailure..., ) c.opMux.Lock() defer c.opMux.Unlock() callTime := time.Since(c.baseTime) resp, err := txn.Commit() returnTime := time.Since(c.baseTime) - c.operations.AppendTxn(cmp, ops, callTime, returnTime, resp, err) - return err + c.operations.AppendTxn(conditions, onSuccess, onFailure, callTime, returnTime, resp, err) + return resp, err } func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) { diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 589ed31a1c6..864472fef3e 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -122,7 +122,7 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) var err error - switch etcdRequestType(pickRandom(t.writeChoices)) { + switch pickRandom(t.writeChoices) { case Put: err = c.Put(writeCtx, key, fmt.Sprintf("%d", id.NewRequestId())) case LargePut: @@ -130,13 +130,14 @@ func (t etcdTraffic) Write(ctx context.Context, c *RecordingClient, limiter *rat case Delete: err = c.Delete(writeCtx, key) case MultiOpTxn: - err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id)) + _, err = c.Txn(writeCtx, nil, t.pickMultiTxnOps(id), nil) case CompareAndSet: - var expectRevision int64 + var expectedRevision int64 if lastValues != nil { - expectRevision = lastValues.ModRevision + expectedRevision = lastValues.ModRevision } - err = c.CompareRevisionAndPut(writeCtx, key, fmt.Sprintf("%d", id.NewRequestId()), expectRevision) + value := fmt.Sprintf("%d", id.NewRequestId()) + _, err = c.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, value)}, nil) case PutWithLease: leaseId := lm.LeaseId(cid) if leaseId == 0 { diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 63f44032949..5cd9bfb0bb5 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -56,15 +56,8 @@ type kubernetesTraffic struct { writeChoices []choiceWeight[KubernetesRequestType] } -type KubernetesRequestType string - -const ( - KubernetesUpdate KubernetesRequestType = "update" - KubernetesCreate KubernetesRequestType = "create" - KubernetesDelete KubernetesRequestType = "delete" -) - func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) { + kc := &kubernetesClient{client: c} s := newStorage() keyPrefix := "/registry/" + t.resource + "/" g := errgroup.Group{} @@ -78,7 +71,9 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingCl return nil default: } - resp, err := t.Range(ctx, c, keyPrefix, true) + listCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + resp, err := kc.List(listCtx, keyPrefix) + cancel() if err != nil { continue } @@ -103,14 +98,18 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingCl } // Avoid multiple failed writes in a row if lastWriteFailed { - resp, err := t.Range(ctx, c, keyPrefix, true) + listCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + resp, err := kc.List(listCtx, keyPrefix) + cancel() if err != nil { continue } s.Reset(resp) limiter.Wait(ctx) } - err := t.Write(ctx, c, ids, s) + writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + err := t.Write(writeCtx, kc, ids, s) + cancel() lastWriteFailed = err != nil if err != nil { continue @@ -121,28 +120,26 @@ func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingCl g.Wait() } -func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, s *storage) (err error) { - writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - defer cancel() +func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage) (err error) { count := s.Count() if count < t.averageKeyCount/2 { - err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) + err = kc.OptimisticCreate(ctx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) } else { key, rev := s.PickRandom() if rev == 0 { return errors.New("storage empty") } if count > t.averageKeyCount*3/2 { - err = t.Delete(writeCtx, c, key, rev) + _, err = kc.OptimisticDelete(ctx, key, rev) } else { op := pickRandom(t.writeChoices) switch op { case KubernetesDelete: - err = t.Delete(writeCtx, c, key, rev) + _, err = kc.OptimisticDelete(ctx, key, rev) case KubernetesUpdate: - err = t.Update(writeCtx, c, key, fmt.Sprintf("%d", ids.NewRequestId()), rev) + _, err = kc.OptimisticUpdate(ctx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev) case KubernetesCreate: - err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) + err = kc.OptimisticCreate(ctx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId())) default: panic(fmt.Sprintf("invalid choice: %q", op)) } @@ -155,31 +152,58 @@ func (t kubernetesTraffic) generateKey() string { return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5)) } -func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) (*clientv3.GetResponse, error) { - ctx, cancel := context.WithTimeout(ctx, RequestTimeout) - resp, err := c.Range(ctx, key, withPrefix) - cancel() +type KubernetesRequestType string + +const ( + KubernetesDelete KubernetesRequestType = "delete" + KubernetesUpdate KubernetesRequestType = "update" + KubernetesCreate KubernetesRequestType = "create" +) + +type kubernetesClient struct { + client *RecordingClient +} + +func (k kubernetesClient) List(ctx context.Context, key string) (*clientv3.GetResponse, error) { + resp, err := k.client.Range(ctx, key, true) + if err != nil { + return nil, err + } return resp, err } -func (t kubernetesTraffic) Create(ctx context.Context, c *RecordingClient, key, value string) error { - return t.Update(ctx, c, key, value, 0) +func (k kubernetesClient) OptimisticDelete(ctx context.Context, key string, expectedRevision int64) (*mvccpb.KeyValue, error) { + return k.optimisticOperationOrGet(ctx, key, clientv3.OpDelete(key), expectedRevision) } -func (t kubernetesTraffic) Update(ctx context.Context, c *RecordingClient, key, value string, expectedRevision int64) error { - ctx, cancel := context.WithTimeout(ctx, RequestTimeout) - err := c.CompareRevisionAndPut(ctx, key, value, expectedRevision) - cancel() - return err +func (k kubernetesClient) OptimisticUpdate(ctx context.Context, key, value string, expectedRevision int64) (*mvccpb.KeyValue, error) { + return k.optimisticOperationOrGet(ctx, key, clientv3.OpPut(key, value), expectedRevision) } -func (t kubernetesTraffic) Delete(ctx context.Context, c *RecordingClient, key string, expectedRevision int64) error { - ctx, cancel := context.WithTimeout(ctx, RequestTimeout) - err := c.CompareRevisionAndDelete(ctx, key, expectedRevision) - cancel() +func (k kubernetesClient) OptimisticCreate(ctx context.Context, key, value string) error { + _, err := k.client.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", 0)}, []clientv3.Op{clientv3.OpPut(key, value)}, nil) return err } +// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing. +// However, if the keys value changed it wants imminently to read it, thus the Get operation on failure. +func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) { + resp, err := k.client.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{operation}, []clientv3.Op{clientv3.OpGet(key)}) + if err != nil { + return nil, err + } + if !resp.Succeeded { + getResp := (*clientv3.GetResponse)(resp.Responses[0].GetResponseRange()) + if err != nil || len(getResp.Kvs) == 0 { + return nil, err + } + if len(getResp.Kvs) == 1 { + return getResp.Kvs[0], err + } + } + return nil, err +} + type storage struct { mux sync.RWMutex keyRevision map[string]int64