Skip to content

Commit

Permalink
Add DA Check For Data Columns (#13938)
Browse files Browse the repository at this point in the history
* Add new DA check

* Exit early in the event no commitments exist.

* Gazelle

* Fix Mock Broadcaster

* Fix Test Setup

* Update beacon-chain/blockchain/process_block.go

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Manu's Review

* Fix Build

---------

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
  • Loading branch information
nisdas and nalepae committed Aug 14, 2024
1 parent 91584d4 commit 807c157
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 19 deletions.
4 changes: 3 additions & 1 deletion beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ go_library(
"receive_attestation.go",
"receive_blob.go",
"receive_block.go",
"receive_sidecar.go",
"receive_data_column.go",
"service.go",
"tracked_proposer.go",
"weak_subjectivity_checks.go",
Expand All @@ -49,6 +49,7 @@ go_library(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
Expand Down Expand Up @@ -160,6 +161,7 @@ go_test(
"//beacon-chain/operations/slashings:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/blockchain/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
)

var errMaxBlobsExceeded = errors.New("Expected commitments in block exceeds MAX_BLOBS_PER_BLOCK")
var errMaxDataColumnsExceeded = errors.New("Expected data columns for node exceeds NUMBER_OF_COLUMNS")

// An invalid block is the block that fails state transition based on the core protocol rules.
// The beacon node shall not be accepting nor building blocks that branch off from an invalid block.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ func WithBLSToExecPool(p blstoexec.PoolManager) Option {
}

// WithP2PBroadcaster to broadcast messages after appropriate processing.
func WithP2PBroadcaster(p p2p.Broadcaster) Option {
func WithP2PBroadcaster(p p2p.Acceser) Option {
return func(s *Service) error {
s.cfg.P2p = p
s.cfg.P2P = p
return nil
}
}
Expand Down
109 changes: 107 additions & 2 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
Expand All @@ -29,8 +33,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

// A custom slot deadline for processing state slots in our cache.
Expand Down Expand Up @@ -513,12 +515,35 @@ func missingIndices(bs *filesystem.BlobStorage, root [32]byte, expected [][]byte
return missing, nil
}

func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[uint64]bool) (map[uint64]bool, error) {
if len(expected) == 0 {
return nil, nil
}
if len(expected) > int(params.BeaconConfig().NumberOfColumns) {
return nil, errMaxDataColumnsExceeded
}
indices, err := bs.ColumnIndices(root)
if err != nil {
return nil, err
}
missing := make(map[uint64]bool, len(expected))
for col := range expected {
if !indices[col] {
missing[col] = true
}
}
return missing, nil
}

// isDataAvailable blocks until all BlobSidecars committed to in the block are available,
// or an error or context cancellation occurs. A nil result means that the data availability check is successful.
// The function will first check the database to see if all sidecars have been persisted. If any
// sidecars are missing, it will then read from the blobNotifier channel for the given root until the channel is
// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
if features.Get().EnablePeerDAS {
return s.isDataAvailableDataColumns(ctx, root, signed)
}
if signed.Version() < version.Deneb {
return nil
}
Expand Down Expand Up @@ -590,6 +615,86 @@ func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed int
}
}

func (s *Service) isDataAvailableDataColumns(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
if signed.Version() < version.Deneb {
return nil
}

block := signed.Block()
if block == nil {
return errors.New("invalid nil beacon block")
}
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(block.Slot()), slots.ToEpoch(s.CurrentSlot())) {
return nil
}
body := block.Body()
if body == nil {
return errors.New("invalid nil beacon block body")
}
kzgCommitments, err := body.BlobKzgCommitments()
if err != nil {
return errors.Wrap(err, "could not get KZG commitments")
}
// If block has not commitments there is nothing to wait for.
if len(kzgCommitments) == 0 {
return nil
}

colMap, err := peerdas.CustodyColumns(s.cfg.P2P.NodeID(), params.BeaconConfig().CustodyRequirement)
if err != nil {
return err
}
// expected is the number of custodied data columnns a node is expected to have.
expected := len(colMap)
if expected == 0 {
return nil
}
// get a map of data column indices that are not currently available.
missing, err := missingDataColumns(s.blobStorage, root, colMap)
if err != nil {
return err
}
// If there are no missing indices, all data column sidecars are available.
if len(missing) == 0 {
return nil
}

// The gossip handler for data columns writes the index of each verified data column referencing the given
// root to the channel returned by blobNotifiers.forRoot.
nc := s.blobNotifiers.forRoot(root)

// Log for DA checks that cross over into the next slot; helpful for debugging.
nextSlot := slots.BeginsAt(signed.Block().Slot()+1, s.genesisTime)
// Avoid logging if DA check is called after next slot start.
if nextSlot.After(time.Now()) {
nst := time.AfterFunc(time.Until(nextSlot), func() {
if len(missing) == 0 {
return
}
log.WithFields(daCheckLogFields(root, signed.Block().Slot(), expected, len(missing))).
Error("Still waiting for DA check at slot end.")
})
defer nst.Stop()
}
for {
select {
case idx := <-nc:
// Delete each index seen in the notification channel.
delete(missing, idx)
// Read from the channel until there are no more missing sidecars.
if len(missing) > 0 {
continue
}
// Once all sidecars have been observed, clean up the notification channel.
s.blobNotifiers.delete(root)
return nil
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "context deadline waiting for blob sidecars slot: %d, BlockRoot: %#x", block.Slot(), root)
}
}
}

func daCheckLogFields(root [32]byte, slot primitives.Slot, expected, missing int) logrus.Fields {
return logrus.Fields{
"slot": slot,
Expand Down
18 changes: 18 additions & 0 deletions beacon-chain/blockchain/receive_data_column.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package blockchain

import (
"context"

"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
)

func (s *Service) ReceiveDataColumn(ctx context.Context, ds blocks.VerifiedRODataColumn) error {
if err := s.blobStorage.SaveDataColumn(ds); err != nil {
return err
}

// TODO use a custom event or new method of for data columns. For speed
// we are simply reusing blob paths here.
s.sendNewBlobEvent(ds.BlockRoot(), uint64(ds.SignedBlockHeader.Header.Slot))
return nil
}
12 changes: 0 additions & 12 deletions beacon-chain/blockchain/receive_sidecar.go

This file was deleted.

2 changes: 1 addition & 1 deletion beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type config struct {
ExitPool voluntaryexits.PoolManager
SlashingPool slashings.PoolManager
BLSToExecPool blstoexec.PoolManager
P2p p2p.Broadcaster
P2P p2p.Acceser
MaxRoutines int
StateNotifier statefeed.Notifier
ForkChoiceStore f.ForkChoicer
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
WithAttestationPool(attestations.NewPool()),
WithSlashingPool(slashings.NewPool()),
WithExitPool(voluntaryexits.NewPool()),
WithP2PBroadcaster(&mockBroadcaster{}),
WithP2PBroadcaster(&mockAccesser{}),
WithStateNotifier(&mockBeaconNode{}),
WithForkChoiceStore(fc),
WithAttestationService(attService),
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/blstoexec"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2pTesting "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
Expand All @@ -45,6 +46,11 @@ type mockBroadcaster struct {
broadcastCalled bool
}

type mockAccesser struct {
mockBroadcaster
p2pTesting.MockPeerManager
}

func (mb *mockBroadcaster) Broadcast(_ context.Context, _ proto.Message) error {
mb.broadcastCalled = true
return nil
Expand Down
Loading

0 comments on commit 807c157

Please sign in to comment.