Some checks are pending
Build SimApp / build (amd64) (push) Waiting to run
Build SimApp / build (arm64) (push) Waiting to run
CodeQL / Analyze (push) Waiting to run
Build & Push / build (push) Waiting to run
Run Gosec / Gosec (push) Waiting to run
Lint / golangci-lint (push) Waiting to run
Checks dependencies and mocks generation / Check go mod tidy (push) Waiting to run
Checks dependencies and mocks generation / Check up to date mocks (push) Waiting to run
System Tests / setup (push) Waiting to run
System Tests / test-system (push) Blocked by required conditions
System Tests / test-system-legacy (push) Blocked by required conditions
Tests / Code Coverage / split-test-files (push) Waiting to run
Tests / Code Coverage / tests (00) (push) Blocked by required conditions
Tests / Code Coverage / tests (01) (push) Blocked by required conditions
Tests / Code Coverage / tests (02) (push) Blocked by required conditions
Tests / Code Coverage / tests (03) (push) Blocked by required conditions
Tests / Code Coverage / test-integration (push) Waiting to run
Tests / Code Coverage / test-e2e (push) Waiting to run
Tests / Code Coverage / repo-analysis (push) Blocked by required conditions
Tests / Code Coverage / test-sim-nondeterminism (push) Waiting to run
Tests / Code Coverage / test-clientv2 (push) Waiting to run
Tests / Code Coverage / test-core (push) Waiting to run
Tests / Code Coverage / test-depinject (push) Waiting to run
Tests / Code Coverage / test-errors (push) Waiting to run
Tests / Code Coverage / test-math (push) Waiting to run
Tests / Code Coverage / test-schema (push) Waiting to run
Tests / Code Coverage / test-collections (push) Waiting to run
Tests / Code Coverage / test-cosmovisor (push) Waiting to run
Tests / Code Coverage / test-confix (push) Waiting to run
Tests / Code Coverage / test-store (push) Waiting to run
Tests / Code Coverage / test-log (push) Waiting to run
Tests / Code Coverage / test-x-tx (push) Waiting to run
Tests / Code Coverage / test-x-nft (push) Waiting to run
Tests / Code Coverage / test-x-circuit (push) Waiting to run
Tests / Code Coverage / test-x-feegrant (push) Waiting to run
Tests / Code Coverage / test-x-evidence (push) Waiting to run
Tests / Code Coverage / test-x-upgrade (push) Waiting to run
Tests / Code Coverage / test-tools-benchmark (push) Waiting to run
108 lines
3.4 KiB
Go
108 lines
3.4 KiB
Go
package baseapp
|
|
|
|
import (
|
|
"fmt"
|
|
"slices"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/spf13/cast"
|
|
|
|
"cosmossdk.io/store/streaming"
|
|
storetypes "cosmossdk.io/store/types"
|
|
|
|
"git.cw.tr/mukan-network/mukan-sdk/client/flags"
|
|
servertypes "git.cw.tr/mukan-network/mukan-sdk/server/types"
|
|
)
|
|
|
|
// TOML keys used to look up streaming configuration in the application options.
// They are joined with "." to form the full lookup path, for example
// "streaming.abci.keys".
const (
	// StreamingTomlKey is the top-level section holding the streaming configuration.
	StreamingTomlKey = "streaming"
	// StreamingABCITomlKey is the sub-section holding the ABCI listener settings.
	StreamingABCITomlKey = "abci"
	// StreamingABCIPluginTomlKey is the per-service setting naming the plugin to load.
	StreamingABCIPluginTomlKey = "plugin"
	// StreamingABCIKeysTomlKey is the setting listing the store keys to expose
	// to listeners; a "*" entry selects every store key.
	StreamingABCIKeysTomlKey = "keys"
	// StreamingABCIStopNodeOnErrTomlKey is the boolean setting copied into
	// StreamingManager.StopNodeOnErr.
	StreamingABCIStopNodeOnErrTomlKey = "stop-node-on-err"
)
|
|
|
|
// RegisterStreamingServices registers streaming services with the BaseApp.
|
|
func (app *BaseApp) RegisterStreamingServices(appOpts servertypes.AppOptions, keys map[string]*storetypes.KVStoreKey) error {
|
|
// register streaming services
|
|
streamingCfg := cast.ToStringMap(appOpts.Get(StreamingTomlKey))
|
|
for service := range streamingCfg {
|
|
pluginKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, service, StreamingABCIPluginTomlKey)
|
|
pluginName := strings.TrimSpace(cast.ToString(appOpts.Get(pluginKey)))
|
|
if len(pluginName) > 0 {
|
|
logLevel := cast.ToString(appOpts.Get(flags.FlagLogLevel))
|
|
plugin, err := streaming.NewStreamingPlugin(pluginName, logLevel)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load streaming plugin: %w", err)
|
|
}
|
|
if err := app.registerStreamingPlugin(appOpts, keys, plugin); err != nil {
|
|
return fmt.Errorf("failed to register streaming plugin %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// registerStreamingPlugin registers streaming plugins with the BaseApp.
|
|
func (app *BaseApp) registerStreamingPlugin(
|
|
appOpts servertypes.AppOptions,
|
|
keys map[string]*storetypes.KVStoreKey,
|
|
streamingPlugin any,
|
|
) error {
|
|
v, ok := streamingPlugin.(storetypes.ABCIListener)
|
|
if !ok {
|
|
return fmt.Errorf("unexpected plugin type %T", v)
|
|
}
|
|
|
|
app.registerABCIListenerPlugin(appOpts, keys, v)
|
|
return nil
|
|
}
|
|
|
|
// registerABCIListenerPlugin registers plugins that implement the ABCIListener interface.
|
|
func (app *BaseApp) registerABCIListenerPlugin(
|
|
appOpts servertypes.AppOptions,
|
|
keys map[string]*storetypes.KVStoreKey,
|
|
abciListener storetypes.ABCIListener,
|
|
) {
|
|
stopNodeOnErrKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIStopNodeOnErrTomlKey)
|
|
stopNodeOnErr := cast.ToBool(appOpts.Get(stopNodeOnErrKey))
|
|
keysKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIKeysTomlKey)
|
|
exposeKeysStr := cast.ToStringSlice(appOpts.Get(keysKey))
|
|
exposedKeys := exposeStoreKeysSorted(exposeKeysStr, keys)
|
|
app.cms.AddListeners(exposedKeys)
|
|
app.SetStreamingManager(
|
|
storetypes.StreamingManager{
|
|
ABCIListeners: []storetypes.ABCIListener{abciListener},
|
|
StopNodeOnErr: stopNodeOnErr,
|
|
},
|
|
)
|
|
}
|
|
|
|
// exposeAll reports whether list contains the wildcard entry "*".
// Only an exact "*" element counts; a nil or empty list yields false.
func exposeAll(list []string) bool {
	const wildcard = "*"
	return slices.Index(list, wildcard) >= 0
}
|
|
|
|
func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStoreKey) []storetypes.StoreKey {
|
|
var exposeStoreKeys []storetypes.StoreKey
|
|
if exposeAll(keysStr) {
|
|
exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keys))
|
|
for key := range keys {
|
|
exposeStoreKeys = append(exposeStoreKeys, keys[key])
|
|
}
|
|
} else {
|
|
exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keysStr))
|
|
for _, keyStr := range keysStr {
|
|
if storeKey, ok := keys[keyStr]; ok {
|
|
exposeStoreKeys = append(exposeStoreKeys, storeKey)
|
|
}
|
|
}
|
|
}
|
|
// sort storeKeys for deterministic output
|
|
sort.SliceStable(exposeStoreKeys, func(i, j int) bool {
|
|
return exposeStoreKeys[i].Name() < exposeStoreKeys[j].Name()
|
|
})
|
|
|
|
return exposeStoreKeys
|
|
}
|