Browse Source

style: enable error-return check and fix (#1734)

* enable error-return check and fix

Signed-off-by: yisaer <disxiaofei@163.com>

* fix

Signed-off-by: yisaer <disxiaofei@163.com>

---------

Signed-off-by: yisaer <disxiaofei@163.com>
Song Gao 2 years atrás
parent
commit
8891f8bb32

+ 3 - 3
internal/meta/yamlConfigMeta.go

@@ -42,9 +42,9 @@ func InitYamlConfigManager() {
 		lock:         sync.RWMutex{},
 		cfgOperators: make(map[string]conf.ConfigOperator),
 	}
-	_, ConfigManager.sourceConfigStatusDb = store.GetKV("sourceConfigStatus")
-	_, ConfigManager.sinkConfigStatusDb = store.GetKV("sinkConfigStatus")
-	_, ConfigManager.connectionConfigStatusDb = store.GetKV("connectionConfigStatus")
+	ConfigManager.sourceConfigStatusDb, _ = store.GetKV("sourceConfigStatus")
+	ConfigManager.sinkConfigStatusDb, _ = store.GetKV("sinkConfigStatus")
+	ConfigManager.connectionConfigStatusDb, _ = store.GetKV("connectionConfigStatus")
 }
 
 const SourceCfgOperatorKeyTemplate = "sources.%s"

+ 1 - 1
internal/pkg/store/definition/store.go

@@ -23,5 +23,5 @@ type StoreBuilder interface {
 }
 
 type TsBuilder interface {
-	CreateTs(table string) (error, kv.Tskv)
+	CreateTs(table string) (kv.Tskv, error)
 }

+ 3 - 3
internal/pkg/store/encoding/encoding.go

@@ -20,13 +20,13 @@ import (
 	"time"
 )
 
