Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sealing halt on exec fork #178

Merged
merged 40 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
a97d504
added height to error msg for inconsistent blocks
Nov 2, 2020
0b7ae9a
fixed typo in error msg
Nov 2, 2020
8682f5f
refactored temporary sanity checks into its own method to no bloat ma…
Nov 2, 2020
9ad30da
unit tests for consensus.Builder updated and extended
Nov 3, 2020
f8624f7
Merge branch 'master' into alex/add_height_to_multiple_seal_error_msg
Nov 3, 2020
0c84641
Update module/builder/consensus/builder.go
Nov 5, 2020
fb21a73
wip
Nov 8, 2020
8da45d3
cleanup; first algorithmic draft completed; still needs testing
Nov 8, 2020
b82c533
cleanup
Nov 8, 2020
c075e73
code cleaning and polishing
Nov 8, 2020
9662906
restructured unittest fixtures to avoid naming collision
Nov 10, 2020
f1742a6
wip
Nov 10, 2020
f186c66
Merge branch 'master' into alex/add_height_to_multiple_seal_error_msg
Nov 13, 2020
36a3ec5
wip
Nov 13, 2020
7269be8
rough draft for execution fork suppressor: compiles but still needs t…
Nov 13, 2020
94288ff
implementing tests (interim commit)
Nov 16, 2020
bcd4f3d
completed implementation and tests
Nov 19, 2020
5de72e3
shortened naming
Nov 20, 2020
9f4c7ad
Merge branch 'master' into alex/add_height_to_multiple_seal_error_msg
Nov 20, 2020
4bd114e
updated EpochBuilder to new fixtures
Nov 20, 2020
0c1f245
extended Builder tests
Nov 21, 2020
278b470
linted code
Nov 23, 2020
0df7cce
Merge branch 'master' into alex/add_height_to_multiple_seal_error_msg
Nov 25, 2020
4524714
integrated ExecForkSuppressor
Nov 25, 2020
d61f61f
removed logic from builder that now is in ExecStateSupressor
Nov 25, 2020
7c48153
cleanup
Nov 25, 2020
6fc4c6a
removed test for conflicting seals as this is now handled by ExecFork…
Nov 25, 2020
a6f3aa5
Merge branch 'master' into alex/consensus-sealing-halt-on-exec-fork
zhangchiqing Nov 26, 2020
2d0fef0
interim commit
Dec 2, 2020
e1006ad
cleanup
Dec 2, 2020
0e7af9c
fixed main
Dec 2, 2020
ab67a1b
Merge branch 'master' into alex/consensus-sealing-halt-on-exec-fork
Dec 2, 2020
33de9d9
comments on builder unittests
Dec 2, 2020
addcb4b
fixed outdated comment on builder unittest TestPayloadSealOnlyFork
Dec 2, 2020
49fb4a1
changed local executionForkErr to be non-exported
Dec 2, 2020
d14a1e7
Rolled back my changes to incorporated_result.go
Dec 3, 2020
2b6e669
addressed reviewers comments (part 1)
Dec 3, 2020
d798b6b
addressed reviewers comments (part 2)
Dec 3, 2020
3a9d171
Merge branch 'master' into alex/consensus-sealing-halt-on-exec-fork
Dec 3, 2020
3ded2bf
Merge branch 'master' into alex/consensus-sealing-halt-on-exec-fork
Dec 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
chmodule "github.com/onflow/flow-go/module/chunks"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/mempool"
consensusMempools "github.com/onflow/flow-go/module/mempool/consensus"
"github.com/onflow/flow-go/module/mempool/ejectors"
"github.com/onflow/flow-go/module/mempool/stdmap"
"github.com/onflow/flow-go/module/metrics"
Expand Down Expand Up @@ -132,7 +133,11 @@ func main() {
// use a custom ejector so we don't eject seals that would break
// the chain of seals
ejector := ejectors.NewLatestIncorporatedResultSeal(node.Storage.Headers)
seals = stdmap.NewIncorporatedResultSeals(sealLimit, stdmap.WithEject(ejector.Eject))
seals = stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(sealLimit), stdmap.WithEject(ejector.Eject))
seals, err = consensusMempools.NewExecStateForkSuppressor(seals, node.DB, node.Logger)
if err != nil {
return fmt.Errorf("failed to wrap seals mempool into ExecStateForkSuppressor: %w", err)
}
return nil
}).
Module("consensus node metrics", func(node *cmd.FlowNodeBuilder) error {
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/block_hash_by_height_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func storeAndIndexHeader(t *testing.T, db *badger.DB, headers *bstorage.Headers,
}

func storeAndIndexSealFor(t *testing.T, db *badger.DB, seals *bstorage.Seals, h *flow.Header) {
seal := unittest.SealFixture()
seal := unittest.Seal.Fixture()
seal.BlockID = h.ID()

err := seals.Store(seal)
Expand Down
2 changes: 1 addition & 1 deletion consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func createNode(
guarantees, err := stdmap.NewGuarantees(guaranteeLimit)
require.NoError(t, err)

seals := stdmap.NewIncorporatedResultSeals(sealLimit)
seals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(sealLimit))

// initialize the block builder
build := builder.NewBuilder(metrics, db, state, headersDB, sealsDB, indexDB, guarantees, seals, tracer)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ func (suite *Suite) TestGetAccount() {

// setup the latest sealed block
header := unittest.BlockHeaderFixture() // create a mock header
seal := unittest.SealFixture() // create a mock seal
seal := unittest.Seal.Fixture() // create a mock seal
seal.BlockID = header.ID() // make the seal point to the header

suite.snapshot.
Expand Down
15 changes: 13 additions & 2 deletions engine/consensus/matching/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,19 @@ func (e *Engine) onReceipt(originID flow.Identifier, receipt *flow.ExecutionRece
Logger()

resultFinalState, ok := receipt.ExecutionResult.FinalStateCommitment()
if !ok { // return
if !ok || len(resultFinalState) < 1 { // discard receipt
log.Error().Msg("execution receipt without FinalStateCommit received")
return engine.NewInvalidInputErrorf("execution receipt without FinalStateCommit: %x", receipt.ID())
}
log = log.With().Hex("final_state", resultFinalState).Logger()

resultInitialState, ok := receipt.ExecutionResult.InitialStateCommit()
if !ok || len(resultInitialState) < 1 { // discard receipt
log.Error().Msg("execution receipt without InitialStateCommit received")
return engine.NewInvalidInputErrorf("execution receipt without InitialStateCommit: %x", receipt.ID())
}
log = log.With().Hex("initial_state", resultInitialState).Logger()

// CAUTION INCOMPLETE
// For many other messages, we check that the message's origin (as established by the
// networking layer) is equal to the message's creator as reported by the message itself.
Expand Down Expand Up @@ -281,6 +288,7 @@ func (e *Engine) onReceipt(originID flow.Identifier, receipt *flow.ExecutionRece
added, err := e.incorporatedResults.Add(flow.NewIncorporatedResult(result.BlockID, result))
if err != nil {
log.Err(err).Msg("error inserting incorporated result in mempool")
return fmt.Errorf("error inserting incorporated result in mempool: %w", err)
}
if !added {
log.Debug().Msg("skipping result already in mempool")
Expand Down Expand Up @@ -722,10 +730,13 @@ func (e *Engine) sealResult(incorporatedResult *flow.IncorporatedResult) error {
}

// we don't care if the seal is already in the mempool
_ = e.seals.Add(&flow.IncorporatedResultSeal{
_, err = e.seals.Add(&flow.IncorporatedResultSeal{
IncorporatedResult: incorporatedResult,
Seal: seal,
})
if err != nil {
return fmt.Errorf("failed to store IncorporatedResultSeal in mempool: %w", err)
}

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion engine/testutil/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func ConsensusNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit
approvals, err := stdmap.NewApprovals(1000)
require.NoError(t, err)

seals := stdmap.NewIncorporatedResultSeals(1000)
seals := stdmap.NewIncorporatedResultSeals(stdmap.WithLimit(1000))

// receive collections
ingestionEngine, err := consensusingest.New(node.Log, node.Tracer, node.Metrics, node.Metrics, node.Metrics, node.Net, node.State, node.Headers, node.Me, guarantees)
Expand Down
2 changes: 1 addition & 1 deletion model/flow/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestNilProducesSameHashAsEmptySlice(t *testing.T) {

func TestOrderingChangesHash(t *testing.T) {

seals := unittest.BlockSealsFixture(5)
seals := unittest.Seal.Fixtures(5)

payload1 := flow.Payload{
Seals: seals,
Expand Down
13 changes: 11 additions & 2 deletions model/flow/incorporated_result.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package flow

import "github.com/onflow/flow-go/crypto"
import (
"github.com/onflow/flow-go/crypto"
)

// IncorporatedResult is a wrapper around an ExecutionResult which contains the
// ID of the first block on its fork in which it was incorporated.
Expand Down Expand Up @@ -35,7 +37,14 @@ func NewIncorporatedResult(incorporatedBlockID Identifier, result *ExecutionResu
// ID implements flow.Entity.ID for IncorporatedResult to make it capable of
// being stored directly in mempools and storage.
func (ir *IncorporatedResult) ID() Identifier {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IncorporatedResult is used to store approval signatures when they are received:

aggregatedSignatures map[uint64]*AggregatedSignature
When including aggregatedSignatures in the ID computation, this changes the ID of the entity, which should not be the case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it changes the ID of the IncorporatedResult because aggregatedSignatures is a private field and is ignored in the RLP encoding which is used to calculate the ID

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 thanks for pointing this out @arrivets . I was not aware that RLP did skip private fields.
Obviously, I didn't read the test you wrote:

// TestIncorporatedResultID checks that the ID and Checksum of the Incorporated
// Result do not depend on the aggregatedSignatures placeholder.
func TestIncorporatedResultID(t *testing.T) {
🤦 Apologies.

Rolled back my changes to incorporated_result.go

return MakeID(ir)
body := struct {
IncorporatedBlockID Identifier
Result *ExecutionResult
}{
IncorporatedBlockID: ir.IncorporatedBlockID,
Result: ir.Result,
}
return MakeID(body)
}

// CheckSum implements flow.Entity.CheckSum for IncorporatedResult to make it
Expand Down
37 changes: 1 addition & 36 deletions module/builder/consensus/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package consensus

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -234,43 +233,9 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
// roadmap (https://github.com/dapperlabs/flow-go/issues/4872)

// create a mapping of block to seal for all seals in our pool
// We consider two seals as inconsistent, if they have different start or end states
encounteredInconsistentSealsForSameBlock := false
byBlock := make(map[flow.Identifier]*flow.IncorporatedResultSeal)
for _, irSeal := range b.sealPool.All() {
if len(irSeal.IncorporatedResult.Result.Chunks) < 1 {
return nil, fmt.Errorf("ExecutionResult without chunks: %v", irSeal.IncorporatedResult.Result.ID())
}
if len(irSeal.Seal.FinalState) < 1 {
// respective Execution Result should have been rejected by matching engine
return nil, fmt.Errorf("seal with empty state commitment: %v", irSeal.ID())
}
if irSeal2, found := byBlock[irSeal.Seal.BlockID]; found {
sc1json, err := json.Marshal(irSeal)
if err != nil {
return nil, err
}
sc2json, err := json.Marshal(irSeal2)
if err != nil {
return nil, err
}

// check whether seals are inconsistent:
if !bytes.Equal(irSeal.Seal.FinalState, irSeal2.Seal.FinalState) ||
!bytes.Equal(irSeal.IncorporatedResult.Result.Chunks[0].StartState, irSeal2.IncorporatedResult.Result.Chunks[0].StartState) {
fmt.Printf("ERROR: inconsistent seals for the same block %v: %s and %s", irSeal.Seal.BlockID, string(sc1json), string(sc2json))
encounteredInconsistentSealsForSameBlock = true
} else {
fmt.Printf("WARNING: multiple seals with different IDs for the same block %v: %s and %s", irSeal.Seal.BlockID, string(sc1json), string(sc2json))
}

} else {
byBlock[irSeal.Seal.BlockID] = irSeal
}
}
if encounteredInconsistentSealsForSameBlock {
// in case we find inconsistent seals, do not seal anything
byBlock = make(map[flow.Identifier]*flow.IncorporatedResultSeal)
byBlock[irSeal.Seal.BlockID] = irSeal
}

// get the parent's block seal, which constitutes the beginning of the
Expand Down
85 changes: 66 additions & 19 deletions module/builder/consensus/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func (bs *BuilderSuite) createAndRecordBlock(parentBlock *flow.Block) *flow.Bloc

incorporatedResultForPrevBlock = unittest.IncorporatedResult.Fixture(
unittest.IncorporatedResult.WithResult(previousResult),
unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()),
//unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()),
unittest.IncorporatedResult.WithIncorporatedBlockID(parentBlock.ID()),
)
result := unittest.ExecutionResultFixture(
unittest.WithBlock(&block),
Expand Down Expand Up @@ -140,21 +141,11 @@ func (bs *BuilderSuite) createAndRecordBlock(parentBlock *flow.Block) *flow.Bloc
// IncorporatedResultSeal, which ties the seal to the incorporated result it
// seals, is also recorded for future access.
func (bs *BuilderSuite) chainSeal(incorporatedResult *flow.IncorporatedResult) {

finalState, _ := incorporatedResult.Result.FinalStateCommitment()

seal := &flow.Seal{
BlockID: incorporatedResult.Result.BlockID,
ResultID: incorporatedResult.Result.ID(),
FinalState: finalState,
}
bs.chain = append(bs.chain, seal)

incorporatedResultSeal := &flow.IncorporatedResultSeal{
IncorporatedResult: incorporatedResult,
Seal: seal,
}

incorporatedResultSeal := unittest.IncorporatedResultSeal.Fixture(
unittest.IncorporatedResultSeal.WithResult(incorporatedResult.Result),
unittest.IncorporatedResultSeal.WithIncorporatedBlockID(incorporatedResult.IncorporatedBlockID),
)
bs.chain = append(bs.chain, incorporatedResultSeal.Seal)
bs.irsMap[incorporatedResultSeal.ID()] = incorporatedResultSeal
bs.irsList = append(bs.irsList, incorporatedResultSeal)
}
Expand Down Expand Up @@ -524,15 +515,23 @@ func (bs *BuilderSuite) TestPayloadSealAllValid() {
bs.Assert().ElementsMatch(bs.chain, bs.assembled.Seals, "should have included valid chain of seals")
}

func (bs *BuilderSuite) TestPayloadSealSomeValid() {
// TestPayloadSealSomeValidOnFork verifies that builder only includes seals whose
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems incomplete

Copy link
Member Author

@AlexHentschel AlexHentschel Dec 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this entire test was an outdated artifact from work-in-progress. Removed it

//
// if an IncorporatedResultSeal is included in mempool, whose final state does not match the execution result
func (bs *BuilderSuite) TestPayloadSealSomeValidOnFork_old() {
// skipping this test:
// * currently, the builder relies on the IncorporatedResultSeal to be constructed
// correctly by the matching engine
// * with the implementation of this test, the matching engine
bs.T().Skip()

bs.pendingSeals = bs.irsMap

// add some invalid seals to the mempool
for i := 0; i < 8; i++ {
invalid := &flow.IncorporatedResultSeal{
IncorporatedResult: unittest.IncorporatedResultFixture(),
Seal: unittest.SealFixture(),
IncorporatedResult: unittest.IncorporatedResult.Fixture(),
Seal: unittest.Seal.Fixture(),
}
bs.pendingSeals[invalid.ID()] = invalid
}
Expand All @@ -543,6 +542,54 @@ func (bs *BuilderSuite) TestPayloadSealSomeValid() {
bs.Assert().ElementsMatch(bs.chain, bs.assembled.Seals, "should have included only valid chain of seals")
}

// TestPayloadSealSomeValidOnFork verifies that builder only includes seals whose
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems incomplete

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for noticing. Fixed the test's doc.

//
// if an IncorporatedResultSeal is included in mempool, whose final state does not match the execution result
AlexHentschel marked this conversation as resolved.
Show resolved Hide resolved
func (bs *BuilderSuite) TestPayloadSealOnlyFork() {
// skipping this test:
// * currently, the builder relies on the IncorporatedResultSeal to be constructed
// correctly by the matching engine
// * with the implementation of this test, the matching engine
//bs.T().Skip()
AlexHentschel marked this conversation as resolved.
Show resolved Hide resolved

// in the test setup, we already create a single fork
// [first] <- [F0] <- [F1] <- [F2] <- [F3] <- [A0] <- [A1] <- [A2] <- [A3]
// Where block
// * [first] is sealed and finalized
// * [F0] ... [F3] are finalized but _not_ sealed
// * [A0] ... [A3] are _not_ finalized and _not_ sealed
// We now create an additional fork: [F3] <- [B0] <- [B1] <- ... <- [B7]
Comment on lines +525 to +530
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite clear how this tests work.
But if you need help creating such forks in tests, I have an example here.
https://github.com/onflow/flow-go/blob/master/engine/execution/ingestion/engine_test.go#L816-L831

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the reference. The test for the Builder already contained its own implementation:

// createAndRecordBlock creates a new block chained to the previous block (if it
// is not nil). The new block contains a receipt for a result of the previous
// block, which is also used to create a seal for the previous block. The seal
// and the result are combined in an IncorporatedResultSeal which is a candidate
// for the seals mempool.
func (bs *BuilderSuite) createAndRecordBlock(parentBlock *flow.Block) *flow.Block {

it has some specialized functionality for testing the payload Builder:

  • creates execution receipts and seals for the blocks and includes them into the block's payloads
  • tracks the created seals and blocks in the mempool mocks

I am inclined to keep the current test structure as I just added a test. 😅

var forkHead *flow.Block
for f := 0; f < 100; f++ {
AlexHentschel marked this conversation as resolved.
Show resolved Hide resolved
forkHead = bs.blocks[bs.finalID]
for i := 0; i < 8; i++ {
forkHead = bs.createAndRecordBlock(forkHead)
// Method createAndRecordBlock adds a seal for every block into the mempool.
}
}

bs.pendingSeals = bs.irsMap
_, err := bs.build.BuildOn(forkHead.ID(), bs.setter)
bs.Require().NoError(err)

// expected seals: [F0] <- ... <- [F3] <- [B0] <- ... <- [B7]
bs.Assert().Equal(12, len(bs.assembled.Seals), "should have included only valid chain of seals")
bs.Assert().ElementsMatch(bs.chain[:4], bs.assembled.Seals[:4], "should have included only valid chain of seals")
bs.Assert().Equal(bs.chain[len(bs.chain)-1], bs.assembled.Seals[len(bs.assembled.Seals)-1], "should have included only valid chain of seals")
bs.Assert().Equal(bs.chain[len(bs.chain)-2], bs.assembled.Seals[len(bs.assembled.Seals)-2], "should have included only valid chain of seals")
bs.Assert().Equal(bs.chain[len(bs.chain)-3], bs.assembled.Seals[len(bs.assembled.Seals)-3], "should have included only valid chain of seals")
bs.Assert().Equal(bs.chain[len(bs.chain)-4], bs.assembled.Seals[len(bs.assembled.Seals)-4], "should have included only valid chain of seals")
bs.Assert().Equal(bs.chain[len(bs.chain)-5], bs.assembled.Seals[len(bs.assembled.Seals)-5], "should have included only valid chain of seals")
bs.Assert().Equal(bs.chain[len(bs.chain)-6], bs.assembled.Seals[len(bs.assembled.Seals)-6], "should have included only valid chain of seals")
bs.Assert().Equal(bs.chain[len(bs.chain)-7], bs.assembled.Seals[len(bs.assembled.Seals)-7], "should have included only valid chain of seals")
bs.Assert().Equal(bs.chain[len(bs.chain)-8], bs.assembled.Seals[len(bs.assembled.Seals)-8], "should have included only valid chain of seals")
//bs.Assert().ElementsMatch(bs.chain[len(bs.chain)-8:], bs.assembled.Seals[4:], "should have included only valid chain of seals")

//bs.Assert().ElementsMatch(bs.chain, bs.assembled.Seals, "should have included only valid chain of seals")
bs.Assert().Empty(bs.assembled.Guarantees, "should have no guarantees in payload with empty mempool")

}

func (bs *BuilderSuite) TestPayloadSealCutoffChain() {

// remove the seal at the start
Expand Down
8 changes: 8 additions & 0 deletions module/mempool/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package mempool

import "github.com/onflow/flow-go/model/flow"

// OnEjection is a callback which a mempool executes on ejecting
// one of its elements. The callbacks are executed from within the thread
// that serves the mempool. Implementations should be non-blocking.
type OnEjection func(flow.Entity)
Loading