Skip to content

Commit

Permalink
test(graphsync): add additional tests
Browse files Browse the repository at this point in the history
add additional tests, remove commented out code, improve migrations
  • Loading branch information
hannahhoward committed Jul 21, 2022
1 parent 6c93355 commit db0c40d
Show file tree
Hide file tree
Showing 12 changed files with 519 additions and 65 deletions.
2 changes: 1 addition & 1 deletion channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func New(ds datastore.Batching,
StateEntryFuncs: ChannelStateEntryFuncs,
Notifier: c.dispatch,
FinalityStates: ChannelFinalityStates,
}, channelMigrations, versioning.VersionKey("2"))
}, channelMigrations, versioning.VersionKey("3"))
if err != nil {
return nil, err
}
Expand Down
154 changes: 154 additions & 0 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package channels_test

import (
"bytes"
"context"
"errors"
"math/rand"
"testing"
"time"

versionedds "github.com/filecoin-project/go-ds-versioning/pkg/datastore"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
Expand All @@ -16,7 +20,10 @@ import (

datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal/migrations"
"github.com/filecoin-project/go-data-transfer/v2/testutil"
versioning "github.com/filecoin-project/go-ds-versioning/pkg"
)

func TestChannels(t *testing.T) {
Expand Down Expand Up @@ -392,6 +399,153 @@ func TestIsChannelCleaningUp(t *testing.T) {
require.False(t, channels.IsChannelCleaningUp(datatransfer.Cancelled))
}

func TestMigrations(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

ds := dss.MutexWrap(datastore.NewMapDatastore())
received := make(chan event)
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
}
numChannels := 5
transferIDs := make([]datatransfer.TransferID, numChannels)
initiators := make([]peer.ID, numChannels)
responders := make([]peer.ID, numChannels)
baseCids := make([]cid.Cid, numChannels)

totalSizes := make([]uint64, numChannels)
sents := make([]uint64, numChannels)
receiveds := make([]uint64, numChannels)

messages := make([]string, numChannels)
vouchers := make([]datatransfer.TypedVoucher, numChannels)
voucherResults := make([]datatransfer.TypedVoucher, numChannels)
sentIndex := make([]int64, numChannels)
receivedIndex := make([]int64, numChannels)
queuedIndex := make([]int64, numChannels)
allSelector := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any).Matcher().Node()
selfPeer := testutil.GeneratePeers(1)[0]

list, err := migrations.GetChannelStateMigrations(selfPeer)
require.NoError(t, err)
vds, up := versionedds.NewVersionedDatastore(ds, list, versioning.VersionKey("2"))
require.NoError(t, up(ctx))

initialStatuses := []datatransfer.Status{
datatransfer.Requested,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.Ongoing,
}
for i := 0; i < numChannels; i++ {
transferIDs[i] = datatransfer.TransferID(rand.Uint64())
initiators[i] = testutil.GeneratePeers(1)[0]
responders[i] = testutil.GeneratePeers(1)[0]
baseCids[i] = testutil.GenerateCids(1)[0]
totalSizes[i] = rand.Uint64()
sents[i] = rand.Uint64()
receiveds[i] = rand.Uint64()
messages[i] = string(testutil.RandomBytes(20))
vouchers[i] = testutil.NewTestTypedVoucher()
voucherResults[i] = testutil.NewTestTypedVoucher()
sentIndex[i] = rand.Int63()
receivedIndex[i] = rand.Int63()
queuedIndex[i] = rand.Int63()
channel := migrations.ChannelStateV2{
TransferID: transferIDs[i],
Initiator: initiators[i],
Responder: responders[i],
BaseCid: baseCids[i],
Selector: internal.CborGenCompatibleNode{
Node: allSelector,
},
Sender: initiators[i],
Recipient: responders[i],
TotalSize: totalSizes[i],
Status: initialStatuses[i],
Sent: sents[i],
Received: receiveds[i],
Message: messages[i],
Vouchers: []internal.EncodedVoucher{
{
Type: vouchers[i].Type,
Voucher: internal.CborGenCompatibleNode{
Node: vouchers[i].Voucher,
},
},
},
VoucherResults: []internal.EncodedVoucherResult{
{
Type: voucherResults[i].Type,
VoucherResult: internal.CborGenCompatibleNode{
Node: voucherResults[i].Voucher,
},
},
},
SentBlocksTotal: sentIndex[i],
ReceivedBlocksTotal: receivedIndex[i],
QueuedBlocksTotal: queuedIndex[i],
SelfPeer: selfPeer,
}
buf := new(bytes.Buffer)
err = channel.MarshalCBOR(buf)
require.NoError(t, err)
err = vds.Put(ctx, datastore.NewKey(datatransfer.ChannelID{
Initiator: initiators[i],
Responder: responders[i],
ID: transferIDs[i],
}.String()), buf.Bytes())
require.NoError(t, err)
}

channelList, err := channels.New(ds, notifier, &fakeEnv{}, selfPeer)
require.NoError(t, err)
err = channelList.Start(ctx)
require.NoError(t, err)

expectedStatuses := []datatransfer.Status{
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.Ongoing,
datatransfer.Ongoing,
datatransfer.Ongoing,
}

expectedInitiatorPaused := []bool{false, true, false, true, false}
expectedResponderPaused := []bool{false, false, true, true, false}
for i := 0; i < numChannels; i++ {

channel, err := channelList.GetByID(ctx, datatransfer.ChannelID{
Initiator: initiators[i],
Responder: responders[i],
ID: transferIDs[i],
})
require.NoError(t, err)
require.Equal(t, selfPeer, channel.SelfPeer())
require.Equal(t, transferIDs[i], channel.TransferID())
require.Equal(t, baseCids[i], channel.BaseCID())
require.Equal(t, allSelector, channel.Selector())
require.Equal(t, initiators[i], channel.Sender())
require.Equal(t, responders[i], channel.Recipient())
require.Equal(t, totalSizes[i], channel.TotalSize())
require.Equal(t, sents[i], channel.Sent())
require.Equal(t, receiveds[i], channel.Received())
require.Equal(t, messages[i], channel.Message())
require.Equal(t, vouchers[i], channel.LastVoucher())
require.Equal(t, voucherResults[i], channel.LastVoucherResult())
require.Equal(t, expectedStatuses[i], channel.Status())
require.Equal(t, expectedInitiatorPaused[i], channel.InitiatorPaused())
require.Equal(t, expectedResponderPaused[i], channel.ResponderPaused())
require.Equal(t, basicnode.NewInt(sentIndex[i]), channel.SentIndex())
require.Equal(t, basicnode.NewInt(receivedIndex[i]), channel.ReceivedIndex())
require.Equal(t, basicnode.NewInt(queuedIndex[i]), channel.QueuedIndex())

}
}

