Skip to content

Commit

Permalink
Change service GetIndex / AddIndex to return channel instead of array (
Browse files Browse the repository at this point in the history
…#1444)

* feat: yugabyte db impl

* feat: run yugabyte tests against a dockerized yugabyte

* fix: use our own yugabyte docker image

* fix: use yugabyte 2.17.2.0 docker image

* feat: piece doctor yugabyte impl

* fix: go mod tidy

* refactor: remove SetCarSize as it's no longer being used

* refactor: remove functionality to mark index as errored (not being used)

* feat: implement delete commands

* refactor: consolidate test params

* feat: add lid yugabyte config

* fix: port map yugabyte postgres to standard port

* Fix yugabyte CI (#1433)

* fix: yugabyte tests in CI

* docker-compose.yml ; Dockerfile.test ; connect to `yugabyte` and not localhost

* add tag

* test lid

* make gen

* fixup

* move couchbase settings under build tag

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* wip: service GetIndex returns channel of records instead of array

* feat: return channel from AddIndex and GetIndex

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
  • Loading branch information
dirkmc and nonsense committed May 17, 2023
1 parent 1316ee5 commit 2b4de7c
Show file tree
Hide file tree
Showing 17 changed files with 545 additions and 169 deletions.
19 changes: 15 additions & 4 deletions cmd/migrate-lid/couch-to-yuga.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/filecoin-project/boostd-data/couchbase"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/yugabyte"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -211,16 +212,26 @@ func migrateLidToLidIndex(ctx context.Context, pieceCid cid.Cid, source StoreMig
}

// Load the index from the source store
records, err := source.GetIndex(ctx, pieceCid)
idx, err := source.GetIndex(ctx, pieceCid)
if err != nil {
return false, fmt.Errorf("loading index %s: %w", pieceCid, err)
}

var records []model.Record
for r := range idx {
if r.Error != nil {
return false, r.Error
}
records = append(records, r.Record)
}

// Add the index to the destination store
addStart := time.Now()
err = dest.AddIndex(ctx, pieceCid, records, true)
if err != nil {
return false, fmt.Errorf("adding index %s to store: %w", pieceCid, err)
respch := dest.AddIndex(ctx, pieceCid, records, true)
for resp := range respch {
if resp.Err != "" {
return false, fmt.Errorf("adding index %s to store: %s", pieceCid, err)
}
}
log.Debugw("AddIndex", "took", time.Since(addStart).String())

Expand Down
13 changes: 8 additions & 5 deletions cmd/migrate-lid/migrate_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/boostd-data/ldb"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-address"
vfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm"
"github.com/filecoin-project/go-fil-markets/piecestore"
Expand Down Expand Up @@ -43,8 +44,8 @@ import (
type StoreMigrationApi interface {
Start(ctx context.Context) error
IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error)
GetIndex(context.Context, cid.Cid) ([]model.Record, error)
AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error
GetIndex(context.Context, cid.Cid) (<-chan types.IndexRecord, error)
AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress
AddDealForPiece(ctx context.Context, pcid cid.Cid, info model.DealInfo) error
ListPieces(ctx context.Context) ([]cid.Cid, error)
GetPieceMetadata(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error)
Expand Down Expand Up @@ -333,9 +334,11 @@ func migrateIndex(ctx context.Context, ipath idxPath, store StoreMigrationApi, f

// Add the index to the store
addStart := time.Now()
err = store.AddIndex(ctx, pieceCid, records, false)
if err != nil {
return false, fmt.Errorf("adding index %s to store: %w", ipath.path, err)
respch := store.AddIndex(ctx, pieceCid, records, false)
for resp := range respch {
if resp.Err != "" {
return false, fmt.Errorf("adding index %s to store: %s", ipath.path, err)
}
}
log.Debugw("AddIndex", "took", time.Since(addStart).String())

Expand Down
38 changes: 29 additions & 9 deletions extern/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-jsonrpc"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"
Expand All @@ -18,10 +19,10 @@ var log = logger.Logger("boostd-data-client")
type Store struct {
client struct {
AddDealForPiece func(context.Context, cid.Cid, model.DealInfo) error
AddIndex func(context.Context, cid.Cid, []model.Record, bool) error
AddIndex func(context.Context, cid.Cid, []model.Record, bool) <-chan types.AddIndexProgress
IsIndexed func(ctx context.Context, pieceCid cid.Cid) (bool, error)
IsCompleteIndex func(ctx context.Context, pieceCid cid.Cid) (bool, error)
GetIndex func(context.Context, cid.Cid) ([]model.Record, error)
GetIndex func(context.Context, cid.Cid) (<-chan types.IndexRecord, error)
GetOffsetSize func(context.Context, cid.Cid, mh.Multihash) (*model.OffsetSize, error)
ListPieces func(ctx context.Context) ([]cid.Cid, error)
GetPieceMetadata func(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error)
Expand All @@ -37,16 +38,17 @@ type Store struct {
FlaggedPiecesList func(ctx context.Context, cursor *time.Time, offset int, limit int) ([]model.FlaggedPiece, error)
FlaggedPiecesCount func(ctx context.Context) (int, error)
}
closer jsonrpc.ClientCloser
closer jsonrpc.ClientCloser
dialOpts []jsonrpc.Option
}

func NewStore() *Store {
return &Store{}
func NewStore(dialOpts ...jsonrpc.Option) *Store {
return &Store{dialOpts: dialOpts}
}

func (s *Store) Dial(ctx context.Context, addr string) error {
var err error
s.closer, err = jsonrpc.NewClient(ctx, addr, "boostddata", &s.client, nil)
s.closer, err = jsonrpc.NewMergeClient(ctx, addr, "boostddata", []interface{}{&s.client}, nil, s.dialOpts...)
if err != nil {
return fmt.Errorf("dialing local index directory server: %w", err)
}
Expand All @@ -66,7 +68,10 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (index.Index, er
}

var records []index.Record
for _, r := range resp {
for r := range resp {
if r.Error != nil {
return nil, r.Error
}
records = append(records, index.Record{
Cid: r.Cid,
Offset: r.Offset,
Expand All @@ -90,7 +95,15 @@ func (s *Store) GetRecords(ctx context.Context, pieceCid cid.Cid) ([]model.Recor

log.Debugw("get-records", "piece-cid", pieceCid, "records", len(resp))

return resp, nil
var records []model.Record
for r := range resp {
if r.Error != nil {
return nil, r.Error
}
records = append(records, r.Record)
}

return records, nil
}

func (s *Store) GetPieceMetadata(ctx context.Context, pieceCid cid.Cid) (model.Metadata, error) {
Expand All @@ -112,7 +125,14 @@ func (s *Store) AddDealForPiece(ctx context.Context, pieceCid cid.Cid, dealInfo
func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error {
log.Debugw("add-index", "piece-cid", pieceCid, "records", len(records))

return s.client.AddIndex(ctx, pieceCid, records, isCompleteIndex)
respch := s.client.AddIndex(ctx, pieceCid, records, isCompleteIndex)
for resp := range respch {
if resp.Err != "" {
return fmt.Errorf("add index with piece cid %s: %s", pieceCid, resp.Err)
}
//fmt.Printf("%s: Percent complete: %f%%\n", time.Now(), resp.Progress*100)
}
return nil
}

func (s *Store) IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
Expand Down
54 changes: 35 additions & 19 deletions extern/boostd-data/couchbase/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *Store) PiecesContainingMultihash(ctx context.Context, m mh.Multihash) (
return pcids, normalizeMultihashError(m, err)
}

func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) ([]model.Record, error) {
func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan types.IndexRecord, error) {
log.Debugw("handle.get-index", "pieceCid", pieceCid)

ctx, span := tracing.Tracer.Start(ctx, "store.get_index")
Expand All @@ -152,7 +152,13 @@ func (s *Store) GetIndex(ctx context.Context, pieceCid cid.Cid) ([]model.Record,

log.Debugw("handle.get-index.records", "len(records)", len(records))

return records, nil
recs := make(chan types.IndexRecord, len(records))
for _, r := range records {
recs <- types.IndexRecord{Record: r}
}
close(recs)

return recs, nil
}

func (s *Store) IsIndexed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
Expand Down Expand Up @@ -181,7 +187,7 @@ func (s *Store) IsCompleteIndex(ctx context.Context, pieceCid cid.Cid) (bool, er
return md.CompleteIndex, nil
}

func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) error {
func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.Record, isCompleteIndex bool) <-chan types.AddIndexProgress {
log.Debugw("handle.add-index", "records", len(records))

ctx, span := tracing.Tracer.Start(ctx, "store.add_index")
Expand All @@ -197,22 +203,32 @@ func (s *Store) AddIndex(ctx context.Context, pieceCid cid.Cid, records []model.
mhs = append(mhs, r.Cid.Hash())
}

setMhStart := time.Now()
err := s.db.SetMultihashesToPieceCid(ctx, mhs, pieceCid)
if err != nil {
return fmt.Errorf("failed to add entry from mh to pieceCid: %w", err)
}
log.Debugw("handled.add-index SetMultihashesToPieceCid", "took", time.Since(setMhStart).String())

// Add a mapping from piece cid -> offset / size of each block so that
// clients can get the block info for all blocks in a piece
addOffsetsStart := time.Now()
if err := s.db.AddIndexRecords(ctx, pieceCid, records); err != nil {
return err
}
log.Debugw("handled.add-index AddIndexRecords", "took", time.Since(addOffsetsStart).String())

return s.db.MarkIndexingComplete(ctx, pieceCid, len(records), isCompleteIndex)
progress := make(chan types.AddIndexProgress, 1)
go func() {
defer close(progress)
progress <- types.AddIndexProgress{Progress: 0}

setMhStart := time.Now()
err := s.db.SetMultihashesToPieceCid(ctx, mhs, pieceCid)
if err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
log.Debugw("handled.add-index SetMultihashesToPieceCid", "took", time.Since(setMhStart).String())
progress <- types.AddIndexProgress{Progress: 0.5}

// Add a mapping from piece cid -> offset / size of each block so that
// clients can get the block info for all blocks in a piece
addOffsetsStart := time.Now()
if err := s.db.AddIndexRecords(ctx, pieceCid, records); err != nil {
progress <- types.AddIndexProgress{Err: err.Error()}
return
}
log.Debugw("handled.add-index AddIndexRecords", "took", time.Since(addOffsetsStart).String())
progress <- types.AddIndexProgress{Progress: 1}
}()

return progress
}

func (s *Store) IndexedAt(ctx context.Context, pieceCid cid.Cid) (time.Time, error) {
Expand Down
Loading

0 comments on commit 2b4de7c

Please sign in to comment.