Skip to content

Commit

Permalink
Merge pull request #15888 from serathius/robustness-k8s-client
Browse files Browse the repository at this point in the history
tests/robustness: Implement Kubernetes optimistic concurrency operations
  • Loading branch information
serathius authored May 15, 2023
2 parents 0efa1c1 + 6e53792 commit 4675e5c
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 179 deletions.
31 changes: 22 additions & 9 deletions tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ func describeEtcdResponse(request EtcdRequest, response EtcdResponse) string {
func describeEtcdRequest(request EtcdRequest) string {
switch request.Type {
case Txn:
describeOperations := describeEtcdOperations(request.Txn.OperationsOnSuccess)
onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess)
if len(request.Txn.Conditions) != 0 {
return fmt.Sprintf("if(%s).then(%s)", describeEtcdConditions(request.Txn.Conditions), describeOperations)
if len(request.Txn.OperationsOnFailure) == 0 {
return fmt.Sprintf("if(%s).then(%s)", describeEtcdConditions(request.Txn.Conditions), onSuccess)
}
onFailure := describeEtcdOperations(request.Txn.OperationsOnFailure)
return fmt.Sprintf("if(%s).then(%s).else(%s)", describeEtcdConditions(request.Txn.Conditions), onSuccess, onFailure)
}
return describeOperations
return onSuccess
case LeaseGrant:
return fmt.Sprintf("leaseGrant(%d)", request.LeaseGrant.LeaseID)
case LeaseRevoke:
Expand Down Expand Up @@ -75,14 +79,23 @@ func describeEtcdOperations(ops []EtcdOperation) string {
}

func describeTxnResponse(request *TxnRequest, response *TxnResponse) string {
if response.Failure {
return fmt.Sprintf("txn failed")
}
respDescription := make([]string, len(response.Results))
for i := range response.Results {
respDescription[i] = describeEtcdOperationResponse(request.OperationsOnSuccess[i], response.Results[i])
for i, result := range response.Results {
if response.Failure {
respDescription[i] = describeEtcdOperationResponse(request.OperationsOnFailure[i], result)
} else {
respDescription[i] = describeEtcdOperationResponse(request.OperationsOnSuccess[i], result)
}
}
description := strings.Join(respDescription, ", ")
if len(request.Conditions) == 0 {
return description
}
if response.Failure {
return fmt.Sprintf("failure(%s)", description)
} else {
return fmt.Sprintf("success(%s)", description)
}
return strings.Join(respDescription, ", ")
}

func describeEtcdOperation(op EtcdOperation) string {
Expand Down
20 changes: 15 additions & 5 deletions tests/robustness/model/describe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,31 @@ func TestModelDescribe(t *testing.T) {
},
{
req: compareRevisionAndPutRequest("key7", 7, "77"),
resp: compareRevisionAndPutResponse(false, 7),
expectDescribe: `if(mod_rev(key7)==7).then(put("key7", "77")) -> txn failed, rev: 7`,
resp: txnEmptyResponse(false, 7),
expectDescribe: `if(mod_rev(key7)==7).then(put("key7", "77")) -> failure(), rev: 7`,
},
{
req: compareRevisionAndPutRequest("key8", 8, "88"),
resp: compareRevisionAndPutResponse(true, 8),
expectDescribe: `if(mod_rev(key8)==8).then(put("key8", "88")) -> ok, rev: 8`,
resp: txnPutResponse(true, 8),
expectDescribe: `if(mod_rev(key8)==8).then(put("key8", "88")) -> success(ok), rev: 8`,
},
{
req: compareRevisionAndPutRequest("key9", 9, "99"),
resp: failedResponse(errors.New("failed")),
expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`,
},
{
req: txnRequest(nil, []EtcdOperation{{Type: Range, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}),
req: txnRequest([]EtcdCondition{{Key: "key9b", ExpectedRevision: 9}}, []EtcdOperation{{Type: Put, Key: "key9b", Value: ValueOrHash{Value: "991"}}}, []EtcdOperation{{Type: Range, Key: "key9b"}}),
resp: txnResponse([]EtcdOperationResult{{}}, true, 10),
expectDescribe: `if(mod_rev(key9b)==9).then(put("key9b", "991")).else(get("key9b")) -> success(ok), rev: 10`,
},
{
req: txnRequest([]EtcdCondition{{Key: "key9c", ExpectedRevision: 9}}, []EtcdOperation{{Type: Put, Key: "key9c", Value: ValueOrHash{Value: "992"}}}, []EtcdOperation{{Type: Range, Key: "key9c"}}),
resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{Key: "key9c", ValueRevision: ValueRevision{Value: ValueOrHash{Value: "993"}, ModRevision: 10}}}}}, false, 10),
expectDescribe: `if(mod_rev(key9c)==9).then(put("key9c", "992")).else(get("key9c")) -> failure("993"), rev: 10`,
},
{
req: txnRequest(nil, []EtcdOperation{{Type: Range, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}, nil),
resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{ValueRevision: ValueRevision{Value: ValueOrHash{Value: "110"}}}}}, {}, {Deleted: 1}}, true, 10),
expectDescribe: `get("10"), put("11", "111"), delete("12") -> "110", ok, deleted: 1, rev: 10`,
},
Expand Down
16 changes: 9 additions & 7 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,20 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) {
s.KeyValues = newKVs
switch request.Type {
case Txn:
success := true
failure := false
for _, cond := range request.Txn.Conditions {
if val := s.KeyValues[cond.Key]; val.ModRevision != cond.ExpectedRevision {
success = false
failure = true
break
}
}
if !success {
return s, EtcdResponse{Revision: s.Revision, Txn: &TxnResponse{Failure: true}}
operations := request.Txn.OperationsOnSuccess
if failure {
operations = request.Txn.OperationsOnFailure
}
opResp := make([]EtcdOperationResult, len(request.Txn.OperationsOnSuccess))
opResp := make([]EtcdOperationResult, len(operations))
increaseRevision := false
for i, op := range request.Txn.OperationsOnSuccess {
for i, op := range operations {
switch op.Type {
case Range:
opResp[i] = EtcdOperationResult{
Expand Down Expand Up @@ -198,7 +199,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) {
if increaseRevision {
s.Revision += 1
}
return s, EtcdResponse{Txn: &TxnResponse{Results: opResp}, Revision: s.Revision}
return s, EtcdResponse{Txn: &TxnResponse{Failure: failure, Results: opResp}, Revision: s.Revision}
case LeaseGrant:
lease := EtcdLease{
LeaseID: request.LeaseGrant.LeaseID,
Expand Down Expand Up @@ -266,6 +267,7 @@ type EtcdRequest struct {
type TxnRequest struct {
Conditions []EtcdCondition
OperationsOnSuccess []EtcdOperation
OperationsOnFailure []EtcdOperation
}

type EtcdCondition struct {
Expand Down
28 changes: 13 additions & 15 deletions tests/robustness/model/deterministic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestModelBase(t *testing.T) {
},
},
{
name: "Txn sets new value if value matches expected",
name: "Txn executes onSuccess if revision matches expected",
operations: []testOperation{
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse},
{req: compareRevisionAndPutRequest("key", 1, "2"), resp: compareRevisionAndPutResponse(true, 1).EtcdResponse, failure: true},
Expand All @@ -206,28 +206,26 @@ func TestModelBase(t *testing.T) {
},
},
{
name: "Txn can expect on empty key",
name: "Txn can expect on key not existing",
operations: []testOperation{
{req: getRequest("key1"), resp: emptyGetResponse(1).EtcdResponse},
{req: compareRevisionAndPutRequest("key1", 0, "2"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse},
{req: compareRevisionAndPutRequest("key2", 0, "3"), resp: compareRevisionAndPutResponse(true, 3).EtcdResponse},
{req: compareRevisionAndPutRequest("key3", 4, "4"), resp: compareRevisionAndPutResponse(false, 4).EtcdResponse, failure: true},
{req: compareRevisionAndPutRequest("key1", 0, "3"), resp: compareRevisionAndPutResponse(true, 3).EtcdResponse, failure: true},
{req: txnRequestSingleOperation(compareRevision("key1", 0), putOperation("key1", "4"), putOperation("key1", "5")), resp: txnPutResponse(false, 3).EtcdResponse},
{req: getRequest("key1"), resp: getResponse("key1", "5", 3, 3).EtcdResponse},
{req: compareRevisionAndPutRequest("key2", 0, "6"), resp: compareRevisionAndPutResponse(true, 4).EtcdResponse},
},
},
{
name: "Txn doesn't do anything if value doesn't match expected",
name: "Txn executes onFailure if revision doesn't match expected",
operations: []testOperation{
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse},
{req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 2).EtcdResponse, failure: true},
{req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(true, 1).EtcdResponse, failure: true},
{req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 2).EtcdResponse, failure: true},
{req: compareRevisionAndPutRequest("key", 2, "3"), resp: compareRevisionAndPutResponse(false, 1).EtcdResponse},
{req: getRequest("key"), resp: getResponse("key", "2", 1, 1).EtcdResponse, failure: true},
{req: getRequest("key"), resp: getResponse("key", "2", 2, 2).EtcdResponse, failure: true},
{req: getRequest("key"), resp: getResponse("key", "3", 1, 1).EtcdResponse, failure: true},
{req: getRequest("key"), resp: getResponse("key", "3", 1, 2).EtcdResponse, failure: true},
{req: getRequest("key"), resp: getResponse("key", "3", 2, 2).EtcdResponse, failure: true},
{req: getRequest("key"), resp: getResponse("key", "1", 1, 1).EtcdResponse},
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnPutResponse(false, 2).EtcdResponse, failure: true},
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnEmptyResponse(false, 2).EtcdResponse, failure: true},
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnEmptyResponse(true, 2).EtcdResponse, failure: true},
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnPutResponse(true, 1).EtcdResponse, failure: true},
{req: txnRequestSingleOperation(compareRevision("key", 1), nil, putOperation("key", "2")), resp: txnEmptyResponse(true, 1).EtcdResponse},
{req: txnRequestSingleOperation(compareRevision("key", 2), nil, putOperation("key", "2")), resp: txnPutResponse(false, 2).EtcdResponse},
},
},
{
Expand Down
106 changes: 44 additions & 62 deletions tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,58 +189,20 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
})
}

func (h *AppendableHistory) AppendCompareRevisionAndDelete(key string, expectedRevision int64, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
request := compareRevisionAndDeleteRequest(key, expectedRevision)
if err != nil {
h.appendFailed(request, start.Nanoseconds(), err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
var deleted int64
if resp != nil && len(resp.Responses) > 0 {
deleted = resp.Responses[0].GetResponseDeleteRange().Deleted
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: compareRevisionAndDeleteResponse(resp.Succeeded, deleted, revision),
Return: end.Nanoseconds(),
})

}
func (h *AppendableHistory) AppendCompareRevisionAndPut(key string, expectedRevision int64, value string, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
request := compareRevisionAndPutRequest(key, expectedRevision, value)
if err != nil {
h.appendFailed(request, start.Nanoseconds(), err)
return
}
var revision int64
if resp != nil && resp.Header != nil {
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
Input: request,
Call: start.Nanoseconds(),
Output: compareRevisionAndPutResponse(resp.Succeeded, revision),
Return: end.Nanoseconds(),
})
}

func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, onSuccess []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, clientOnFailure []clientv3.Op, start, end time.Duration, resp *clientv3.TxnResponse, err error) {
conds := []EtcdCondition{}
for _, cmp := range cmp {
conds = append(conds, toEtcdCondition(cmp))
}
ops := []EtcdOperation{}
for _, op := range onSuccess {
ops = append(ops, toEtcdOperation(op))
modelOnSuccess := []EtcdOperation{}
for _, op := range clientOnSuccessOps {
modelOnSuccess = append(modelOnSuccess, toEtcdOperation(op))
}
modelOnFailure := []EtcdOperation{}
for _, op := range clientOnFailure {
modelOnFailure = append(modelOnFailure, toEtcdOperation(op))
}
request := txnRequest(conds, ops)
request := txnRequest(conds, modelOnSuccess, modelOnFailure)
if err != nil {
h.appendFailed(request, start.Nanoseconds(), err)
return
Expand Down Expand Up @@ -293,6 +255,7 @@ func toEtcdCondition(cmp clientv3.Cmp) (cond EtcdCondition) {
default:
panic(fmt.Sprintf("Compare not supported, target: %q, result: %q", cmp.Target, cmp.Result))
}
cond.ExpectedRevision = cmp.TargetUnion.(*etcdserverpb.Compare_ModRevision).ModRevision
return cond
}

Expand Down Expand Up @@ -447,32 +410,51 @@ func deleteResponse(deleted int64, revision int64) EtcdNonDeterministicResponse
return EtcdNonDeterministicResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{{Deleted: deleted}}}, Revision: revision}}
}

func compareRevisionAndDeleteRequest(key string, expectedRevision int64) EtcdRequest {
return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Delete, Key: key}})
}

func compareRevisionAndPutRequest(key string, expectedRevision int64, value string) EtcdRequest {
return txnRequest([]EtcdCondition{{Key: key, ExpectedRevision: expectedRevision}}, []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}})
return txnRequestSingleOperation(compareRevision(key, expectedRevision), putOperation(key, value), nil)
}

func compareRevisionAndPutResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse {
var result []EtcdOperationResult
if succeeded {
result = []EtcdOperationResult{{}}
return txnPutResponse(succeeded, revision)
}
return txnResponse(result, succeeded, revision)
return txnEmptyResponse(succeeded, revision)
}

func compareRevisionAndDeleteResponse(succeeded bool, deleted, revision int64) EtcdNonDeterministicResponse {
var result []EtcdOperationResult
if succeeded {
result = []EtcdOperationResult{{Deleted: deleted}}
func compareRevision(key string, expectedRevision int64) *EtcdCondition {
return &EtcdCondition{Key: key, ExpectedRevision: expectedRevision}
}

func putOperation(key, value string) *EtcdOperation {
return &EtcdOperation{Type: Put, Key: key, Value: ToValueOrHash(value)}
}

func txnRequestSingleOperation(cond *EtcdCondition, onSuccess, onFailure *EtcdOperation) EtcdRequest {
var conds []EtcdCondition
if cond != nil {
conds = []EtcdCondition{*cond}
}
return txnResponse(result, succeeded, revision)
var onSuccess2 []EtcdOperation
if onSuccess != nil {
onSuccess2 = []EtcdOperation{*onSuccess}
}
var onFailure2 []EtcdOperation
if onFailure != nil {
onFailure2 = []EtcdOperation{*onFailure}
}
return txnRequest(conds, onSuccess2, onFailure2)
}

func txnRequest(conds []EtcdCondition, onSuccess, onFailure []EtcdOperation) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conditions: conds, OperationsOnSuccess: onSuccess, OperationsOnFailure: onFailure}}
}

func txnPutResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse {
return txnResponse([]EtcdOperationResult{{}}, succeeded, revision)
}

func txnRequest(conds []EtcdCondition, onSuccess []EtcdOperation) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conditions: conds, OperationsOnSuccess: onSuccess}}
func txnEmptyResponse(succeeded bool, revision int64) EtcdNonDeterministicResponse {
return txnResponse([]EtcdOperationResult{}, succeeded, revision)
}

func txnResponse(result []EtcdOperationResult, succeeded bool, revision int64) EtcdNonDeterministicResponse {
Expand Down
49 changes: 7 additions & 42 deletions tests/robustness/traffic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,56 +131,21 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) error {
return nil
}

func (c *RecordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error {
txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key))
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := txn.Commit()
returnTime := time.Since(c.baseTime)
c.operations.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err)
return err
}

func (c *RecordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error {
txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value))
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := txn.Commit()
returnTime := time.Since(c.baseTime)
c.operations.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err)
return err
}

func (c *RecordingClient) compareRevisionTxn(ctx context.Context, key string, expectedRevision int64, op clientv3.Op) clientv3.Txn {
txn := c.client.Txn(ctx)
var cmp clientv3.Cmp
if expectedRevision == 0 {
cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
} else {
cmp = clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)
}
return txn.If(
cmp,
).Then(
op,
)
}

func (c *RecordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []clientv3.Op) error {
func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) {
txn := c.client.Txn(ctx).If(
cmp...,
conditions...,
).Then(
ops...,
onSuccess...,
).Else(
onFailure...,
)
c.opMux.Lock()
defer c.opMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := txn.Commit()
returnTime := time.Since(c.baseTime)
c.operations.AppendTxn(cmp, ops, callTime, returnTime, resp, err)
return err
c.operations.AppendTxn(conditions, onSuccess, onFailure, callTime, returnTime, resp, err)
return resp, err
}

func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) {
Expand Down
Loading

0 comments on commit 4675e5c

Please sign in to comment.