Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GraphQL resolvers for LID #1494

Merged
merged 3 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion gql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/boost/node/modules/dtypes"
"github.com/filecoin-project/boost/piecedirectory"
"github.com/filecoin-project/boost/retrievalmarket/rtvllog"
"github.com/filecoin-project/boost/sectorstatemgr"
"github.com/filecoin-project/boost/storagemanager"
"github.com/filecoin-project/boost/storagemarket"
"github.com/filecoin-project/boost/storagemarket/sealingpipeline"
Expand Down Expand Up @@ -62,14 +63,15 @@ type resolver struct {
legacyProv gfm_storagemarket.StorageProvider
legacyDT dtypes.ProviderDataTransfer
ps piecestore.PieceStore
ssm *sectorstatemgr.SectorStateMgr
sa retrievalmarket.SectorAccessor
piecedirectory *piecedirectory.PieceDirectory
publisher *storageadapter.DealPublisher
spApi sealingpipeline.API
fullNode v1api.FullNode
}

func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode) *resolver {
func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr) *resolver {
return &resolver{
ctx: ctx,
cfg: cfg,
Expand All @@ -91,6 +93,7 @@ func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo
publisher: publisher,
spApi: spApi,
fullNode: fullNode,
ssm: ssm,
}
}

Expand Down
91 changes: 91 additions & 0 deletions gql/resolver_lid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package gql

import (
"context"
"time"

"github.com/filecoin-project/boost/db"
gqltypes "github.com/filecoin-project/boost/gql/types"
"github.com/filecoin-project/boost/sectorstatemgr"
)

type dealData struct {
Indexed gqltypes.Uint64
FlaggedUnsealed gqltypes.Uint64
FlaggedSealed gqltypes.Uint64
}

type pieces struct {
Indexed int32
FlaggedUnsealed int32
FlaggedSealed int32
}

type sectorUnsealedCopies struct {
Unsealed int32
Sealed int32
}

type sectorProvingState struct {
Active int32
Inactive int32
}

type lidState struct {
DealData dealData
Pieces pieces
SectorUnsealedCopies sectorUnsealedCopies
SectorProvingState sectorProvingState
FlaggedPieces int32
}

// query: lid: [LID]
func (r *resolver) LID(ctx context.Context) (*lidState, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest creating a different resolver method for each of these subsections (one for DealData, one for Pieces etc).

That way each one is loaded separately, and if one of them fails or is slow it doesn't affect the others.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should not fail, as all the data is coming from Boost and should be cached by that point - I'd rather we make 1 API call for a few int.

var lu *sectorstatemgr.SectorStateUpdates
for lu == nil {
r.ssm.LatestUpdateMu.Lock()
lu = r.ssm.LatestUpdate
r.ssm.LatestUpdateMu.Unlock()
if lu == nil {
time.Sleep(2 * time.Second)
}
}

var sealed, unsealed int32
for _, s := range lu.SectorStates { // TODO: consider adding this data directly in SSM
if s == db.SealStateUnsealed {
unsealed++
} else if s == db.SealStateSealed {
sealed++
}
}

fp, err := r.piecedirectory.FlaggedPiecesCount(ctx)
if err != nil {
return nil, err
}

ls := &lidState{
FlaggedPieces: int32(fp),
DealData: dealData{
Indexed: gqltypes.Uint64(12094627905536),
FlaggedUnsealed: gqltypes.Uint64(1094627905536),
FlaggedSealed: gqltypes.Uint64(18094627905536),
},
Pieces: pieces{
Indexed: 360,
FlaggedUnsealed: 33,
FlaggedSealed: 480,
},
SectorUnsealedCopies: sectorUnsealedCopies{
Sealed: sealed,
Unsealed: unsealed,
},
SectorProvingState: sectorProvingState{
Active: int32(len(lu.ActiveSectors)),
Inactive: int32(len(lu.SectorStates) - len(lu.ActiveSectors)), // TODO: add an explicit InactiveSectors in ssm
},
}

return ls, nil
}
33 changes: 33 additions & 0 deletions gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,36 @@ type SealingPipeline {
Workers: [Worker]!
}

type DealData {
Indexed: Uint64!
FlaggedUnsealed: Uint64!
FlaggedSealed: Uint64!
}

type Pieces {
Indexed: Int!
FlaggedUnsealed: Int!
FlaggedSealed: Int!
}

type SectorUnsealedCopies {
Unsealed: Int!
Sealed: Int!
}

