Skip to content

feat: implement UpdateItem operation and support for TransactWriteItems #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
a148f37
Implement UpdateItem operation and support for TransactWriteItems (fi…
devin-ai-integration[bot] May 7, 2025
289900f
feat: implement UpdateItem operation and support for TransactWriteItems
devin-ai-integration[bot] May 7, 2025
db01e31
fix: limit retries in optimistic lock concurrency test
devin-ai-integration[bot] May 7, 2025
2eda1c2
fix: skip table creation in CI environment
devin-ai-integration[bot] May 7, 2025
9a84415
fix: revert CI skip for TestNewClientLocalEndpoint
devin-ai-integration[bot] May 7, 2025
8117b1e
fix: add retry logic for table creation in TestNewClientLocalEndpoint
devin-ai-integration[bot] May 7, 2025
cd7f6fe
fix: remove silent error handling in WithUpdateItem
devin-ai-integration[bot] May 7, 2025
521045b
fix: implement panic for option errors in WithUpdateItem
devin-ai-integration[bot] May 7, 2025
8fa755d
refactor: move TestTransactWithUpdateItem to existing table-driven te…
devin-ai-integration[bot] May 7, 2025
680e0e4
fix: remove unused import in updateitem_test.go
devin-ai-integration[bot] May 7, 2025
9227c78
fix: improve optimistic lock concurrency test with better retry handling
devin-ai-integration[bot] May 7, 2025
2f00d0f
fix: replace IN operator with OR in query condition
devin-ai-integration[bot] May 7, 2025
dcdf70c
fix: update test to use proper DynamoDB query pattern
devin-ai-integration[bot] May 7, 2025
d4fe429
fix: increase retry count and delay in optimistic lock test
devin-ai-integration[bot] May 7, 2025
1b5063a
Merge branch 'main' into devin/1746585260-implement-updateitem
shidil May 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type WriteAPI interface {
PutItem(ctx context.Context, pk, sk Attribute, item interface{}, opt ...PutOption) error
DeleteItem(ctx context.Context, pk, sk string) error
BatchDeleteItems(ctx context.Context, input []AttributeRecord) []AttributeRecord
UpdateItem(ctx context.Context, pk, sk Attribute, fields map[string]Attribute, opt ...UpdateOption) error
}

type TransactionAPI interface {
Expand Down
17 changes: 14 additions & 3 deletions tests/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"testing"
"time"

"github.com/oolio-group/dynago"
"github.com/ory/dockertest/v3"
Expand Down Expand Up @@ -101,8 +102,18 @@ func TestNewClientLocalEndpoint(t *testing.T) {
t.Fatalf("expected configuration to succeed, got %s", err)
}

err = createTestTable(table)
if err != nil {
t.Fatalf("expected create table on local table to succeed, got %s", err)
maxRetries := 5
for i := 0; i < maxRetries; i++ {
err = createTestTable(table)
if err == nil {
break
}

if i == maxRetries-1 {
t.Fatalf("failed to create table after %d attempts: %s", maxRetries, err)
}

t.Logf("Table creation attempt %d failed: %s. Retrying after 1 second...", i+1, err)
time.Sleep(1 * time.Second)
}
}
40 changes: 40 additions & 0 deletions tests/transact_items_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,46 @@ func TestTransactItems(t *testing.T) {
},
},
},
{
title: "update multiple items with WithUpdateItem",
condition: "pk = :pk AND begins_with(sk, :sk)",
keys: map[string]types.AttributeValue{
":pk": &types.AttributeValueMemberS{Value: "terminal"},
":sk": &types.AttributeValueMemberS{Value: "merchant"},
},
newItems: []Terminal{
{
Id: "2",
Pk: "terminal",
Sk: "merchant1",
},
{
Id: "3",
Pk: "terminal",
Sk: "merchant2",
},
},
operations: []types.TransactWriteItem{
table.WithUpdateItem("terminal", "merchant1", map[string]dynago.Attribute{
"Id": dynago.StringValue("2-updated"),
}),
table.WithUpdateItem("terminal", "merchant2", map[string]dynago.Attribute{
"Id": dynago.StringValue("3-updated"),
}),
},
expected: []Terminal{
{
Id: "2-updated",
Pk: "terminal",
Sk: "merchant1",
},
{
Id: "3-updated",
Pk: "terminal",
Sk: "merchant2",
},
},
},
}
for _, tc := range testCases {
t.Run(tc.title, func(t *testing.T) {
Expand Down
222 changes: 222 additions & 0 deletions tests/updateitem_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package tests

import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/oolio-group/dynago"
)

type Account struct {
ID string
Balance int
Version uint
Status string

Pk string
Sk string
}

func TestUpdateItem(t *testing.T) {
table := prepareTable(t, dynamoEndpoint, "update_test")
testCases := []struct {
title string
item Account
updates map[string]dynago.Attribute
options []dynago.UpdateOption
expected Account
expectedErr error
}{
{
title: "update fields success",
item: Account{
ID: "1",
Balance: 100,
Status: "active",
Pk: "account_1",
Sk: "account_1",
},
updates: map[string]dynago.Attribute{
"Balance": dynago.NumberValue(200),
"Status": dynago.StringValue("inactive"),
},
options: []dynago.UpdateOption{},
expected: Account{
ID: "1",
Balance: 200,
Status: "inactive",
Pk: "account_1",
Sk: "account_1",
},
},
{
title: "optimistic lock success",
item: Account{
ID: "2",
Balance: 100,
Version: 1,
Pk: "account_2",
Sk: "account_2",
},
updates: map[string]dynago.Attribute{
"Balance": dynago.NumberValue(300),
},
options: []dynago.UpdateOption{
dynago.WithOptimisticLockForUpdate("Version", 1),
},
expected: Account{
ID: "2",
Balance: 300,
Version: 2,
Pk: "account_2",
Sk: "account_2",
},
},
{
title: "conditional update success",
item: Account{
ID: "3",
Balance: 100,
Status: "active",
Pk: "account_3",
Sk: "account_3",
},
updates: map[string]dynago.Attribute{
"Status": dynago.StringValue("inactive"),
},
options: []dynago.UpdateOption{
dynago.WithConditionalUpdate("attribute_exists(Balance)"),
},
expected: Account{
ID: "3",
Balance: 100,
Status: "inactive",
Pk: "account_3",
Sk: "account_3",
},
},
{
title: "conditional update failure",
item: Account{
ID: "4",
Balance: 100,
Pk: "account_4",
Sk: "account_4",
},
updates: map[string]dynago.Attribute{
"Status": dynago.StringValue("inactive"),
},
options: []dynago.UpdateOption{
dynago.WithConditionalUpdate("attribute_exists(NonExistentField)"),
},
expectedErr: fmt.Errorf("ConditionalCheckFailedException"),
},
}

for _, tc := range testCases {
t.Run(tc.title, func(t *testing.T) {
t.Helper()
ctx := context.TODO()

pk := dynago.StringValue(tc.item.Pk)
sk := dynago.StringValue(tc.item.Sk)
err := table.PutItem(ctx, pk, sk, &tc.item)
if err != nil {
t.Fatalf("unexpected error on initial put: %s", err)
}

err = table.UpdateItem(ctx, pk, sk, tc.updates, tc.options...)
if err != nil {
if tc.expectedErr == nil {
t.Fatalf("unexpected error: %s", err)
}
if !strings.Contains(err.Error(), tc.expectedErr.Error()) {
t.Fatalf("expected op to fail with %s; got %s", tc.expectedErr, err)
}
return
}

var out Account
err, found := table.GetItem(ctx, pk, sk, &out)
if err != nil {
t.Fatalf("unexpected error on get: %s", err)
}
if !found {
t.Errorf("expected to find item with pk %s and sk %s", tc.item.Pk, tc.item.Sk)
}
if !reflect.DeepEqual(tc.expected, out) {
t.Errorf("expected query to return %v; got %v", tc.expected, out)
}
})
}
}

func TestUpdateItemOptimisticLockConcurrency(t *testing.T) {
table := prepareTable(t, dynamoEndpoint, "update_optimistic_test")
account := Account{ID: "123", Balance: 0, Version: 0, Pk: "123", Sk: "123"}
ctx := context.Background()
pk := dynago.StringValue("123")
err := table.PutItem(ctx, pk, pk, account)
if err != nil {
t.Fatalf("unexpected error %s", err)
return
}

update := func() error {
var acc Account
err, _ := table.GetItem(ctx, pk, pk, &acc)
if err != nil {
return err
}

updates := map[string]dynago.Attribute{
"Balance": dynago.NumberValue(int64(acc.Balance + 100)),
}

return table.UpdateItem(ctx, pk, pk, updates, dynago.WithOptimisticLockForUpdate("Version", acc.Version))
}

var wg sync.WaitGroup
successCount := int32(0)
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
maxRetries := 10
for i := 0; i < maxRetries; i++ {
err := update()
if err == nil {
atomic.AddInt32(&successCount, 1)
return
}
if strings.Contains(err.Error(), "ConditionalCheckFailedException") {
time.Sleep(100 * time.Millisecond) // Longer delay before retry
continue
}
t.Errorf("Unexpected error: %v", err)
return
}
t.Logf("Max retries reached, continuing")
}()
}
wg.Wait()

var acc Account
err, _ = table.GetItem(ctx, pk, pk, &acc)
if err != nil {
t.Fatalf("unexpected error %s", err)
return
}
if acc.Balance != 1000 {
t.Errorf("expected account balance to be 1000 after 10 increments of 100; got %d", acc.Balance)
}
if acc.Version != 10 {
t.Errorf("expected account version to be 10 after 10 updates; got %d", acc.Version)
}
}
48 changes: 48 additions & 0 deletions transaction_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package dynago