-func Encode(value interface{}) (error, []byte) {
+func Encode(value interface{}) ([]byte, error) {
 	var buff bytes.Buffer
 	gob.Register(time.Time{})
 	gob.Register(value)
 	enc := gob.NewEncoder(&buff)
 	if err := enc.Encode(value); err != nil {
-		return err, nil
+		return nil, err
 	}
-	return nil, buff.Bytes()
+	return buff.Bytes(), nil
 }

+ 2 - 2
internal/pkg/store/redis/redisKv.go

@@ -45,7 +45,7 @@ func createRedisKvStore(redis *redis.Client, table string) (*redisKvStore, error
 }
 
 func (kv redisKvStore) Setnx(key string, value interface{}) error {
-	err, b := kvEncoding.Encode(value)
+	b, err := kvEncoding.Encode(value)
 	if nil != err {
 		return err
 	}
@@ -60,7 +60,7 @@ func (kv redisKvStore) Setnx(key string, value interface{}) error {
 }
 
 func (kv redisKvStore) Set(key string, value interface{}) error {
-	err, b := kvEncoding.Encode(value)
+	b, err := kvEncoding.Encode(value)
 	if nil != err {
 		return err
 	}

+ 4 - 4
internal/pkg/store/redis/redisTs.go

@@ -47,11 +47,11 @@ func init() {
 	gob.Register(make(map[string]interface{}))
 }
 
-func createRedisTs(redis *redis.Client, table string) (error, *ts) {
+func createRedisTs(redis *redis.Client, table string) (*ts, error) {
 	key := fmt.Sprintf("%s:%s", TsPrefix, table)
 	lastTs, err := getLast(redis, key, nil)
 	if err != nil {
-		return err, nil
+		return nil, err
 	}
 	s := &ts{
 		db:    redis,
@@ -59,14 +59,14 @@ func createRedisTs(redis *redis.Client, table string) (error, *ts) {
 		last:  lastTs,
 		key:   key,
 	}
-	return nil, s
+	return s, nil
 }
 
 func (t *ts) Set(key int64, value interface{}) (bool, error) {
 	if key <= t.last {
 		return false, nil
 	}
-	err, b := kvEncoding.Encode(value)
+	b, err := kvEncoding.Encode(value)
 	if err != nil {
 		return false, err
 	}

+ 1 - 1
internal/pkg/store/redis/redisTsBuilder.go

@@ -32,6 +32,6 @@ func NewTsBuilder(d *redis.Client) TsBuilder {
 	}
 }
 
-func (b TsBuilder) CreateTs(table string) (error, st.Tskv) {
+func (b TsBuilder) CreateTs(table string) (st.Tskv, error) {
 	return createRedisTs(b.redis, table)
 }

+ 1 - 1
internal/pkg/store/redis/redisTs_test.go

@@ -70,7 +70,7 @@ func setupTRedisKv() (ts2.Tskv, *redis.Client, *miniredis.Miniredis) {
 
 	builder := NewTsBuilder(redisDB)
 	var ks ts2.Tskv
-	err, ks = builder.CreateTs("test")
+	ks, err = builder.CreateTs("test")
 	if err != nil {
 		panic(err)
 	}

+ 2 - 2
internal/pkg/store/sql/sqlKv.go

@@ -47,7 +47,7 @@ func createSqlKvStore(database Database, table string) (*sqlKvStore, error) {
 
 func (kv *sqlKvStore) Setnx(key string, value interface{}) error {
 	return kv.database.Apply(func(db *sql.DB) error {
-		err, b := kvEncoding.Encode(value)
+		b, err := kvEncoding.Encode(value)
 		if nil != err {
 			return err
 		}
@@ -65,7 +65,7 @@ func (kv *sqlKvStore) Setnx(key string, value interface{}) error {
 }
 
 func (kv *sqlKvStore) Set(key string, value interface{}) error {
-	err, b := kvEncoding.Encode(value)
+	b, err := kvEncoding.Encode(value)
 	if nil != err {
 		return err
 	}

+ 4 - 4
internal/pkg/store/sql/sqlTs.go

@@ -32,7 +32,7 @@ func init() {
 	gob.Register(make(map[string]interface{}))
 }
 
-func createSqlTs(database Database, table string) (error, *ts) {
+func createSqlTs(database Database, table string) (*ts, error) {
 	store := &ts{
 		database: database,
 		table:    table,
@@ -44,16 +44,16 @@ func createSqlTs(database Database, table string) (error, *ts) {
 		return err
 	})
 	if err != nil {
-		return err, nil
+		return nil, err
 	}
-	return nil, store
+	return store, nil
 }
 
 func (t *ts) Set(key int64, value interface{}) (bool, error) {
 	if key <= t.last {
 		return false, nil
 	}
-	err, b := kvEncoding.Encode(value)
+	b, err := kvEncoding.Encode(value)
 	if err != nil {
 		return false, err
 	}

+ 1 - 1
internal/pkg/store/sql/sqlTsBuilder.go

@@ -28,6 +28,6 @@ func NewTsBuilder(d Database) TsBuilder {
 	}
 }
 
-func (b TsBuilder) CreateTs(table string) (error, kv.Tskv) {
+func (b TsBuilder) CreateTs(table string) (kv.Tskv, error) {
 	return createSqlTs(b.database, table)
 }

+ 1 - 1
internal/pkg/store/sql/sqlTs_test.go

@@ -100,7 +100,7 @@ func setupTSqlKv() (ts2.Tskv, definition.Database, string) {
 		panic(err)
 	}
 	var store ts2.Tskv
-	err, store = builder.CreateTs(TTable)
+	store, err = builder.CreateTs(TTable)
 	if err != nil {
 		panic(err)
 	}

+ 15 - 15
internal/pkg/store/stores.go

@@ -62,18 +62,18 @@ func newStores(c definition.Config, name string) (*stores, error) {
 	}
 }
 
-func (s *stores) GetKV(table string) (error, kv.KeyValue) {
+func (s *stores) GetKV(table string) (kv.KeyValue, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if ks, contains := s.kv[table]; contains {
-		return nil, ks
+		return ks, nil
 	}
 	ks, err := s.kvBuilder.CreateStore(table)
 	if err != nil {
-		return err, nil
+		return nil, err
 	}
 	s.kv[table] = ks
-	return nil, ks
+	return ks, nil
 }
 
 func (s *stores) DropKV(table string) {
@@ -98,18 +98,18 @@ func (s *stores) DropRefKVs(tablePrefix string) {
 	}
 }
 
-func (s *stores) GetTS(table string) (error, kv.Tskv) {
+func (s *stores) GetTS(table string) (kv.Tskv, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if tts, contains := s.ts[table]; contains {
-		return nil, tts
+		return tts, nil
 	}
-	err, tts := s.tsBuilder.CreateTs(table)
+	tts, err := s.tsBuilder.CreateTs(table)
 	if err != nil {
-		return err, nil
+		return nil, err
 	}
 	s.ts[table] = tts
-	return nil, tts
+	return tts, nil
 }
 
 func (s *stores) DropTS(table string) {
@@ -122,16 +122,16 @@ func (s *stores) DropTS(table string) {
 	}
 }
 
-func GetKV(table string) (error, kv.KeyValue) {
+func GetKV(table string) (kv.KeyValue, error) {
 	if globalStores == nil {
-		return fmt.Errorf("global stores are not initialized"), nil
+		return nil, fmt.Errorf("global stores are not initialized")
 	}
 	return globalStores.GetKV(table)
 }
 
-func GetTS(table string) (error, kv.Tskv) {
+func GetTS(table string) (kv.Tskv, error) {
 	if globalStores == nil {
-		return fmt.Errorf("global stores are not initialized"), nil
+		return nil, fmt.Errorf("global stores are not initialized")
 	}
 	return globalStores.GetTS(table)
 }
@@ -152,9 +152,9 @@ func DropKV(table string) error {
 	return nil
 }
 
-func GetCacheKV(table string) (error, kv.KeyValue) {
+func GetCacheKV(table string) (kv.KeyValue, error) {
 	if cacheStores == nil {
-		return fmt.Errorf("cache stores are not initialized"), nil
+		return nil, fmt.Errorf("cache stores are not initialized")
 	}
 	return cacheStores.GetKV(table)
 }

+ 3 - 3
internal/plugin/native/manager.go

@@ -78,15 +78,15 @@ func InitManager() (*Manager, error) {
 	if err != nil {
 		return nil, fmt.Errorf("cannot find data folder: %s", err)
 	}
-	err, func_db := store.GetKV("pluginFuncs")
+	func_db, err := store.GetKV("pluginFuncs")
 	if err != nil {
 		return nil, fmt.Errorf("error when opening funcSymbolsdb: %v", err)
 	}
-	err, plg_db := store.GetKV("nativePlugin")
+	plg_db, err := store.GetKV("nativePlugin")
 	if err != nil {
 		return nil, fmt.Errorf("error when opening nativePlugin: %v", err)
 	}
-	err, plg_status_db := store.GetKV("nativePluginStatus")
+	plg_status_db, err := store.GetKV("nativePluginStatus")
 	if err != nil {
 		return nil, fmt.Errorf("error when opening nativePluginStatus: %v", err)
 	}

+ 2 - 2
internal/plugin/portable/manager.go

@@ -77,11 +77,11 @@ func InitManager() (*Manager, error) {
 	if err != nil {
 		return nil, err
 	}
-	err, plg_db := store.GetKV("portablePlugin")
+	plg_db, err := store.GetKV("portablePlugin")
 	if err != nil {
 		return nil, fmt.Errorf("error when opening portablePlugin: %v", err)
 	}
-	err, plg_status_db := store.GetKV("portablePluginStatus")
+	plg_status_db, err := store.GetKV("portablePluginStatus")
 	if err != nil {
 		return nil, fmt.Errorf("error when opening portablePluginStatus: %v", err)
 	}

+ 2 - 2
internal/processor/rule.go

@@ -32,11 +32,11 @@ type RuleProcessor struct {
 }
 
 func NewRuleProcessor() *RuleProcessor {
-	err, db := store.GetKV("rule")
+	db, err := store.GetKV("rule")
 	if err != nil {
 		panic(fmt.Sprintf("Can not initialize store for the rule processor at path 'rule': %v", err))
 	}
-	err, ruleStatusDb := store.GetKV("ruleStatus")
+	ruleStatusDb, err := store.GetKV("ruleStatus")
 	if err != nil {
 		panic(fmt.Sprintf("Can not initialize store for the rule processor at path 'rule': %v", err))
 	}

+ 3 - 3
internal/processor/stream.go

@@ -40,15 +40,15 @@ type StreamProcessor struct {
 }
 
 func NewStreamProcessor() *StreamProcessor {
-	err, db := store.GetKV("stream")
+	db, err := store.GetKV("stream")
 	if err != nil {
 		panic(fmt.Sprintf("Can not initialize store for the stream processor at path 'stream': %v", err))
 	}
-	err, streamDb := store.GetKV("streamStatus")
+	streamDb, err := store.GetKV("streamStatus")
 	if err != nil {
 		panic(fmt.Sprintf("Can not initialize store for the stream processor at path 'stream': %v", err))
 	}
-	err, tableDb := store.GetKV("tableStatus")
+	tableDb, err := store.GetKV("tableStatus")
 	if err != nil {
 		panic(fmt.Sprintf("Can not initialize store for the stream processor at path 'stream': %v", err))
 	}

+ 2 - 2
internal/schema/registry.go

@@ -59,11 +59,11 @@ func InitRegistry() error {
 	if err != nil {
 		return fmt.Errorf("cannot find etc folder: %s", err)
 	}
-	err, schemaDb = store.GetKV("schema")
+	schemaDb, err = store.GetKV("schema")
 	if err != nil {
 		return fmt.Errorf("cannot open schema db: %s", err)
 	}
-	err, schemaStatusDb = store.GetKV("schemaStatus")
+	schemaStatusDb, err = store.GetKV("schemaStatus")
 	if err != nil {
 		return fmt.Errorf("cannot open schemaStatus db: %s", err)
 	}

+ 3 - 3
internal/server/plugin_init.go

@@ -224,7 +224,7 @@ func prebuildPluginsHandler(w http.ResponseWriter, _ *http.Request, t plugin.Plu
 		}
 
 		hosts := conf.Config.Basic.PluginHosts
-		if err, plugins := fetchPluginList(t, hosts, os, runtime.GOARCH); err != nil {
+		if plugins, err := fetchPluginList(t, hosts, os, runtime.GOARCH); err != nil {
 			handleError(w, err, "", logger)
 		} else {
 			jsonResponse(plugins, w, logger)
@@ -238,7 +238,7 @@ var NativeSourcePlugin = []string{"random", "zmq", "sql", "video"}
 var NativeSinkPlugin = []string{"image", "influx", "influx2", "tdengine", "zmq", "sql"}
 var NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage", "tfLite"}
 
-func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (err error, result map[string]string) {
+func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (result map[string]string, err error) {
 	ptype := "sources"
 	plugins := NativeSourcePlugin
 	if t == plugin.SINK {
@@ -251,7 +251,7 @@ func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (err error, re
 
 	if hosts == "" || ptype == "" || os == "" {
 		logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
-		return fmt.Errorf("invalid configruation for plugin host in kuiper.yaml"), nil
+		return nil, fmt.Errorf("invalid configruation for plugin host in kuiper.yaml")
 	}
 	result = make(map[string]string)
 	hostsArr := strings.Split(hosts, ",")

+ 1 - 1
internal/server/plugin_init_test.go

@@ -70,7 +70,7 @@ func Test_fetchPluginList(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			gotErr, gotResult := fetchPluginList(tt.args.t, tt.args.hosts, tt.args.os, tt.args.arch)
+			gotResult, gotErr := fetchPluginList(tt.args.t, tt.args.hosts, tt.args.os, tt.args.arch)
 			if !reflect.DeepEqual(gotErr, tt.wantErr) {
 				t.Errorf("fetchPluginList() gotErr = %v, want %v", gotErr, tt.wantErr)
 			}

+ 1 - 1
internal/server/rule_migration.go

@@ -73,7 +73,7 @@ func ruleTraverse(rule *api.Rule, de *dependencies) {
 		if err != nil {
 			return
 		}
-		err, store := store2.GetKV("stream")
+		store, err := store2.GetKV("stream")
 		if err != nil {
 			return
 		}

+ 4 - 4
internal/service/manager.go

@@ -64,19 +64,19 @@ func InitManager() (*Manager, error) {
 		if err != nil {
 			return nil, fmt.Errorf("cannot find etc/services folder: %s", err)
 		}
-		err, sdb := store.GetKV("services")
+		sdb, err := store.GetKV("services")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open service db: %s", err)
 		}
-		err, fdb := store.GetKV("serviceFuncs")
+		fdb, err := store.GetKV("serviceFuncs")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open function db: %s", err)
 		}
-		err, sInstallDb := store.GetKV("serviceInstall")
+		sInstallDb, err := store.GetKV("serviceInstall")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open service db: %s", err)
 		}
-		err, statusDb := store.GetKV("serviceInstallStatus")
+		statusDb, err := store.GetKV("serviceInstallStatus")
 		if err != nil {
 			return nil, fmt.Errorf("cannot open service db: %s", err)
 		}

+ 1 - 1
internal/topo/node/cache/sync_cache.go

@@ -350,7 +350,7 @@ func (c *SyncCache) initStore(ctx api.StreamContext) {
 		store.DropCacheKV(kvTable)
 	}
 	var err error
-	err, c.store = store.GetCacheKV(kvTable)
+	c.store, err = store.GetCacheKV(kvTable)
 	if err != nil {
 		infra.DrainError(ctx, err, c.errorCh)
 	}

+ 2 - 2
internal/topo/planner/analyzer_test.go

@@ -130,7 +130,7 @@ var tests = []struct {
 }
 
 func Test_validation(t *testing.T) {
-	err, store := store.GetKV("stream")
+	store, err := store.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -192,7 +192,7 @@ func Test_validation(t *testing.T) {
 }
 
 func Test_validationSchemaless(t *testing.T) {
-	err, store := store.GetKV("stream")
+	store, err := store.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return

+ 1 - 1
internal/topo/planner/planner.go

@@ -53,7 +53,7 @@ func PlanSQLWithSourcesAndSinks(rule *api.Rule, sources []*node.SourceNode, sink
 	if rule.Options.SendMetaToSink && (len(streamsFromStmt) > 1 || stmt.Dimensions != nil) {
 		return nil, fmt.Errorf("Invalid option sendMetaToSink, it can not be applied to window")
 	}
-	err, store := store2.GetKV("stream")
+	store, err := store2.GetKV("stream")
 	if err != nil {
 		return nil, err
 	}

+ 3 - 3
internal/topo/planner/planner_test.go

@@ -33,7 +33,7 @@ func init() {
 }
 
 func Test_createLogicalPlan(t *testing.T) {
-	err, store := store.GetKV("stream")
+	store, err := store.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -1512,7 +1512,7 @@ func Test_createLogicalPlan(t *testing.T) {
 }
 
 func Test_createLogicalPlanSchemaless(t *testing.T) {
-	err, store := store.GetKV("stream")
+	store, err := store.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return
@@ -2484,7 +2484,7 @@ func Test_createLogicalPlanSchemaless(t *testing.T) {
 }
 
 func Test_createLogicalPlan4Lookup(t *testing.T) {
-	err, store := store.GetKV("stream")
+	store, err := store.GetKV("stream")
 	if err != nil {
 		t.Error(err)
 		return

+ 1 - 1
internal/topo/state/kv_store.go

@@ -49,7 +49,7 @@ type KVStore struct {
 // "$checkpointId":A map with key of checkpoint id and value of snapshot(gob serialized)
 // Assume each operator only has one instance
 func getKVStore(ruleId string) (*KVStore, error) {
-	err, db := ts.GetTS(ruleId)
+	db, err := ts.GetTS(ruleId)
 	if err != nil {
 		return nil, err
 	}

+ 1 - 1
tools/check/revive.toml

@@ -7,7 +7,7 @@ warningCode = -1
 [rule.blank-imports]
 [rule.context-as-argument]
 [rule.dot-imports]
-#[rule.error-return]
+[rule.error-return]
 #[rule.error-strings]
 [rule.error-naming]
 #[rule.exported]