type SectorProvingState {
Active: Int!
Inactive: Int!
}

type LID {
DealData: DealData!
Pieces: Pieces!
SectorUnsealedCopies: SectorUnsealedCopies!
SectorProvingState: SectorProvingState!
FlaggedPieces: Int!
}

type FundsEscrow {
Available: BigInt!
Locked: BigInt!
Expand Down Expand Up @@ -476,6 +506,9 @@ type RootQuery {
"""Get sealing pipeline state"""
sealingpipeline: SealingPipeline!

"""Get LID state"""
lid: LID!

"""Get funds available"""
funds: Funds!

Expand Down
7 changes: 4 additions & 3 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/boost/piecedirectory"
brm "github.com/filecoin-project/boost/retrievalmarket/lib"
"github.com/filecoin-project/boost/retrievalmarket/rtvllog"
"github.com/filecoin-project/boost/sectorstatemgr"
"github.com/filecoin-project/boost/storagemanager"
"github.com/filecoin-project/boost/storagemarket"
"github.com/filecoin-project/boost/storagemarket/logs"
Expand Down Expand Up @@ -628,14 +629,14 @@ func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(
}
}

func NewGraphqlServer(cfg *config.Boost) func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, prov *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, publisher *storageadapter.DealPublisher, spApi sealingpipeline.API, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter) *gql.Server {
func NewGraphqlServer(cfg *config.Boost) func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, prov *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, publisher *storageadapter.DealPublisher, spApi sealingpipeline.API, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter, ssm *sectorstatemgr.SectorStateMgr) *gql.Server {
return func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, prov *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager,
storageMgr *storagemanager.StorageManager, publisher *storageadapter.DealPublisher, spApi sealingpipeline.API,
legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer,
ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter) *gql.Server {
ps dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, fullNode v1api.FullNode, bg gql.BlockGetter, ssm *sectorstatemgr.SectorStateMgr) *gql.Server {

resolverCtx, cancel := context.WithCancel(context.Background())
resolver := gql.NewResolver(resolverCtx, cfg, r, h, dealsDB, logsDB, retDB, plDB, fundsDB, fundMgr, storageMgr, spApi, prov, legacyProv, legacyDT, ps, sa, piecedirectory, publisher, fullNode)
resolver := gql.NewResolver(resolverCtx, cfg, r, h, dealsDB, logsDB, retDB, plDB, fundsDB, fundMgr, storageMgr, spApi, prov, legacyProv, legacyDT, ps, sa, piecedirectory, publisher, fullNode, ssm)
server := gql.NewServer(resolver, bg)

lc.Append(fx.Hook{
Expand Down
32 changes: 3 additions & 29 deletions piecedirectory/doctor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/filecoin-project/boost/db"
Expand All @@ -27,9 +26,6 @@ var doclog = logging.Logger("piecedoc")
type Doctor struct {
store *bdclient.Store
ssm *sectorstatemgr.SectorStateMgr

latestUpdateMu sync.Mutex
latestUpdate *sectorstatemgr.SectorStateUpdates
}

func NewDoctor(store *bdclient.Store, ssm *sectorstatemgr.SectorStateMgr) *Doctor {
Expand All @@ -42,28 +38,6 @@ const avgCheckInterval = 30 * time.Second
func (d *Doctor) Run(ctx context.Context) {
doclog.Info("piece doctor: running")

go func() {
sub := d.ssm.PubSub.Subscribe()

for {
select {
case u, ok := <-sub:
if !ok {
log.Debugw("state updates subscription closed")
return
}
log.Debugw("got state updates from SectorStateMgr", "len(u.updates)", len(u.Updates), "len(u.active)", len(u.ActiveSectors), "u.updatedAt", u.UpdatedAt)

d.latestUpdateMu.Lock()
d.latestUpdate = u
d.latestUpdateMu.Unlock()

case <-ctx.Done():
return
}
}
}()

timer := time.NewTimer(0)
defer timer.Stop()

Expand All @@ -76,9 +50,9 @@ func (d *Doctor) Run(ctx context.Context) {

err := func() error {
var lu *sectorstatemgr.SectorStateUpdates
d.latestUpdateMu.Lock()
lu = d.latestUpdate
d.latestUpdateMu.Unlock()
d.ssm.LatestUpdateMu.Lock()
lu = d.ssm.LatestUpdate
d.ssm.LatestUpdateMu.Unlock()
if lu == nil {
doclog.Warn("sector state manager not yet updated")
return nil
Expand Down
Loading