import (
"context"
"fmt"
"log"
"strings"

"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
Expand Down Expand Up @@ -52,3 +54,49 @@ func (t *Client) TransactItems(ctx context.Context, input ...types.TransactWrite
})
return err
}

func (t *Client) WithUpdateItem(pk string, sk string, updates map[string]Attribute, opts ...UpdateOption) types.TransactWriteItem {
var setExpressions []string
expressionAttributeNames := make(map[string]string)
expressionAttributeValues := make(map[string]Attribute)

for key, value := range updates {
attrName := fmt.Sprintf("#%s", key)
attrValue := fmt.Sprintf(":%s", key)

setExpressions = append(setExpressions, fmt.Sprintf("%s = %s", attrName, attrValue))
expressionAttributeNames[attrName] = key
expressionAttributeValues[attrValue] = value
}

updateExpression := fmt.Sprintf("SET %s", strings.Join(setExpressions, ", "))

input := &dynamodb.UpdateItemInput{
TableName: &t.TableName,
Key: map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
"sk": &types.AttributeValueMemberS{Value: sk},
},
UpdateExpression: &updateExpression,
ExpressionAttributeNames: expressionAttributeNames,
ExpressionAttributeValues: expressionAttributeValues,
}

for _, opt := range opts {
err := opt(input)
if err != nil {
panic(fmt.Sprintf("Failed to apply update option: %v", err))
}
}

return types.TransactWriteItem{
Update: &types.Update{
TableName: input.TableName,
Key: input.Key,
UpdateExpression: input.UpdateExpression,
ConditionExpression: input.ConditionExpression,
ExpressionAttributeNames: input.ExpressionAttributeNames,
ExpressionAttributeValues: input.ExpressionAttributeValues,
},
}
}
Loading