type event struct {
event datatransfer.Event
state datatransfer.ChannelState
Expand Down
10 changes: 9 additions & 1 deletion channels/internal/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func MigrateChannelState2To3(oldChannelState *ChannelStateV2) (*internal.Channel
sentIndex := basicnode.NewInt(oldChannelState.SentBlocksTotal)
queuedIndex := basicnode.NewInt(oldChannelState.QueuedBlocksTotal)

responderPaused := oldChannelState.Status == datatransfer.ResponderPaused || oldChannelState.Status == datatransfer.BothPaused
initiatorPaused := oldChannelState.Status == datatransfer.InitiatorPaused || oldChannelState.Status == datatransfer.BothPaused
newStatus := oldChannelState.Status
if newStatus == datatransfer.ResponderPaused || newStatus == datatransfer.InitiatorPaused || newStatus == datatransfer.BothPaused {
newStatus = datatransfer.Ongoing
}
return &internal.ChannelState{
SelfPeer: oldChannelState.SelfPeer,
TransferID: oldChannelState.TransferID,
Expand All @@ -86,7 +92,7 @@ func MigrateChannelState2To3(oldChannelState *ChannelStateV2) (*internal.Channel
Sender: oldChannelState.Sender,
Recipient: oldChannelState.Recipient,
TotalSize: oldChannelState.TotalSize,
Status: oldChannelState.Status,
Status: newStatus,
Queued: oldChannelState.Queued,
Sent: oldChannelState.Sent,
Received: oldChannelState.Received,
Expand All @@ -98,6 +104,8 @@ func MigrateChannelState2To3(oldChannelState *ChannelStateV2) (*internal.Channel
QueuedIndex: internal.CborGenCompatibleNode{Node: queuedIndex},
DataLimit: oldChannelState.DataLimit,
RequiresFinalization: oldChannelState.RequiresFinalization,
InitiatorPaused: initiatorPaused,
ResponderPaused: responderPaused,
Stages: oldChannelState.Stages,
}, nil
}
Expand Down
16 changes: 0 additions & 16 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,6 @@ func (m *manager) processTransferEvent(ctx context.Context, chid datatransfer.Ch
}
msg := message.UpdateResponse(chid.ID, true)
return m.transport.SendMessage(ctx, chid, msg)
/*case datatransfer.TransportReceivedVoucherRequest:
voucher, err := evt.Request.TypedVoucher()
if err != nil {
return err
}
return m.channels.NewVoucher(chid, voucher)
case datatransfer.TransportReceivedUpdateRequest:
if evt.Request.IsPaused() {
return m.pauseOther(chid)
}
return m.resumeOther(chid)
case datatransfer.TransportReceivedCancelRequest:
log.Infof("channel %s: received cancel request, cleaning up channel", chid)
return m.channels.Cancel(chid)
case datatransfer.TransportReceivedResponse:
return m.receiveResponse(chid, evt.Response)*/
case datatransfer.TransportTransferCancelled:
log.Warnf("channel %+v was cancelled: %s", chid, evt.ErrorMessage)
return m.channels.RequestCancelled(chid, errors.New(evt.ErrorMessage))
Expand Down
54 changes: 17 additions & 37 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,6 @@ type TransportQueuedData struct {
// TransportReachedDataLimit occurs when a channel hits a previously set data limit
type TransportReachedDataLimit struct{}

/*
type TransportReceivedVoucherRequest struct {
Request Request
}
type TransportReceivedUpdateRequest struct {
Request Request
}
type TransportReceivedCancelRequest struct {
Request Request
}
// TransportReceivedResponse occurs when we receive a response to a request
type TransportReceivedResponse struct {
Response Response
}
*/

// TransportTransferCancelled occurs when a request we opened (with the given channel Id) to
// receive data is cancelled by us.
type TransportTransferCancelled struct {
Expand Down Expand Up @@ -110,13 +91,6 @@ type EventsHandler interface {
// ChannelState queries for the current channel state
ChannelState(ctx context.Context, chid ChannelID) (ChannelState, error)

// OnRequestReceived occurs when we receive a request for the given channel ID
// return values are a message to send an error if the transport should be closed
OnRequestReceived(chid ChannelID, msg Request) (Response, error)

// OnRequestReceived occurs when we receive a response to a request
OnResponseReceived(chid ChannelID, msg Response) error

// OnTransportEvent is dispatched when an event occurs on the transport
// It MAY be dispatched asynchronously by the transport to the time the
// event occurs
Expand All @@ -126,6 +100,17 @@ type EventsHandler interface {
// have a synchronous return
OnTransportEvent(chid ChannelID, event TransportEvent)

// OnRequestReceived occurs when we receive a request for the given channel ID
// return values are a message to send an error if the transport should be closed
// TODO: in a future improvement, a received request should become a
// just TransportEvent, and should be handled asynchronously
OnRequestReceived(chid ChannelID, msg Request) (Response, error)

// OnRequestReceived occurs when we receive a response to a request
// TODO: in a future improvement, a received response should become a
// just TransportEvent, and should be handled asynchronously
OnResponseReceived(chid ChannelID, msg Response) error

// OnContextAugment allows the transport to attach data transfer tracing information
// to its local context, in order to create a hierarchical trace
OnContextAugment(chid ChannelID) func(context.Context) context.Context
Expand Down Expand Up @@ -192,17 +177,12 @@ type TransportCapabilities struct {
Pausable bool
}

func (TransportOpenedChannel) transportEvent() {}
func (TransportInitiatedTransfer) transportEvent() {}
func (TransportReceivedData) transportEvent() {}
func (TransportSentData) transportEvent() {}
func (TransportQueuedData) transportEvent() {}
func (TransportReachedDataLimit) transportEvent() {}

/*func (TransportReceivedVoucherRequest) transportEvent() {}
func (TransportReceivedUpdateRequest) transportEvent() {}
func (TransportReceivedCancelRequest) transportEvent() {}
func (TransportReceivedResponse) transportEvent() {}*/
func (TransportOpenedChannel) transportEvent() {}
func (TransportInitiatedTransfer) transportEvent() {}
func (TransportReceivedData) transportEvent() {}
func (TransportSentData) transportEvent() {}
func (TransportQueuedData) transportEvent() {}
func (TransportReachedDataLimit) transportEvent() {}
func (TransportTransferCancelled) transportEvent() {}
func (TransportErrorSendingData) transportEvent() {}
func (TransportErrorReceivingData) transportEvent() {}
Expand Down
Loading

0 comments on commit db0c40d

Please sign in to comment.