From 421ceb13ab97e6c27278dde4b0bcbb6371977b2b Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Mon, 9 Sep 2024 17:55:19 -0400 Subject: [PATCH 01/12] simplify PR --- schema/appdata/filter.go | 42 ++++++++++ schema/decoding/resolver.go | 6 +- schema/decoding/resolver_test.go | 4 +- schema/decoding/sync.go | 2 +- schema/indexer/filter.go | 31 +++++++ schema/indexer/indexer.go | 26 +----- schema/indexer/manager.go | 138 +++++++++++++++++++++++++++++-- schema/indexer/sync.go | 42 ++++++++++ schema/logutil/logger.go | 4 + 9 files changed, 256 insertions(+), 39 deletions(-) create mode 100644 schema/appdata/filter.go create mode 100644 schema/indexer/filter.go create mode 100644 schema/indexer/sync.go diff --git a/schema/appdata/filter.go b/schema/appdata/filter.go new file mode 100644 index 000000000000..7c5f47c813c5 --- /dev/null +++ b/schema/appdata/filter.go @@ -0,0 +1,42 @@ +package appdata + +// ModuleFilter returns an updated listener that filters state updates based on the module name. +func ModuleFilter(listener Listener, filter func(moduleName string) bool) Listener { + if initModData := listener.InitializeModuleData; initModData != nil { + listener.InitializeModuleData = func(data ModuleInitializationData) error { + if !filter(data.ModuleName) { + return nil + } + + return initModData(data) + } + } + + if onKVPair := listener.OnKVPair; onKVPair != nil { + listener.OnKVPair = func(data KVPairData) error { + for _, update := range data.Updates { + if !filter(update.ModuleName) { + continue + } + + if err := onKVPair(KVPairData{Updates: []ModuleKVPairUpdate{update}}); err != nil { + return err + } + } + + return nil + } + } + + if onObjectUpdate := listener.OnObjectUpdate; onObjectUpdate != nil { + listener.OnObjectUpdate = func(data ObjectUpdateData) error { + if !filter(data.ModuleName) { + return nil + } + + return onObjectUpdate(data) + } + } + + return listener +} diff --git a/schema/decoding/resolver.go b/schema/decoding/resolver.go index cb022dbb6947..5478573401f6 100644 --- a/schema/decoding/resolver.go +++ b/schema/decoding/resolver.go @@ -15,8 +15,8 @@ type DecoderResolver interface { // EncodeModuleName encodes a module name into a byte slice that can be used as the actor in a KVPairUpdate. EncodeModuleName(string) ([]byte, error) - // IterateAll iterates over all available module decoders. - IterateAll(func(moduleName string, cdc schema.ModuleCodec) error) error + // AllDecoders iterates over all available module decoders. + AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error) error // LookupDecoder looks up a specific module decoder. LookupDecoder(moduleName string) (decoder schema.ModuleCodec, found bool, err error) @@ -48,7 +48,7 @@ func (a moduleSetDecoderResolver) EncodeModuleName(s string) ([]byte, error) { return nil, fmt.Errorf("module %s not found", s) } -func (a moduleSetDecoderResolver) IterateAll(f func(string, schema.ModuleCodec) error) error { +func (a moduleSetDecoderResolver) AllDecoders(f func(string, schema.ModuleCodec) error) error { keys := make([]string, 0, len(a.moduleSet)) for k := range a.moduleSet { keys = append(keys, k) diff --git a/schema/decoding/resolver_test.go b/schema/decoding/resolver_test.go index 397b97bd6c33..35ead29d2d7b 100644 --- a/schema/decoding/resolver_test.go +++ b/schema/decoding/resolver_test.go @@ -43,7 +43,7 @@ var testResolver = ModuleSetDecoderResolver(moduleSet) func TestModuleSetDecoderResolver_IterateAll(t *testing.T) { objectTypes := map[string]bool{} - err := testResolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error { + err := testResolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error { cdc.Schema.AllTypes(func(t schema.Type) bool { objTyp, ok := t.(schema.ObjectType) if ok { @@ -128,7 +128,7 @@ func TestModuleSetDecoderResolver_IterateAll_Error(t *testing.T) { resolver := ModuleSetDecoderResolver(map[string]interface{}{ "modD": modD{}, }) - err := resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error { + err := resolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error { if moduleName == "modD" { t.Fatalf("expected error") } diff --git a/schema/decoding/sync.go b/schema/decoding/sync.go index d8aee9884c6a..68690b5716c9 100644 --- a/schema/decoding/sync.go +++ b/schema/decoding/sync.go @@ -27,7 +27,7 @@ func Sync(listener appdata.Listener, source SyncSource, resolver DecoderResolver return nil } - return resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error { + return resolver.AllDecoders(func(moduleName string, cdc schema.ModuleCodec) error { if opts.ModuleFilter != nil && !opts.ModuleFilter(moduleName) { // ignore this module return nil diff --git a/schema/indexer/filter.go b/schema/indexer/filter.go new file mode 100644 index 000000000000..60a27264458a --- /dev/null +++ b/schema/indexer/filter.go @@ -0,0 +1,31 @@ +package indexer + +type FilterConfig struct { + // ExcludeState specifies that the indexer will not receive state updates. + ExcludeState bool `json:"exclude_state"` + + // ExcludeEvents specifies that the indexer will not receive events. + ExcludeEvents bool `json:"exclude_events"` + + // ExcludeTxs specifies that the indexer will not receive transaction's. + ExcludeTxs bool `json:"exclude_txs"` + + // ExcludeBlockHeaders specifies that the indexer will not receive block headers, + // although it will still receive StartBlock and Commit callbacks, just without + // the header data. + ExcludeBlockHeaders bool `json:"exclude_block_headers"` + + Modules ModuleFilterConfig `json:"modules"` +} + +type ModuleFilterConfig struct { + // Include specifies a list of modules whose state the indexer will + // receive state updates for. + // Only one of include or exclude modules should be specified. + Include []string `json:"include"` + + // Exclude specifies a list of modules whose state the indexer will not + // receive state updates for. + // Only one of include or exclude modules should be specified. + Exclude []string `json:"exclude"` +} diff --git a/schema/indexer/indexer.go b/schema/indexer/indexer.go index 3b82e3254e5a..1a14919761d0 100644 --- a/schema/indexer/indexer.go +++ b/schema/indexer/indexer.go @@ -22,31 +22,9 @@ type Config struct { Type string `json:"type"` // Config are the indexer specific config options specified by the user. - Config map[string]interface{} `json:"config"` + Config map[string]interface{} `json:"config,omitempty"` - // ExcludeState specifies that the indexer will not receive state updates. - ExcludeState bool `json:"exclude_state"` - - // ExcludeEvents specifies that the indexer will not receive events. - ExcludeEvents bool `json:"exclude_events"` - - // ExcludeTxs specifies that the indexer will not receive transaction's. - ExcludeTxs bool `json:"exclude_txs"` - - // ExcludeBlockHeaders specifies that the indexer will not receive block headers, - // although it will still receive StartBlock and Commit callbacks, just without - // the header data. - ExcludeBlockHeaders bool `json:"exclude_block_headers"` - - // IncludeModules specifies a list of modules whose state the indexer will - // receive state updates for. - // Only one of include or exclude modules should be specified. - IncludeModules []string `json:"include_modules"` - - // ExcludeModules specifies a list of modules whose state the indexer will not - // receive state updates for. - // Only one of include or exclude modules should be specified. - ExcludeModules []string `json:"exclude_modules"` + Filter *FilterConfig `json:"filter,omitempty"` } type InitFunc = func(InitParams) (InitResult, error) diff --git a/schema/indexer/manager.go b/schema/indexer/manager.go index 60c19b4dd5c7..005a7cf6ae04 100644 --- a/schema/indexer/manager.go +++ b/schema/indexer/manager.go @@ -2,6 +2,9 @@ package indexer import ( "context" + "encoding/json" + "fmt" + "sync" "cosmossdk.io/schema/addressutil" "cosmossdk.io/schema/appdata" @@ -9,10 +12,10 @@ import ( "cosmossdk.io/schema/logutil" ) -// ManagerOptions are the options for starting the indexer manager. -type ManagerOptions struct { - // Config is the user configuration for all indexing. It should generally be an instance of map[string]interface{} - // and match the json structure of ManagerConfig. The manager will attempt to convert it to ManagerConfig. +// IndexingOptions are the options for starting the indexer manager. +type IndexingOptions struct { + // Config is the user configuration for all indexing. It should generally be an instance map[string]interface{} + // or json.RawMessage and match the json structure of IndexingConfig. The manager will attempt to convert it to IndexingConfig. Config interface{} // Resolver is the decoder resolver that will be used to decode the data. It is required. @@ -35,16 +38,133 @@ type ManagerOptions struct { // provided, but if it is omitted, the indexer manager will use a default codec which encodes and decodes addresses // as hex strings. AddressCodec addressutil.AddressCodec + + // DoneWaitGroup is a wait group that all indexer manager go routines will wait on before returning when the context + // is done. + // It is optional. + DoneWaitGroup *sync.WaitGroup } -// ManagerConfig is the configuration of the indexer manager and contains the configuration for each indexer target. -type ManagerConfig struct { +// IndexingConfig is the configuration of the indexer manager and contains the configuration for each indexer target. +type IndexingConfig struct { // Target is a map of named indexer targets to their configuration. Target map[string]Config + + // ChannelBufferSize is the buffer size of the channels used for buffering data sent to indexer go routines. + // It defaults to 1024. + ChannelBufferSize *int `json:"channel_buffer_size,omitempty"` +} + +type IndexingTarget struct { + Listener appdata.Listener + ModuleFilter ModuleFilterConfig } -// StartManager starts the indexer manager with the given options. The state machine should write all relevant app data to +// StartIndexing starts the indexer manager with the given options. The state machine should write all relevant app data to // the returned listener. -func StartManager(opts ManagerOptions) (appdata.Listener, error) { - panic("TODO: this will be implemented in a follow-up PR, this function is just a stub to demonstrate the API") +func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { + logger := opts.Logger + if logger == nil { + logger = logutil.NoopLogger{} + } + + logger.Info("Starting indexer manager") + + scopeableLogger, canScopeLogger := logger.(logutil.ScopeableLogger) + + cfg, err := unmarshalConfig(opts.Config) + if err != nil { + return IndexingTarget{}, err + } + + ctx := opts.Context + if ctx == nil { + ctx = context.Background() + } + + listeners := make([]appdata.Listener, 0, len(cfg.Target)) + + for targetName, targetCfg := range cfg.Target { + init, ok := indexerRegistry[targetCfg.Type] + if !ok { + return IndexingTarget{}, fmt.Errorf("indexer type %q not found", targetCfg.Type) + } + + logger.Info("Starting indexer", "target", targetName, "type", targetCfg.Type) + + if targetCfg.Filter != nil { + return IndexingTarget{}, fmt.Errorf("indexer filter options are not supported yet") + } + + childLogger := logger + if canScopeLogger { + childLogger = scopeableLogger.WithContext("indexer", targetName).(logutil.Logger) + } + + initRes, err := init(InitParams{ + Config: targetCfg, + Context: ctx, + Logger: childLogger, + }) + if err != nil { + return IndexingTarget{}, err + } + + listener := initRes.Listener + if initRes.View != nil { + // only do a sanity-check and catch-up sync if we have a view which tracks blocks committed + lastBlock, err := initRes.View.BlockNum() + if err != nil { + return IndexingTarget{}, fmt.Errorf("failed to get block number from view: %w", err) + } + listener = addSanityCheckAndSync(lastBlock, listener, opts, targetCfg.Filter.Modules) + } + listeners = append(listeners, listener) + } + + bufSize := 1024 + if cfg.ChannelBufferSize != nil { + bufSize = *cfg.ChannelBufferSize + } + asyncOpts := appdata.AsyncListenerOptions{ + Context: ctx, + DoneWaitGroup: opts.DoneWaitGroup, + BufferSize: bufSize, + } + + rootListener := appdata.AsyncListenerMux( + asyncOpts, + listeners..., + ) + + rootListener, err = decoding.Middleware(rootListener, opts.Resolver, decoding.MiddlewareOptions{}) + if err != nil { + return IndexingTarget{}, err + } + rootListener = appdata.AsyncListener(asyncOpts, rootListener) + + return IndexingTarget{ + Listener: rootListener, + }, nil +} + +func unmarshalConfig(cfg interface{}) (*IndexingConfig, error) { + var jsonBz []byte + var err error + + switch cfg := cfg.(type) { + case map[string]interface{}: + jsonBz, err = json.Marshal(cfg) + if err != nil { + return nil, err + } + case json.RawMessage: + jsonBz = cfg + default: + return nil, fmt.Errorf("can't convert %T to %T", cfg, IndexingConfig{}) + } + + var res IndexingConfig + err = json.Unmarshal(jsonBz, &res) + return &res, err } diff --git a/schema/indexer/sync.go b/schema/indexer/sync.go new file mode 100644 index 000000000000..e065fb0f34ba --- /dev/null +++ b/schema/indexer/sync.go @@ -0,0 +1,42 @@ +package indexer + +import ( + "fmt" + + "cosmossdk.io/schema/appdata" + "cosmossdk.io/schema/decoding" +) + +func addSanityCheckAndSync(lastBlockPersisted uint64, listener appdata.Listener, mgrOpts IndexingOptions, moduleFilter ModuleFilterConfig) appdata.Listener { + startBlock := listener.StartBlock + initialized := false + listener.StartBlock = func(data appdata.StartBlockData) error { + if !initialized { + if err := doSyncAndSanityCheck(lastBlockPersisted, data, listener, mgrOpts, moduleFilter); err != nil { + return err + } + initialized = true + } + if startBlock != nil { + return startBlock(data) + } + return nil + } + return listener +} + +func doSyncAndSanityCheck(lastBlockPersisted uint64, data appdata.StartBlockData, listener appdata.Listener, mgrOpts IndexingOptions, moduleFilter ModuleFilterConfig) error { + if lastBlockPersisted == 0 { + if data.Height == 1 { + // this is the first block anyway so nothing to sync + return nil + } + // need to do a catch-up sync + return decoding.Sync(listener, mgrOpts.SyncSource, mgrOpts.Resolver, decoding.SyncOptions{}) + } else if lastBlockPersisted+1 != data.Height { + // we are out of sync and there is existing data, so return an error + return fmt.Errorf("fatal error: indexer is out of sync, last block persisted: %d, current block height: %d", lastBlockPersisted, data.Height) + } + // all good + return nil +} diff --git a/schema/logutil/logger.go b/schema/logutil/logger.go index cb6b34ebfd2b..26e0088c37bc 100644 --- a/schema/logutil/logger.go +++ b/schema/logutil/logger.go @@ -21,6 +21,10 @@ type Logger interface { Debug(msg string, keyVals ...interface{}) } +type ScopeableLogger interface { + WithContext(keyVals ...interface{}) interface{} +} + // NoopLogger is a logger that doesn't do anything. type NoopLogger struct{} From 6e204487da2c8e314075747da9ea1903b559d114 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Mon, 9 Sep 2024 17:57:42 -0400 Subject: [PATCH 02/12] feat(schema/indexer)!: implement indexing initialization --- schema/appdata/filter.go | 42 --------------------------------------- schema/indexer/manager.go | 8 -------- schema/indexer/sync.go | 42 --------------------------------------- 3 files changed, 92 deletions(-) delete mode 100644 schema/appdata/filter.go delete mode 100644 schema/indexer/sync.go diff --git a/schema/appdata/filter.go b/schema/appdata/filter.go deleted file mode 100644 index 7c5f47c813c5..000000000000 --- a/schema/appdata/filter.go +++ /dev/null @@ -1,42 +0,0 @@ -package appdata - -// ModuleFilter returns an updated listener that filters state updates based on the module name. -func ModuleFilter(listener Listener, filter func(moduleName string) bool) Listener { - if initModData := listener.InitializeModuleData; initModData != nil { - listener.InitializeModuleData = func(data ModuleInitializationData) error { - if !filter(data.ModuleName) { - return nil - } - - return initModData(data) - } - } - - if onKVPair := listener.OnKVPair; onKVPair != nil { - listener.OnKVPair = func(data KVPairData) error { - for _, update := range data.Updates { - if !filter(update.ModuleName) { - continue - } - - if err := onKVPair(KVPairData{Updates: []ModuleKVPairUpdate{update}}); err != nil { - return err - } - } - - return nil - } - } - - if onObjectUpdate := listener.OnObjectUpdate; onObjectUpdate != nil { - listener.OnObjectUpdate = func(data ObjectUpdateData) error { - if !filter(data.ModuleName) { - return nil - } - - return onObjectUpdate(data) - } - } - - return listener -} diff --git a/schema/indexer/manager.go b/schema/indexer/manager.go index 005a7cf6ae04..12b4974b0e8c 100644 --- a/schema/indexer/manager.go +++ b/schema/indexer/manager.go @@ -111,14 +111,6 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { } listener := initRes.Listener - if initRes.View != nil { - // only do a sanity-check and catch-up sync if we have a view which tracks blocks committed - lastBlock, err := initRes.View.BlockNum() - if err != nil { - return IndexingTarget{}, fmt.Errorf("failed to get block number from view: %w", err) - } - listener = addSanityCheckAndSync(lastBlock, listener, opts, targetCfg.Filter.Modules) - } listeners = append(listeners, listener) } diff --git a/schema/indexer/sync.go b/schema/indexer/sync.go deleted file mode 100644 index e065fb0f34ba..000000000000 --- a/schema/indexer/sync.go +++ /dev/null @@ -1,42 +0,0 @@ -package indexer - -import ( - "fmt" - - "cosmossdk.io/schema/appdata" - "cosmossdk.io/schema/decoding" -) - -func addSanityCheckAndSync(lastBlockPersisted uint64, listener appdata.Listener, mgrOpts IndexingOptions, moduleFilter ModuleFilterConfig) appdata.Listener { - startBlock := listener.StartBlock - initialized := false - listener.StartBlock = func(data appdata.StartBlockData) error { - if !initialized { - if err := doSyncAndSanityCheck(lastBlockPersisted, data, listener, mgrOpts, moduleFilter); err != nil { - return err - } - initialized = true - } - if startBlock != nil { - return startBlock(data) - } - return nil - } - return listener -} - -func doSyncAndSanityCheck(lastBlockPersisted uint64, data appdata.StartBlockData, listener appdata.Listener, mgrOpts IndexingOptions, moduleFilter ModuleFilterConfig) error { - if lastBlockPersisted == 0 { - if data.Height == 1 { - // this is the first block anyway so nothing to sync - return nil - } - // need to do a catch-up sync - return decoding.Sync(listener, mgrOpts.SyncSource, mgrOpts.Resolver, decoding.SyncOptions{}) - } else if lastBlockPersisted+1 != data.Height { - // we are out of sync and there is existing data, so return an error - return fmt.Errorf("fatal error: indexer is out of sync, last block persisted: %d, current block height: %d", lastBlockPersisted, data.Height) - } - // all good - return nil -} From bbc9743f7e92c222524c009784e37d31b59ab32c Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Mon, 9 Sep 2024 17:58:50 -0400 Subject: [PATCH 03/12] refactor --- schema/indexer/{filter.go => config.go} | 18 ++++++++++++++++++ schema/indexer/indexer.go | 18 ------------------ schema/indexer/{manager.go => init.go} | 0 3 files changed, 18 insertions(+), 18 deletions(-) rename schema/indexer/{filter.go => config.go} (59%) rename schema/indexer/{manager.go => init.go} (100%) diff --git a/schema/indexer/filter.go b/schema/indexer/config.go similarity index 59% rename from schema/indexer/filter.go rename to schema/indexer/config.go index 60a27264458a..d6122b8c3a51 100644 --- a/schema/indexer/filter.go +++ b/schema/indexer/config.go @@ -1,5 +1,23 @@ package indexer +// Config species the configuration passed to an indexer initialization function. +// It includes both common configuration options related to include or excluding +// parts of the data stream as well as indexer specific options under the config +// subsection. +// +// NOTE: it is an error for an indexer to change its common options, such as adding +// or removing indexed modules, after the indexer has been initialized because this +// could result in an inconsistent state. +type Config struct { + // Type is the name of the indexer type as registered with Register. + Type string `json:"type"` + + // Config are the indexer specific config options specified by the user. + Config map[string]interface{} `json:"config,omitempty"` + + Filter *FilterConfig `json:"filter,omitempty"` +} + type FilterConfig struct { // ExcludeState specifies that the indexer will not receive state updates. ExcludeState bool `json:"exclude_state"` diff --git a/schema/indexer/indexer.go b/schema/indexer/indexer.go index 1a14919761d0..987ee4a0e795 100644 --- a/schema/indexer/indexer.go +++ b/schema/indexer/indexer.go @@ -9,24 +9,6 @@ import ( "cosmossdk.io/schema/view" ) -// Config species the configuration passed to an indexer initialization function. -// It includes both common configuration options related to include or excluding -// parts of the data stream as well as indexer specific options under the config -// subsection. -// -// NOTE: it is an error for an indexer to change its common options, such as adding -// or removing indexed modules, after the indexer has been initialized because this -// could result in an inconsistent state. -type Config struct { - // Type is the name of the indexer type as registered with Register. - Type string `json:"type"` - - // Config are the indexer specific config options specified by the user. - Config map[string]interface{} `json:"config,omitempty"` - - Filter *FilterConfig `json:"filter,omitempty"` -} - type InitFunc = func(InitParams) (InitResult, error) // InitParams is the input to the indexer initialization function. diff --git a/schema/indexer/manager.go b/schema/indexer/init.go similarity index 100% rename from schema/indexer/manager.go rename to schema/indexer/init.go From 7c563397b6eb841a1b5873fbc654810edc2326d0 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Mon, 9 Sep 2024 18:02:05 -0400 Subject: [PATCH 04/12] docs --- schema/indexer/config.go | 2 +- schema/indexer/init.go | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/schema/indexer/config.go b/schema/indexer/config.go index d6122b8c3a51..97d0d56ec387 100644 --- a/schema/indexer/config.go +++ b/schema/indexer/config.go @@ -33,7 +33,7 @@ type FilterConfig struct { // the header data. ExcludeBlockHeaders bool `json:"exclude_block_headers"` - Modules ModuleFilterConfig `json:"modules"` + Modules *ModuleFilterConfig `json:"modules"` } type ModuleFilterConfig struct { diff --git a/schema/indexer/init.go b/schema/indexer/init.go index 12b4974b0e8c..d7ea5c4bfea9 100644 --- a/schema/indexer/init.go +++ b/schema/indexer/init.go @@ -55,9 +55,16 @@ type IndexingConfig struct { ChannelBufferSize *int `json:"channel_buffer_size,omitempty"` } +// IndexingTarget returns the indexing target listener and associated data. +// The returned listener is the root listener to which app data should be sent. type IndexingTarget struct { - Listener appdata.Listener - ModuleFilter ModuleFilterConfig + // Listener is the root listener to which app data should be sent. + // It will do all processing in the background so updates should be sent synchronously. + Listener appdata.Listener + + // ModuleFilter returns the root module filter which an app can use to exclude modules at the storage level, + // if such a filter is set. + ModuleFilter *ModuleFilterConfig } // StartIndexing starts the indexer manager with the given options. The state machine should write all relevant app data to From 309eb616339bca239180b5db66261b3470dcd52e Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Tue, 10 Sep 2024 11:08:14 -0400 Subject: [PATCH 05/12] update postgres indexer to use indexer.Start --- indexer/postgres/indexer.go | 34 +++++------- indexer/postgres/tests/config.go | 26 --------- indexer/postgres/tests/init_schema_test.go | 23 ++++---- indexer/postgres/tests/postgres_test.go | 28 ++++++---- schema/indexer/config.go | 2 +- schema/indexer/indexer.go | 9 ++++ schema/indexer/registry.go | 10 ++-- schema/indexer/{init.go => start.go} | 61 +++++++++++++++++++--- 8 files changed, 113 insertions(+), 80 deletions(-) delete mode 100644 indexer/postgres/tests/config.go rename schema/indexer/{init.go => start.go} (75%) diff --git a/indexer/postgres/indexer.go b/indexer/postgres/indexer.go index bfaac25842e5..007d33d19043 100644 --- a/indexer/postgres/indexer.go +++ b/indexer/postgres/indexer.go @@ -3,8 +3,8 @@ package postgres import ( "context" "database/sql" - "encoding/json" "errors" + "fmt" "cosmossdk.io/schema/indexer" "cosmossdk.io/schema/logutil" @@ -21,8 +21,6 @@ type Config struct { DisableRetainDeletions bool `json:"disable_retain_deletions"` } -type SqlLogger = func(msg, sql string, params ...interface{}) - type indexerImpl struct { ctx context.Context db *sql.DB @@ -32,10 +30,17 @@ type indexerImpl struct { logger logutil.Logger } -func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) { - config, err := decodeConfig(params.Config.Config) - if err != nil { - return indexer.InitResult{}, err +func init() { + indexer.Register("postgres", indexer.InitDescriptor{ + InitFunc: startIndexer, + ConfigType: Config{}, + }) +} + +func startIndexer(params indexer.InitParams) (indexer.InitResult, error) { + config, ok := params.Config.Config.(Config) + if !ok { + return indexer.InitResult{}, fmt.Errorf("invalid config type, expected %T got %T", Config{}, params.Config.Config) } ctx := params.Context @@ -89,18 +94,3 @@ func StartIndexer(params indexer.InitParams) (indexer.InitResult, error) { View: idx, }, nil } - -func decodeConfig(rawConfig map[string]interface{}) (*Config, error) { - bz, err := json.Marshal(rawConfig) - if err != nil { - return nil, err - } - - var config Config - err = json.Unmarshal(bz, &config) - if err != nil { - return nil, err - } - - return &config, nil -} diff --git a/indexer/postgres/tests/config.go b/indexer/postgres/tests/config.go deleted file mode 100644 index 78e41f6059b5..000000000000 --- a/indexer/postgres/tests/config.go +++ /dev/null @@ -1,26 +0,0 @@ -package tests - -import ( - "encoding/json" - - "cosmossdk.io/indexer/postgres" - "cosmossdk.io/schema/indexer" -) - -func postgresConfigToIndexerConfig(cfg postgres.Config) (indexer.Config, error) { - cfgBz, err := json.Marshal(cfg) - if err != nil { - return indexer.Config{}, err - } - - var cfgMap map[string]interface{} - err = json.Unmarshal(cfgBz, &cfgMap) - if err != nil { - return indexer.Config{}, err - } - - return indexer.Config{ - Type: "postgres", - Config: cfgMap, - }, nil -} diff --git a/indexer/postgres/tests/init_schema_test.go b/indexer/postgres/tests/init_schema_test.go index 4257f37d5ab7..e3fef06fc265 100644 --- a/indexer/postgres/tests/init_schema_test.go +++ b/indexer/postgres/tests/init_schema_test.go @@ -33,17 +33,20 @@ func testInitSchema(t *testing.T, disableRetainDeletions bool, goldenFileName st connectionUrl := createTestDB(t) buf := &strings.Builder{} - - cfg, err := postgresConfigToIndexerConfig(postgres.Config{ - DatabaseURL: connectionUrl, - DisableRetainDeletions: disableRetainDeletions, - }) - require.NoError(t, err) - - res, err := postgres.StartIndexer(indexer.InitParams{ - Config: cfg, + res, err := indexer.StartIndexing(indexer.IndexingOptions{ + Config: indexer.IndexingConfig{ + Target: map[string]indexer.Config{ + "postgres": { + Type: "postgres", + Config: postgres.Config{ + DatabaseURL: connectionUrl, + DisableRetainDeletions: disableRetainDeletions, + }, + }, + }, + }, Context: context.Background(), - Logger: &prettyLogger{buf}, + Logger: prettyLogger{buf}, }) require.NoError(t, err) listener := res.Listener diff --git a/indexer/postgres/tests/postgres_test.go b/indexer/postgres/tests/postgres_test.go index fc725f9cc1cf..919c4dd61c59 100644 --- a/indexer/postgres/tests/postgres_test.go +++ b/indexer/postgres/tests/postgres_test.go @@ -50,24 +50,29 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) { require.NoError(t, err) }) - cfg, err := postgresConfigToIndexerConfig(postgres.Config{ - DatabaseURL: dbUrl, - DisableRetainDeletions: !retainDeletions, - }) - require.NoError(t, err) - debugLog := &strings.Builder{} - pgIndexer, err := postgres.StartIndexer(indexer.InitParams{ - Config: cfg, + res, err := indexer.StartIndexing(indexer.IndexingOptions{ + Config: indexer.IndexingConfig{ + Target: map[string]indexer.Config{ + "postgres": { + Type: "postgres", + Config: postgres.Config{ + DatabaseURL: dbUrl, + DisableRetainDeletions: !retainDeletions, + }, + }, + }, + }, Context: ctx, Logger: &prettyLogger{debugLog}, AddressCodec: addressutil.HexAddressCodec{}, }) require.NoError(t, err) + require.NoError(t, err) sim, err := appdatasim.NewSimulator(appdatasim.Options{ - Listener: pgIndexer.Listener, + Listener: res.Listener, AppSchema: indexertesting.ExampleAppSchema, StateSimOptions: statesim.Options{ CanRetainDeletions: retainDeletions, @@ -75,6 +80,9 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) { }) require.NoError(t, err) + pgIndexerView := res.IndexerInfos["postgres"].View + require.NotNil(t, pgIndexerView) + blockDataGen := sim.BlockDataGenN(10, 100) numBlocks := 200 if testing.Short() { @@ -91,7 +99,7 @@ func testPostgresIndexer(t *testing.T, retainDeletions bool) { require.NoError(t, sim.ProcessBlockData(blockData), debugLog.String()) // compare the expected state in the simulator to the actual state in the indexer and expect the diff to be empty - require.Empty(t, appdatasim.DiffAppData(sim, pgIndexer.View), debugLog.String()) + require.Empty(t, appdatasim.DiffAppData(sim, pgIndexerView), debugLog.String()) // reset the debug log after each successful block so that it doesn't get too long when debugging debugLog.Reset() diff --git a/schema/indexer/config.go b/schema/indexer/config.go index 97d0d56ec387..14c8d603b51d 100644 --- a/schema/indexer/config.go +++ b/schema/indexer/config.go @@ -13,7 +13,7 @@ type Config struct { Type string `json:"type"` // Config are the indexer specific config options specified by the user. - Config map[string]interface{} `json:"config,omitempty"` + Config interface{} `json:"config,omitempty"` Filter *FilterConfig `json:"filter,omitempty"` } diff --git a/schema/indexer/indexer.go b/schema/indexer/indexer.go index 987ee4a0e795..2e3636cbab9a 100644 --- a/schema/indexer/indexer.go +++ b/schema/indexer/indexer.go @@ -9,6 +9,15 @@ import ( "cosmossdk.io/schema/view" ) +// InitDescriptor describes an indexer initialization function and other metadata. +type InitDescriptor struct { + // InitFunc is the function that initializes the indexer. + InitFunc InitFunc + + // ConfigType is the type of the configuration object that the indexer expects. + ConfigType interface{} +} + type InitFunc = func(InitParams) (InitResult, error) // InitParams is the input to the indexer initialization function. diff --git a/schema/indexer/registry.go b/schema/indexer/registry.go index 445f56876add..d4fa8b6a9750 100644 --- a/schema/indexer/registry.go +++ b/schema/indexer/registry.go @@ -3,12 +3,16 @@ package indexer import "fmt" // Register registers an indexer type with the given initialization function. -func Register(indexerType string, initFunc InitFunc) { +func Register(indexerType string, descriptor InitDescriptor) { if _, ok := indexerRegistry[indexerType]; ok { panic(fmt.Sprintf("indexer %s already registered", indexerType)) } - indexerRegistry[indexerType] = initFunc + if descriptor.InitFunc == nil { + panic(fmt.Sprintf("indexer %s has no initialization function", indexerType)) + } + + indexerRegistry[indexerType] = descriptor } -var indexerRegistry = map[string]InitFunc{} +var indexerRegistry = map[string]InitDescriptor{} diff --git a/schema/indexer/init.go b/schema/indexer/start.go similarity index 75% rename from schema/indexer/init.go rename to schema/indexer/start.go index d7ea5c4bfea9..d514a2ee6525 100644 --- a/schema/indexer/init.go +++ b/schema/indexer/start.go @@ -4,18 +4,21 @@ import ( "context" "encoding/json" "fmt" + "reflect" "sync" "cosmossdk.io/schema/addressutil" "cosmossdk.io/schema/appdata" "cosmossdk.io/schema/decoding" "cosmossdk.io/schema/logutil" + "cosmossdk.io/schema/view" ) // IndexingOptions are the options for starting the indexer manager. type IndexingOptions struct { // Config is the user configuration for all indexing. It should generally be an instance map[string]interface{} - // or json.RawMessage and match the json structure of IndexingConfig. The manager will attempt to convert it to IndexingConfig. + // or json.RawMessage and match the json structure of IndexingConfig, or it can be an instance of IndexingConfig. + // The manager will attempt to convert it to IndexingConfig. Config interface{} // Resolver is the decoder resolver that will be used to decode the data. It is required. @@ -65,6 +68,14 @@ type IndexingTarget struct { // ModuleFilter returns the root module filter which an app can use to exclude modules at the storage level, // if such a filter is set. ModuleFilter *ModuleFilterConfig + + IndexerInfos map[string]IndexerInfo +} + +// IndexerInfo contains data returned by a specific indexer after initialization that maybe useful for the app. +type IndexerInfo struct { + // View is the view returned by the indexer in its InitResult. It is optional and may be nil. + View view.AppData } // StartIndexing starts the indexer manager with the given options. The state machine should write all relevant app data to @@ -79,7 +90,7 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { scopeableLogger, canScopeLogger := logger.(logutil.ScopeableLogger) - cfg, err := unmarshalConfig(opts.Config) + cfg, err := unmarshalIndexingConfig(opts.Config) if err != nil { return IndexingTarget{}, err } @@ -90,6 +101,7 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { } listeners := make([]appdata.Listener, 0, len(cfg.Target)) + indexerInfos := make(map[string]IndexerInfo, len(cfg.Target)) for targetName, targetCfg := range cfg.Target { init, ok := indexerRegistry[targetCfg.Type] @@ -108,10 +120,16 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { childLogger = scopeableLogger.WithContext("indexer", targetName).(logutil.Logger) } - initRes, err := init(InitParams{ - Config: targetCfg, - Context: ctx, - Logger: childLogger, + targetCfg.Config, err = unmarshalIndexerCustomConfig(targetCfg.Config, init.ConfigType) + if err != nil { + return IndexingTarget{}, fmt.Errorf("failed to unmarshal indexer config for target %q: %w", targetName, err) + } + + initRes, err := init.InitFunc(InitParams{ + Config: targetCfg, + Context: ctx, + Logger: childLogger, + AddressCodec: opts.AddressCodec, }) if err != nil { return IndexingTarget{}, err @@ -119,6 +137,10 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { listener := initRes.Listener listeners = append(listeners, listener) + + indexerInfos[targetName] = IndexerInfo{ + View: initRes.View, + } } bufSize := 1024 @@ -143,11 +165,19 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { rootListener = appdata.AsyncListener(asyncOpts, rootListener) return IndexingTarget{ - Listener: rootListener, + Listener: rootListener, + IndexerInfos: indexerInfos, }, nil } -func unmarshalConfig(cfg interface{}) (*IndexingConfig, error) { +func unmarshalIndexingConfig(cfg interface{}) (*IndexingConfig, error) { + if x, ok := cfg.(*IndexingConfig); ok { + return x, nil + } + if x, ok := cfg.(IndexingConfig); ok { + return &x, nil + } + var jsonBz []byte var err error @@ -167,3 +197,18 @@ func unmarshalConfig(cfg interface{}) (*IndexingConfig, error) { err = json.Unmarshal(jsonBz, &res) return &res, err } + +func unmarshalIndexerCustomConfig(cfg interface{}, expectedType interface{}) (interface{}, error) { + typ := reflect.TypeOf(expectedType) + if reflect.TypeOf(cfg).AssignableTo(typ) { + return cfg, nil + } + + res := reflect.New(typ).Interface() + bz, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + err = json.Unmarshal(bz, res) + return reflect.ValueOf(res).Elem(), err +} From 2d854e8a902b69476a0a8af901f26594479ddee4 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Tue, 10 Sep 2024 11:12:53 -0400 Subject: [PATCH 06/12] docs --- schema/indexer/config.go | 3 +++ schema/logutil/logger.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/schema/indexer/config.go b/schema/indexer/config.go index 14c8d603b51d..dcc6b9211792 100644 --- a/schema/indexer/config.go +++ b/schema/indexer/config.go @@ -15,9 +15,11 @@ type Config struct { // Config are the indexer specific config options specified by the user. Config interface{} `json:"config,omitempty"` + // Filter is the filter configuration for the indexer. Filter *FilterConfig `json:"filter,omitempty"` } +// FilterConfig specifies the configuration for filtering the data stream type FilterConfig struct { // ExcludeState specifies that the indexer will not receive state updates. ExcludeState bool `json:"exclude_state"` @@ -36,6 +38,7 @@ type FilterConfig struct { Modules *ModuleFilterConfig `json:"modules"` } +// ModuleFilterConfig specifies the configuration for filtering modules. type ModuleFilterConfig struct { // Include specifies a list of modules whose state the indexer will // receive state updates for. diff --git a/schema/logutil/logger.go b/schema/logutil/logger.go index 26e0088c37bc..a93b91567df2 100644 --- a/schema/logutil/logger.go +++ b/schema/logutil/logger.go @@ -21,7 +21,10 @@ type Logger interface { Debug(msg string, keyVals ...interface{}) } +// ScopeableLogger is a logger that can be scoped with key/value pairs. +// It is implemented by all the loggers in cosmossdk.io/log. type ScopeableLogger interface { + // WithContext returns a new logger with the provided key/value pairs set. WithContext(keyVals ...interface{}) interface{} } From dc7f0a867ddc4588e429616f2fb802847941fe3f Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Thu, 12 Sep 2024 13:51:06 -0400 Subject: [PATCH 07/12] WIP on testing --- indexer/postgres/indexer.go | 2 +- schema/indexer/indexer.go | 4 +- schema/indexer/registry.go | 4 +- schema/indexer/registry_test.go | 16 +++++--- schema/indexer/start.go | 4 +- schema/indexer/start_test.go | 69 +++++++++++++++++++++++++++++++++ 6 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 schema/indexer/start_test.go diff --git a/indexer/postgres/indexer.go b/indexer/postgres/indexer.go index 007d33d19043..a4a35d730b67 100644 --- a/indexer/postgres/indexer.go +++ b/indexer/postgres/indexer.go @@ -31,7 +31,7 @@ type indexerImpl struct { } func init() { - indexer.Register("postgres", indexer.InitDescriptor{ + indexer.Register("postgres", indexer.Initializer{ InitFunc: startIndexer, ConfigType: Config{}, }) diff --git a/schema/indexer/indexer.go b/schema/indexer/indexer.go index 2e3636cbab9a..6954dee082c7 100644 --- a/schema/indexer/indexer.go +++ b/schema/indexer/indexer.go @@ -9,8 +9,8 @@ import ( "cosmossdk.io/schema/view" ) -// InitDescriptor describes an indexer initialization function and other metadata. -type InitDescriptor struct { +// Initializer describes an indexer initialization function and other metadata. +type Initializer struct { // InitFunc is the function that initializes the indexer. InitFunc InitFunc diff --git a/schema/indexer/registry.go b/schema/indexer/registry.go index d4fa8b6a9750..0345ed6ad7ef 100644 --- a/schema/indexer/registry.go +++ b/schema/indexer/registry.go @@ -3,7 +3,7 @@ package indexer import "fmt" // Register registers an indexer type with the given initialization function. -func Register(indexerType string, descriptor InitDescriptor) { +func Register(indexerType string, descriptor Initializer) { if _, ok := indexerRegistry[indexerType]; ok { panic(fmt.Sprintf("indexer %s already registered", indexerType)) } @@ -15,4 +15,4 @@ func Register(indexerType string, descriptor InitDescriptor) { indexerRegistry[indexerType] = descriptor } -var indexerRegistry = map[string]InitDescriptor{} +var indexerRegistry = map[string]Initializer{} diff --git a/schema/indexer/registry_test.go b/schema/indexer/registry_test.go index b9f46910c8fd..0cd26b9629d5 100644 --- a/schema/indexer/registry_test.go +++ b/schema/indexer/registry_test.go @@ -3,15 +3,17 @@ package indexer import "testing" func TestRegister(t *testing.T) { - Register("test", func(params InitParams) (InitResult, error) { - return InitResult{}, nil + Register("test", Initializer{ + InitFunc: func(params InitParams) (InitResult, error) { + return InitResult{}, nil + }, }) - if indexerRegistry["test"] == nil { + if _, ok := indexerRegistry["test"]; !ok { t.Fatalf("expected to find indexer") } - if indexerRegistry["test2"] != nil { + if _, ok := indexerRegistry["test2"]; ok { t.Fatalf("expected not to find indexer") } @@ -20,7 +22,9 @@ func TestRegister(t *testing.T) { t.Fatalf("expected to panic") } }() - Register("test", func(params InitParams) (InitResult, error) { - return InitResult{}, nil + Register("test", Initializer{ + InitFunc: func(params InitParams) (InitResult, error) { + return InitResult{}, nil + }, }) } diff --git a/schema/indexer/start.go b/schema/indexer/start.go index d514a2ee6525..f23698f99834 100644 --- a/schema/indexer/start.go +++ b/schema/indexer/start.go @@ -88,8 +88,6 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { logger.Info("Starting indexer manager") - scopeableLogger, canScopeLogger := logger.(logutil.ScopeableLogger) - cfg, err := unmarshalIndexingConfig(opts.Config) if err != nil { return IndexingTarget{}, err @@ -116,7 +114,7 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { } childLogger := logger - if canScopeLogger { + if scopeableLogger, ok := logger.(logutil.ScopeableLogger); ok { childLogger = scopeableLogger.WithContext("indexer", targetName).(logutil.Logger) } diff --git a/schema/indexer/start_test.go b/schema/indexer/start_test.go new file mode 100644 index 000000000000..d0615a912a50 --- /dev/null +++ b/schema/indexer/start_test.go @@ -0,0 +1,69 @@ +package indexer + +import ( + "encoding/json" + "reflect" + "testing" +) + +func TestUnmarshalIndexingConfig(t *testing.T) { + cfg := &IndexingConfig{Target: map[string]Config{"target": {Type: "type"}}} + jsonBz, err := json.Marshal(cfg) + if err != nil { + t.Fatal(err) + } + + t.Run("json", func(t *testing.T) { + res, err := unmarshalIndexingConfig(json.RawMessage(jsonBz)) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(res, cfg) { + t.Fatalf("expected %v, got %v", cfg, res) + } + }) + + t.Run("map", func(t *testing.T) { + var m map[string]interface{} + err := json.Unmarshal(jsonBz, &m) + if err != nil { + t.Fatal(err) + } + + res, err := unmarshalIndexingConfig(m) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(res, cfg) { + t.Fatalf("expected %v, got %v", cfg, res) + } + }) + + t.Run("ptr", func(t *testing.T) { + res, err := unmarshalIndexingConfig(cfg) + if err != nil { + t.Fatal(err) + } + if res != cfg { + t.Fatalf("expected %v, got %v", cfg, res) + } + }) + + t.Run("struct", func(t *testing.T) { + res, err := unmarshalIndexingConfig(*cfg) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(res, cfg) { + t.Fatalf("expected %v, got %v", cfg, res) + } + }) +} + +func TestUnmarshalIndexerConfig(t *testing.T) { + +} + +type testConfig struct { + SomeParam string `json:"some_param"` +} From de3c622c0cdfa6ab7f90dcc009b0005188167a5c Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Thu, 12 Sep 2024 14:04:54 -0400 Subject: [PATCH 08/12] WIP on testing --- schema/indexer/start.go | 2 +- schema/indexer/start_test.go | 60 ++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/schema/indexer/start.go b/schema/indexer/start.go index f23698f99834..3da454833208 100644 --- a/schema/indexer/start.go +++ b/schema/indexer/start.go @@ -208,5 +208,5 @@ func unmarshalIndexerCustomConfig(cfg interface{}, expectedType interface{}) (in return nil, err } err = json.Unmarshal(bz, res) - return reflect.ValueOf(res).Elem(), err + return reflect.ValueOf(res).Elem().Interface(), err } diff --git a/schema/indexer/start_test.go b/schema/indexer/start_test.go index d0615a912a50..becd4a22ea95 100644 --- a/schema/indexer/start_test.go +++ b/schema/indexer/start_test.go @@ -61,7 +61,67 @@ func TestUnmarshalIndexingConfig(t *testing.T) { } func TestUnmarshalIndexerConfig(t *testing.T) { + t.Run("struct", func(t *testing.T) { + cfg := testConfig{SomeParam: "foobar"} + cfg2, err := unmarshalIndexerCustomConfig(cfg, testConfig{}) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cfg, cfg2) { + t.Fatalf("expected %v, got %v", cfg, cfg2) + } + }) + + t.Run("ptr", func(t *testing.T) { + cfg := &testConfig{SomeParam: "foobar"} + cfg2, err := unmarshalIndexerCustomConfig(cfg, &testConfig{}) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cfg, cfg2) { + t.Fatalf("expected %v, got %v", cfg, cfg2) + } + }) + + t.Run("map -> struct", func(t *testing.T) { + cfg := testConfig{SomeParam: "foobar"} + jzonBz, err := json.Marshal(cfg) + if err != nil { + t.Fatal(err) + } + var m map[string]interface{} + err = json.Unmarshal(jzonBz, &m) + if err != nil { + t.Fatal(err) + } + cfg2, err := unmarshalIndexerCustomConfig(m, testConfig{}) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cfg, cfg2) { + t.Fatalf("expected %v, got %v", cfg, cfg2) + } + }) + t.Run("map -> ptr", func(t *testing.T) { + cfg := &testConfig{SomeParam: "foobar"} + jzonBz, err := json.Marshal(cfg) + if err != nil { + t.Fatal(err) + } + var m map[string]interface{} + err = json.Unmarshal(jzonBz, &m) + if err != nil { + t.Fatal(err) + } + cfg2, err := unmarshalIndexerCustomConfig(m, &testConfig{}) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cfg, cfg2) { + t.Fatalf("expected %v, got %v", cfg, cfg2) + } + }) } type testConfig struct { From 5459a2652889486c84f3f3056b3e4f949d659612 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Thu, 12 Sep 2024 14:18:24 -0400 Subject: [PATCH 09/12] more tests --- schema/indexer/start_test.go | 90 ++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/schema/indexer/start_test.go b/schema/indexer/start_test.go index becd4a22ea95..e7efc9f62322 100644 --- a/schema/indexer/start_test.go +++ b/schema/indexer/start_test.go @@ -1,11 +1,97 @@ package indexer import ( + "context" "encoding/json" "reflect" + "sync" "testing" + + "cosmossdk.io/schema/appdata" ) +func TestStart(t *testing.T) { + ctx, cancelFn := context.WithCancel(context.Background()) + var test1CommitCalled, test2CommitCalled int + Register("test1", Initializer{ + InitFunc: func(params InitParams) (InitResult, error) { + if params.Config.Config.(testConfig).SomeParam != "foobar" { + t.Fatalf("expected %q, got %q", "foobar", params.Config.Config.(testConfig).SomeParam) + } + return InitResult{ + Listener: appdata.Listener{ + Commit: func(data appdata.CommitData) (completionCallback func() error, err error) { + test1CommitCalled++ + return nil, nil + }, + }, + }, nil + }, + ConfigType: testConfig{}, + }) + Register("test2", Initializer{ + InitFunc: func(params InitParams) (InitResult, error) { + if params.Config.Config.(testConfig2).Foo != "bar" { + t.Fatalf("expected %q, got %q", "bar", params.Config.Config.(testConfig2).Foo) + } + return InitResult{ + Listener: appdata.Listener{ + Commit: func(data appdata.CommitData) (completionCallback func() error, err error) { + test2CommitCalled++ + return nil, nil + }, + }, + }, nil + }, + ConfigType: testConfig2{}, + }) + + var wg sync.WaitGroup + target, err := StartIndexing(IndexingOptions{ + Config: IndexingConfig{Target: map[string]Config{ + "t1": {Type: "test1", Config: testConfig{SomeParam: "foobar"}}, + "t2": {Type: "test2", Config: testConfig2{Foo: "bar"}}, + }}, + Resolver: nil, + SyncSource: nil, + Logger: nil, + Context: ctx, + AddressCodec: nil, + DoneWaitGroup: &wg, + }) + if err != nil { + t.Fatal(err) + } + + const COMMIT_COUNT = 10 + for i := 0; i < COMMIT_COUNT; i++ { + callCommit(t, target.Listener) + } + + cancelFn() + wg.Wait() + + if test1CommitCalled != COMMIT_COUNT { + t.Fatalf("expected %d, got %d", COMMIT_COUNT, test1CommitCalled) + } + if test2CommitCalled != COMMIT_COUNT { + t.Fatalf("expected %d, got %d", COMMIT_COUNT, test2CommitCalled) + } +} + +func callCommit(t *testing.T, listener appdata.Listener) { + cb, err := listener.Commit(appdata.CommitData{}) + if err != nil { + t.Fatal(err) + } + if cb != nil { + err = cb() + if err != nil { + t.Fatal(err) + } + } +} + func TestUnmarshalIndexingConfig(t *testing.T) { cfg := &IndexingConfig{Target: map[string]Config{"target": {Type: "type"}}} jsonBz, err := json.Marshal(cfg) @@ -127,3 +213,7 @@ func TestUnmarshalIndexerConfig(t *testing.T) { type testConfig struct { SomeParam string `json:"some_param"` } + +type testConfig2 struct { + Foo string `json:"foo"` +} From c967f789c051586488de27e18a1d8f6a0ee8c584 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Tue, 17 Sep 2024 10:41:06 -0400 Subject: [PATCH 10/12] update struct tags --- schema/indexer/config.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/schema/indexer/config.go b/schema/indexer/config.go index dcc6b9211792..2b2ea13485bd 100644 --- a/schema/indexer/config.go +++ b/schema/indexer/config.go @@ -10,32 +10,32 @@ package indexer // could result in an inconsistent state. type Config struct { // Type is the name of the indexer type as registered with Register. - Type string `json:"type"` + Type string `mapstructure:"type" toml:"type" json:"type" comment:"The name of the registered indexer type."` // Config are the indexer specific config options specified by the user. - Config interface{} `json:"config,omitempty"` + Config interface{} `mapstructure:"config" toml:"config" json:"config,omitempty" comment:"Indexer specific configuration options."` // Filter is the filter configuration for the indexer. - Filter *FilterConfig `json:"filter,omitempty"` + Filter *FilterConfig `mapstructure:"filter" toml:"filter" json:"filter,omitempty" comment:"Filter configuration for the indexer. Currently UNSUPPORTED!"` } // FilterConfig specifies the configuration for filtering the data stream type FilterConfig struct { // ExcludeState specifies that the indexer will not receive state updates. - ExcludeState bool `json:"exclude_state"` + ExcludeState bool `mapstructure:"exclude_state" toml:"exclude_state" json:"exclude_state" comment:"Exclude all state updates."` // ExcludeEvents specifies that the indexer will not receive events. - ExcludeEvents bool `json:"exclude_events"` + ExcludeEvents bool `mapstructure:"exclude_events" toml:"exclude_events" json:"exclude_events" comment:"Exclude all events."` // ExcludeTxs specifies that the indexer will not receive transaction's. - ExcludeTxs bool `json:"exclude_txs"` + ExcludeTxs bool `mapstructure:"exclude_txs" toml:"exclude_txs" json:"exclude_txs" comment:"Exclude all transactions."` // ExcludeBlockHeaders specifies that the indexer will not receive block headers, // although it will still receive StartBlock and Commit callbacks, just without // the header data. - ExcludeBlockHeaders bool `json:"exclude_block_headers"` + ExcludeBlockHeaders bool `mapstructure:"exclude_block_headers" toml:"exclude_block_headers" json:"exclude_block_headers" comment:"Exclude all block headers."` - Modules *ModuleFilterConfig `json:"modules"` + Modules *ModuleFilterConfig `mapstructure:"modules" toml:"modules" json:"modules,omitempty" comment:"Module filter configuration."` } // ModuleFilterConfig specifies the configuration for filtering modules. @@ -43,10 +43,10 @@ type ModuleFilterConfig struct { // Include specifies a list of modules whose state the indexer will // receive state updates for. // Only one of include or exclude modules should be specified. - Include []string `json:"include"` + Include []string `mapstructure:"include" toml:"include" json:"include" comment:"List of modules to include. Only one of include or exclude should be specified."` // Exclude specifies a list of modules whose state the indexer will not // receive state updates for. // Only one of include or exclude modules should be specified. - Exclude []string `json:"exclude"` + Exclude []string `mapstructure:"exclude" toml:"exclude" json:"exclude" comment:"List of modules to exclude. Only one of include or exclude should be specified."` } From 0bb6485727014ceb90803064e8123606391073c0 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Mon, 23 Sep 2024 11:30:59 -0400 Subject: [PATCH 11/12] fix go 1.12 build --- schema/indexer/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema/indexer/start.go b/schema/indexer/start.go index 3da454833208..9228a04ddf47 100644 --- a/schema/indexer/start.go +++ b/schema/indexer/start.go @@ -120,7 +120,7 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { targetCfg.Config, err = unmarshalIndexerCustomConfig(targetCfg.Config, init.ConfigType) if err != nil { - return IndexingTarget{}, fmt.Errorf("failed to unmarshal indexer config for target %q: %w", targetName, err) + return IndexingTarget{}, fmt.Errorf("failed to unmarshal indexer config for target %q: %v", targetName, err) } initRes, err := init.InitFunc(InitParams{ From 544003a5ff30a602225c4614f782a003bfa8b8a0 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Mon, 23 Sep 2024 11:59:14 -0400 Subject: [PATCH 12/12] update golden tests --- indexer/postgres/tests/testdata/init_schema.txt | 4 ++++ .../postgres/tests/testdata/init_schema_no_retain_delete.txt | 4 ++++ schema/indexer/start.go | 4 ++-- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/indexer/postgres/tests/testdata/init_schema.txt b/indexer/postgres/tests/testdata/init_schema.txt index 4d18d2fb23b1..43b91a6a7a83 100644 --- a/indexer/postgres/tests/testdata/init_schema.txt +++ b/indexer/postgres/tests/testdata/init_schema.txt @@ -1,3 +1,7 @@ +INFO: Starting indexing +INFO: Starting indexer + target_name: postgres + type: postgres DEBUG: Creating enum type sql: CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c'); DEBUG: Creating enum type diff --git a/indexer/postgres/tests/testdata/init_schema_no_retain_delete.txt b/indexer/postgres/tests/testdata/init_schema_no_retain_delete.txt index 0ec17ae1ea1d..71dfd4d08290 100644 --- a/indexer/postgres/tests/testdata/init_schema_no_retain_delete.txt +++ b/indexer/postgres/tests/testdata/init_schema_no_retain_delete.txt @@ -1,3 +1,7 @@ +INFO: Starting indexing +INFO: Starting indexer + target_name: postgres + type: postgres DEBUG: Creating enum type sql: CREATE TYPE "test_my_enum" AS ENUM ('a', 'b', 'c'); DEBUG: Creating enum type diff --git a/schema/indexer/start.go b/schema/indexer/start.go index 9228a04ddf47..48ab3f7fc7e0 100644 --- a/schema/indexer/start.go +++ b/schema/indexer/start.go @@ -86,7 +86,7 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { logger = logutil.NoopLogger{} } - logger.Info("Starting indexer manager") + logger.Info("Starting indexing") cfg, err := unmarshalIndexingConfig(opts.Config) if err != nil { @@ -107,7 +107,7 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { return IndexingTarget{}, fmt.Errorf("indexer type %q not found", targetCfg.Type) } - logger.Info("Starting indexer", "target", targetName, "type", targetCfg.Type) + logger.Info("Starting indexer", "target_name", targetName, "type", targetCfg.Type) if targetCfg.Filter != nil { return IndexingTarget{}, fmt.Errorf("indexer filter options are not supported yet")