algod importer: Update sync on WaitForBlock error. #122

Merged
merged 11 commits into from
Jul 21, 2023
167 changes: 113 additions & 54 deletions conduit/plugins/importers/algod/algod_importer.go
@@ -4,6 +4,7 @@
"bufio"
"context"
_ "embed" // used to embed config
"errors"
"fmt"
"io"
"net/http"
@@ -40,9 +41,8 @@
followerMode
)

// Retry
const (
retries = 5
var (
waitForRoundTimeout = 5 * time.Second
Contributor

Nit:

A more conservative timeout would be 45 seconds. I agree that we want to give conduit more determinism over the outcome of each call to the waitForBlock endpoint, so it's a good idea to make the call time out on its own terms rather than the endpoint's, as this PR does. On the other hand, we might still want the ability to keep 10 threads of the algod importer running concurrently after we've caught up, and a 45-second timeout would allow for that. If we narrow the timeout to 5 seconds, we essentially allow only one or two algod importer threads to run at a time (probably only one, due to round-time variability).

Alternatively, we can change the value as aggressively as in the PR, and if the need arises in the future to raise it back to 45 seconds, we can do it.

Contributor Author

The low timeout was intended for responsiveness: when the node is stalled, the timeout needs to elapse before the first recovery attempt. If there's a timeout, I expect the pipeline to retry the call.

The default retry count is 5; now I'm wondering if it should be unlimited.

The old Indexer had a package called fetcher, I wonder if we should bring that back to manage more optimal round caching: https://github.com/algorand/indexer/blob/master/fetcher/fetcher.go#L1

Contributor

@tzaffi Jul 21, 2023

A worthwhile thought for a future PR or even the pipelining effort. (I suggest keeping this thread unresolved for future reference.)

Contributor Author

I'll change the default retry timeout to 0 in a follow-up PR. It's probably a good default anyway, since people have expressed appreciation for Indexer working that way.

)

const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"
@@ -68,14 +68,14 @@
}

func (algodImp *algodImporter) OnComplete(input data.BlockData) error {
if algodImp.mode == followerMode {
syncRound := input.Round() + 1
_, err := algodImp.aclient.SetSyncRound(syncRound).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.OnComplete(BlockData) called SetSyncRound(syncRound=%d) err: %v", syncRound, err)
return err

}

return nil

}

func (algodImp *algodImporter) Metadata() plugins.Metadata {
@@ -165,11 +165,11 @@
func getMissingCatchpointLabel(URL string, nextRound uint64) (string, error) {
resp, err := http.Get(URL)
if err != nil {
return "", err
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read catchpoint label response: %w", err)

}

if resp.StatusCode != 200 {
@@ -184,7 +184,7 @@
line := scanner.Text()
round, err := parseCatchpointRound(line)
if err != nil {
return "", err

}
// TODO: Change >= to > after go-algorand#5352 is fixed.
if uint64(round) >= nextRound {
@@ -320,11 +320,14 @@
}
}

_, err := algodImp.aclient.StatusAfterBlock(targetRound).Do(algodImp.ctx)
status, err := algodImp.aclient.StatusAfterBlock(targetRound).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.catchupNode() called StatusAfterBlock(targetRound=%d) err: %v", targetRound, err)
if err != nil {
err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: %w", err)
}
if status.LastRound < targetRound {
err = fmt.Errorf("received unexpected error (StatusAfterBlock) waiting for node to catchup: did not reach expected round %d != %d", status.LastRound, targetRound)
}
return err
}

@@ -367,7 +370,7 @@
algodImp.aclient = client
genesisResponse, err := algodImp.aclient.GetGenesis().Do(algodImp.ctx)
if err != nil {
return err

}

genesis := sdk.Genesis{}
@@ -375,7 +378,7 @@
// Don't fail on unknown properties here since the go-algorand and SDK genesis types differ slightly
err = json.LenientDecode([]byte(genesisResponse), &genesis)
if err != nil {
return err

}
if reflect.DeepEqual(genesis, sdk.Genesis{}) {
return fmt.Errorf("unable to fetch genesis file from API at %s", algodImp.cfg.NetAddr)
@@ -414,69 +417,125 @@
return delta, nil
}

func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
var blockbytes []byte
var err error
var status models.NodeStatus
var blk data.BlockData
// SyncError is used to indicate algod and conduit are not synchronized.
type SyncError struct {
rnd uint64
expected uint64
}

for r := 0; r < retries; r++ {
status, err = algodImp.aclient.StatusAfterBlock(rnd - 1).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.GetBlock() called StatusAfterBlock(%d) err: %v", rnd-1, err)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
err = fmt.Errorf("error getting status for round: %w", err)
algodImp.logger.Errorf("error getting status for round %d (attempt %d): %s", rnd, r, err.Error())
continue
func (e *SyncError) Error() string {
return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected)
}

func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) {
ctxWithTimeout, cf := context.WithTimeout(ctx, to)
defer cf()
status, err := c.StatusAfterBlock(rnd - 1).Do(ctxWithTimeout)
l.Tracef("importer algod.waitForRoundWithTimeout() called StatusAfterBlock(%d) err: %v", rnd-1, err)

if err == nil {
// When c.StatusAfterBlock has a server-side timeout it returns the current status.
// We use a context with timeout and the algod default timeout is 1 minute, so technically
// with the current versions, this check should never be required.
if rnd <= status.LastRound {
return status.LastRound, nil
}
start := time.Now()
blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.GetBlock() called BlockRaw(%d) err: %v", rnd, err)
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d (attempt %d): %s", rnd, r, err.Error())
continue
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
tmpBlk := new(models.BlockResponse)
err = msgpack.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
}

// If there was a different error and the node is responsive, call status before returning a SyncError.
status2, err2 := c.Status().Do(ctx)
l.Tracef("importer algod.waitForRoundWithTimeout() called Status() err: %v", err2)
if err2 != nil {
// If there was an error getting status, return the original error.
return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2))
}
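if status2.LastRound < rnd {
return 0, &SyncError{
// Report the round from the successful Status() call; the StatusAfterBlock result is unset on this path.
rnd: status2.LastRound,
expected: rnd,
}
}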

// This is probably a connection error, not a SyncError.
return 0, fmt.Errorf("unknown errors: StatusAfterBlock(%w), Status(%w)", err, err2)

}

func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error) {
var blockbytes []byte
var blk data.BlockData

nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
start := time.Now()
blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx)
algodImp.logger.Tracef("importer algod.GetBlock() called BlockRaw(%d) err: %v", rnd, err)
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error())
return data.BlockData{}, err
}
tmpBlk := new(models.BlockResponse)
err = msgpack.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
}

blk.BlockHeader = tmpBlk.Block.BlockHeader
blk.Payset = tmpBlk.Block.Payset
blk.Certificate = tmpBlk.Cert

blk.BlockHeader = tmpBlk.Block.BlockHeader
blk.Payset = tmpBlk.Block.Payset
blk.Certificate = tmpBlk.Cert

if algodImp.mode == followerMode {
// Round 0 has no delta associated with it
if rnd != 0 {
var delta sdk.LedgerStateDelta
delta, err = algodImp.getDelta(rnd)
if err != nil {
if status.LastRound < rnd {
err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", status.LastRound, rnd, err)
} else {
err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", status.LastRound, rnd, err)
}
algodImp.logger.Error(err.Error())
return data.BlockData{}, err
if algodImp.mode == followerMode {
// Round 0 has no delta associated with it
if rnd != 0 {
var delta sdk.LedgerStateDelta
delta, err = algodImp.getDelta(rnd)
if err != nil {
if nodeRound < rnd {
err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err)

} else {
err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err)
}
blk.Delta = &delta
algodImp.logger.Error(err.Error())
return data.BlockData{}, err
}
blk.Delta = &delta
}

return blk, err
}

err = fmt.Errorf("failed to get block for round %d after %d attempts, check node configuration: %s", rnd, retries, err)
algodImp.logger.Errorf(err.Error())
return blk, err
}

func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
blk, err := algodImp.getBlockInner(rnd)

if err != nil {
target := &SyncError{}
if errors.As(err, &target) {
algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
_, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx)
} else {
err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err)
algodImp.logger.Errorf(err.Error())
}
return data.BlockData{}, err
}

return blk, nil

}

func (algodImp *algodImporter) ProvideMetrics(subsystem string) []prometheus.Collector {
getAlgodRawBlockTimeSeconds = initGetAlgodRawBlockTimeSeconds(subsystem)
return []prometheus.Collector{
66 changes: 51 additions & 15 deletions conduit/plugins/importers/algod/algod_importer_test.go
@@ -3,16 +3,16 @@ package algodimporter
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
@@ -277,10 +277,11 @@ func TestInitCatchup(t *testing.T) {
algodServer: NewAlgodServer(
GenesisResponder,
MakePostSyncRoundResponder(http.StatusOK),
MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
MakeJsonResponderSeries("/v2/status", []int{http.StatusOK, http.StatusOK, http.StatusBadRequest}, []interface{}{models.NodeStatus{LastRound: 1235}}),
MakeMsgpStatusResponder("post", "/v2/catchup/", http.StatusOK, nil)),
netAddr: "",
errInit: "received unexpected error (StatusAfterBlock) waiting for node to catchup: HTTP 400",
errInit: "received unexpected error (StatusAfterBlock) waiting for node to catchup: did not reach expected round",
errGetGen: "",
logs: []string{},
},
@@ -291,6 +292,7 @@ func TestInitCatchup(t *testing.T) {
catchpoint: "1236#abcd",
algodServer: NewAlgodServer(
GenesisResponder,
MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
MakePostSyncRoundResponder(http.StatusOK),
MakeJsonResponderSeries("/v2/status", []int{http.StatusOK}, []interface{}{
models.NodeStatus{LastRound: 1235},
@@ -302,6 +304,33 @@ func TestInitCatchup(t *testing.T) {
}),
MakeMsgpStatusResponder("post", "/v2/catchup/", http.StatusOK, "")),
netAddr: "",
errInit: "received unexpected error (StatusAfterBlock) waiting for node to catchup: did not reach expected round",
errGetGen: "",
logs: []string{
"catchup phase Processed Accounts: 1 / 1",
"catchup phase Verified Accounts: 1 / 1",
"catchup phase Acquired Blocks: 1 / 1",
"catchup phase Verified Blocks",
}},
{
name: "monitor catchup success",
adminToken: "admin",
targetRound: 1237,
catchpoint: "1236#abcd",
algodServer: NewAlgodServer(
GenesisResponder,
MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
MakePostSyncRoundResponder(http.StatusOK),
MakeJsonResponderSeries("/v2/status", []int{http.StatusOK}, []interface{}{
models.NodeStatus{LastRound: 1235},
models.NodeStatus{Catchpoint: "1236#abcd", CatchpointProcessedAccounts: 1, CatchpointTotalAccounts: 1},
models.NodeStatus{Catchpoint: "1236#abcd", CatchpointVerifiedAccounts: 1, CatchpointTotalAccounts: 1},
models.NodeStatus{Catchpoint: "1236#abcd", CatchpointAcquiredBlocks: 1, CatchpointTotalBlocks: 1},
models.NodeStatus{Catchpoint: "1236#abcd"},
models.NodeStatus{LastRound: 1237}, // this is the only difference from the previous test
}),
MakeMsgpStatusResponder("post", "/v2/catchup/", http.StatusOK, "")),
netAddr: "",
errInit: "",
errGetGen: "",
logs: []string{
@@ -581,6 +610,10 @@ netaddr: %s
}

func TestGetBlockFailure(t *testing.T) {
// Note: There are panics in the log because the init function in these tests calls the
// delta endpoint and causes a panic in most cases. This causes the "needs catchup"
// function to send out a sync request at which point logic continues as normal and
// the GetBlock function is able to run for the test.
tests := []struct {
name string
algodServer *httptest.Server
@@ -634,6 +667,7 @@ func TestAlgodImporter_ProvideMetrics(t *testing.T) {
}

func TestGetBlockErrors(t *testing.T) {
waitForRoundTimeout = time.Hour
testcases := []struct {
name string
rnd uint64
@@ -644,28 +678,29 @@ func TestGetBlockErrors(t *testing.T) {
err string
}{
{
name: "Cannot get status",
name: "Cannot wait for block",
rnd: 123,
blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{}}),
err: fmt.Sprintf("error getting status for round"),
logs: []string{"error getting status for round 123", "failed to get block for round 123 "},
blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}),
err: fmt.Sprintf("error getting block for round 123"),
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get block",
rnd: 123,
blockAfterResponder: BlockAfterResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""),
err: fmt.Sprintf("failed to get block"),
logs: []string{"error getting block for round 123", "failed to get block for round 123 "},
err: fmt.Sprintf("error getting block for round 123"),
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get delta (node behind)",
name: "Cannot get delta (node behind, re-send sync)",
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("ledger state delta not found: node round (50) is behind required round (200)"),
logs: []string{"ledger state delta not found: node round (50) is behind required round (200)"},
err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"),
logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"},
},
{
name: "Cannot get delta (caught up)",
@@ -721,6 +756,7 @@ func TestGetBlockErrors(t *testing.T) {
for _, log := range tc.logs {
found := false
for _, entry := range hook.AllEntries() {
fmt.Println(strings.Contains(entry.Message, log))
found = found || strings.Contains(entry.Message, log)
}
noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log)
12 changes: 11 additions & 1 deletion conduit/plugins/importers/algod/mock_algod_test.go
@@ -107,7 +107,17 @@ func MakeNodeStatusResponder(status models.NodeStatus) algodCustomHandler {
return MakeJsonResponder("/v2/status", status)
}

var BlockAfterResponder = MakeBlockAfterResponder(models.NodeStatus{})
// BlockAfterResponder handles wait-for-block-after requests and returns a NodeStatus whose LastRound is one past the requested round.
func BlockAfterResponder(r *http.Request, w http.ResponseWriter) bool {
if strings.Contains(r.URL.Path, "/wait-for-block-after") {
rnd, _ := strconv.Atoi(path.Base(r.URL.Path))
w.WriteHeader(http.StatusOK)
status := models.NodeStatus{LastRound: uint64(rnd + 1)}
_, _ = w.Write(json.Encode(status))
return true
}
return false
}

func MakeLedgerStateDeltaResponder(delta types.LedgerStateDelta) algodCustomHandler {
return MakeMsgpResponder("/v2/deltas/", delta)