Browse Source

fix(config): rename factories method

Signed-off-by: Jianxiang Ran <rxan_embedded@163.com>
Jianxiang Ran 2 years ago
parent
commit
2fceb430cb

+ 6 - 6
internal/binder/factory.go

@@ -22,20 +22,20 @@ import (
 type SourceFactory interface {
 	Source(name string) (api.Source, error)
 	LookupSource(name string) (api.LookupSource, error)
-	// GetSourcePlugin use source type name (mqtt/video) to get source plugin installation information
+	// SourcePluginInfo use source type name (mqtt/video) to get source plugin installation information
 	// first return value is extension type, only native/portable plugin type have installation information
 	// second return value is the plugin name
 	// third is the plugin installation information
-	GetSourcePlugin(name string) (plugin.EXTENSION_TYPE, string, string)
+	SourcePluginInfo(name string) (plugin.EXTENSION_TYPE, string, string)
 }
 
 type SinkFactory interface {
 	Sink(name string) (api.Sink, error)
-	// GetSinkPlugin use sink type name (mqtt/redis) to get sink plugin installation information
+	// SinkPluginInfo use sink type name (mqtt/redis) to get sink plugin installation information
 	// first return value is extension type, only native/portable plugin type have installation information
 	// second return value is the plugin name
 	// third is the plugin installation information
-	GetSinkPlugin(name string) (plugin.EXTENSION_TYPE, string, string)
+	SinkPluginInfo(name string) (plugin.EXTENSION_TYPE, string, string)
 }
 
 type FuncFactory interface {
@@ -46,11 +46,11 @@ type FuncFactory interface {
 	// ConvName Convert the name of the function usually to lowercase.
 	// This is only be used when parsing the SQL statement.
 	ConvName(funcName string) (string, bool)
-	// GetFunctionPlugin Use function name to get the function plugin install script
+	// FunctionPluginInfo Use function name to get the function plugin install script
 	// first return value is extension type, only native/portable plugin type have installation information
 	// second return value is the plugin name
 	// third is the plugin installation information
-	GetFunctionPlugin(funcName string) (plugin.EXTENSION_TYPE, string, string)
+	FunctionPluginInfo(funcName string) (plugin.EXTENSION_TYPE, string, string)
 }
 
 type FactoryEntry struct {

+ 1 - 1
internal/binder/function/binder.go

@@ -66,7 +66,7 @@ func Function(name string) (api.Function, error) {
 
 func GetFunctionPlugin(name string) (plugin.EXTENSION_TYPE, string, string) {
 	for _, sf := range funcFactories {
-		t, s1, s2 := sf.GetFunctionPlugin(name)
+		t, s1, s2 := sf.FunctionPluginInfo(name)
 		if t == plugin.NONE_EXTENSION {
 			continue
 		} else {

+ 1 - 1
internal/binder/function/function.go

@@ -128,7 +128,7 @@ func (m *Manager) HasFunctionSet(name string) bool {
 	return name == "internal"
 }
 
-func (m *Manager) GetFunctionPlugin(funcName string) (plugin.EXTENSION_TYPE, string, string) {
+func (m *Manager) FunctionPluginInfo(funcName string) (plugin.EXTENSION_TYPE, string, string) {
 	_, ok := builtins[funcName]
 	if !ok {
 		return plugin.NONE_EXTENSION, "", ""

+ 2 - 2
internal/binder/io/binder.go

@@ -71,7 +71,7 @@ func Source(name string) (api.Source, error) {
 
 func GetSourcePlugin(name string) (plugin.EXTENSION_TYPE, string, string) {
 	for _, sf := range sourceFactories {
-		t, s1, s2 := sf.GetSourcePlugin(name)
+		t, s1, s2 := sf.SourcePluginInfo(name)
 		if t == plugin.NONE_EXTENSION {
 			continue
 		} else {
@@ -97,7 +97,7 @@ func Sink(name string) (api.Sink, error) {
 
 func GetSinkPlugin(name string) (plugin.EXTENSION_TYPE, string, string) {
 	for _, sf := range sinkFactories {
-		t, s1, s2 := sf.GetSinkPlugin(name)
+		t, s1, s2 := sf.SinkPluginInfo(name)
 		if t == plugin.NONE_EXTENSION {
 			continue
 		} else {

+ 2 - 2
internal/binder/io/builtin.go

@@ -62,7 +62,7 @@ func (m *Manager) Source(name string) (api.Source, error) {
 	return nil, nil
 }
 
-func (m *Manager) GetSourcePlugin(name string) (plugin2.EXTENSION_TYPE, string, string) {
+func (m *Manager) SourcePluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
 	if _, ok := sources[name]; ok {
 		return plugin2.INTERNAL, "", ""
 	} else {
@@ -84,7 +84,7 @@ func (m *Manager) Sink(name string) (api.Sink, error) {
 	return nil, nil
 }
 
-func (m *Manager) GetSinkPlugin(name string) (plugin2.EXTENSION_TYPE, string, string) {
+func (m *Manager) SinkPluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
 	if _, ok := sinks[name]; ok {
 		return plugin2.INTERNAL, "", ""
 	} else {

+ 3 - 3
internal/binder/mock/mock_factory.go

@@ -36,7 +36,7 @@ func (f *MockFactory) Source(name string) (api.Source, error) {
 	}
 }
 
-func (f *MockFactory) GetSourcePlugin(_ string) (plugin.EXTENSION_TYPE, string, string) {
+func (f *MockFactory) SourcePluginInfo(_ string) (plugin.EXTENSION_TYPE, string, string) {
 	return plugin.INTERNAL, "", ""
 }
 
@@ -52,7 +52,7 @@ func (f *MockFactory) Sink(name string) (api.Sink, error) {
 	}
 }
 
-func (f *MockFactory) GetSinkPlugin(_ string) (plugin.EXTENSION_TYPE, string, string) {
+func (f *MockFactory) SinkPluginInfo(_ string) (plugin.EXTENSION_TYPE, string, string) {
 	return plugin.INTERNAL, "", ""
 }
 
@@ -76,7 +76,7 @@ func (f *MockFactory) HasFunctionSet(funcName string) bool {
 	}
 }
 
-func (f *MockFactory) GetFunctionPlugin(funcName string) (plugin.EXTENSION_TYPE, string, string) {
+func (f *MockFactory) FunctionPluginInfo(funcName string) (plugin.EXTENSION_TYPE, string, string) {
 	return plugin.NONE_EXTENSION, "", ""
 }
 

+ 3 - 3
internal/plugin/native/manager.go

@@ -585,7 +585,7 @@ func (rr *Manager) Source(name string) (api.Source, error) {
 	}
 }
 
-func (rr *Manager) GetSourcePlugin(name string) (plugin2.EXTENSION_TYPE, string, string) {
+func (rr *Manager) SourcePluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
 	_, ok := rr.GetPluginVersionBySymbol(plugin2.SOURCE, name)
 	if ok {
 		pluginName, _ := rr.GetPluginBySymbol(plugin2.SOURCE, name)
@@ -636,7 +636,7 @@ func (rr *Manager) Sink(name string) (api.Sink, error) {
 	return s, nil
 }
 
-func (rr *Manager) GetSinkPlugin(name string) (plugin2.EXTENSION_TYPE, string, string) {
+func (rr *Manager) SinkPluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
 	_, ok := rr.GetPluginVersionBySymbol(plugin2.SINK, name)
 	if ok {
 		pluginName, _ := rr.GetPluginBySymbol(plugin2.SINK, name)
@@ -674,7 +674,7 @@ func (rr *Manager) HasFunctionSet(name string) bool {
 	return ok
 }
 
-func (rr *Manager) GetFunctionPlugin(funcName string) (plugin2.EXTENSION_TYPE, string, string) {
+func (rr *Manager) FunctionPluginInfo(funcName string) (plugin2.EXTENSION_TYPE, string, string) {
 	pluginName, ok := rr.GetPluginBySymbol(plugin2.FUNCTION, funcName)
 	if ok {
 		var installScript = ""

+ 3 - 3
internal/plugin/portable/factory.go

@@ -30,7 +30,7 @@ func (m *Manager) Source(name string) (api.Source, error) {
 	return runtime.NewPortableSource(name, meta), nil
 }
 
-func (m *Manager) GetSourcePlugin(name string) (plugin.EXTENSION_TYPE, string, string) {
+func (m *Manager) SourcePluginInfo(name string) (plugin.EXTENSION_TYPE, string, string) {
 	pluginName, ok := m.reg.GetSymbol(plugin.SOURCE, name)
 	if ok {
 		var installScript = ""
@@ -54,7 +54,7 @@ func (m *Manager) Sink(name string) (api.Sink, error) {
 	return runtime.NewPortableSink(name, meta), nil
 }
 
-func (m *Manager) GetSinkPlugin(name string) (plugin.EXTENSION_TYPE, string, string) {
+func (m *Manager) SinkPluginInfo(name string) (plugin.EXTENSION_TYPE, string, string) {
 	pluginName, ok := m.reg.GetSymbol(plugin.SINK, name)
 	if ok {
 		var installScript = ""
@@ -94,7 +94,7 @@ func (m *Manager) HasFunctionSet(funcName string) bool {
 	return ok
 }
 
-func (m *Manager) GetFunctionPlugin(funcName string) (plugin.EXTENSION_TYPE, string, string) {
+func (m *Manager) FunctionPluginInfo(funcName string) (plugin.EXTENSION_TYPE, string, string) {
 	pluginName, ok := m.reg.GetSymbol(plugin.FUNCTION, funcName)
 	if ok {
 		var installScript = ""

+ 1 - 1
internal/plugin/wasm/factory.go

@@ -39,7 +39,7 @@ func (m *Manager) HasFunctionSet(funcName string) bool {
 	return ok
 }
 
-func (m *Manager) GetFunctionPlugin(funcName string) (plugin.EXTENSION_TYPE, string, string) {
+func (m *Manager) FunctionPluginInfo(funcName string) (plugin.EXTENSION_TYPE, string, string) {
 	return plugin.NONE_EXTENSION, "", ""
 }
 

+ 5 - 5
internal/processor/ruleset.go

@@ -30,7 +30,7 @@ type RulesetProcessor struct {
 type Ruleset struct {
 	Streams map[string]string `json:"streams"`
 	Tables  map[string]string `json:"tables"`
-	Rules   map[string]string `json:"Rules"`
+	Rules   map[string]string `json:"rules"`
 }
 
 func NewRulesetProcessor(r *RuleProcessor, s *StreamProcessor) *RulesetProcessor {
@@ -50,7 +50,7 @@ func (rs *RulesetProcessor) Export() (io.ReadSeeker, []int, error) {
 	all.Tables = allStreams["tables"]
 	rules, err := rs.r.GetAllRulesJson()
 	if err != nil {
-		return nil, nil, fmt.Errorf("fail to get all Rules: %v", err)
+		return nil, nil, fmt.Errorf("fail to get all rules: %v", err)
 	}
 	all.Rules = rules
 	jsonBytes, err := json.Marshal(all)
@@ -72,7 +72,7 @@ func (rs *RulesetProcessor) ExportRuleSet() *Ruleset {
 	all.Tables = allStreams["tables"]
 	rules, err := rs.r.GetAllRulesJson()
 	if err != nil {
-		conf.Log.Errorf("fail to get all Rules: %v", err)
+		conf.Log.Errorf("fail to get all rules: %v", err)
 		return nil
 	}
 	all.Rules = rules
@@ -128,7 +128,7 @@ func (rs *RulesetProcessor) Import(content []byte) ([]string, []int, error) {
 		}
 	}
 	var rules []string
-	// restore Rules
+	// restore rules
 	for k, v := range all.Rules {
 		_, e := rs.r.ExecCreateWithValidation(k, v)
 		if e != nil {
@@ -178,7 +178,7 @@ func (rs *RulesetProcessor) ImportRuleSet(all Ruleset) Ruleset {
 		}
 	}
 	var rules []string
-	// restore Rules
+	// restore rules
 	for k, v := range all.Rules {
 		_, e := rs.r.ExecCreateWithValidation(k, v)
 		if e != nil {

+ 5 - 5
internal/processor/ruleset_test.go

@@ -25,7 +25,7 @@ import (
 )
 
 func TestIO(t *testing.T) {
-	expected := `{"streams":{"demo":"CREATE STREAM demo () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"},"tables":{},"Rules":{"rule1":"{\"id\":\"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{\"log\": {}}]}","rule2":"{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{  \"log\": {}}]}"}}`
+	expected := `{"streams":{"demo":"CREATE STREAM demo () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"},"tables":{},"rules":{"rule1":"{\"id\":\"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{\"log\": {}}]}","rule2":"{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{  \"log\": {}}]}"}}`
 	expectedCounts := []int{1, 0, 2}
 	expectedStreams := []string{"demo"}
 	expectedRules := []string{"rule1", "rule2"}
@@ -42,7 +42,7 @@ func TestIO(t *testing.T) {
 	}
 	sort.Strings(names)
 	if !reflect.DeepEqual(names, expectedRules) {
-		t.Errorf("fail to return the imported Rules, expect %v but got %v", expectedRules, names)
+		t.Errorf("fail to return the imported rules, expect %v but got %v", expectedRules, names)
 	}
 	if !reflect.DeepEqual(counts, expectedCounts) {
 		t.Errorf("fail to return the correct counts, expect %v, but got %v", expectedCounts, counts)
@@ -60,12 +60,12 @@ func TestIO(t *testing.T) {
 
 	rules, err := rp.GetAllRules()
 	if err != nil {
-		t.Errorf("fail to get all Rules: %v", err)
+		t.Errorf("fail to get all rules: %v", err)
 		return
 	}
 	sort.Strings(rules)
 	if !reflect.DeepEqual(rules, expectedRules) {
-		t.Errorf("After import, expect Rules %v, but got %v", expectedRules, rules)
+		t.Errorf("After import, expect rules %v, but got %v", expectedRules, rules)
 		return
 	}
 
@@ -92,7 +92,7 @@ func TestIO(t *testing.T) {
 func TestImportError(t *testing.T) {
 	contents := []string{
 		"notjson",
-		`{INvalid"streams":{"demo":"CREATE STREAM demo () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"},"tables":{},"Rules":{"rule1":"{\"id\":\"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{\"log\": {}}]}","rule2":"{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{  \"log\": {}}]}"}}`,
+		`{INvalid"streams":{"demo":"CREATE STREAM demo () WITH (DATASOURCE=\"users\", FORMAT=\"JSON\")"},"tables":{},"rules":{"rule1":"{\"id\":\"rule1\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{\"log\": {}}]}","rule2":"{\"id\": \"rule2\",\"sql\": \"SELECT * FROM demo\",\"actions\": [{  \"log\": {}}]}"}}`,
 	}
 	sp := NewStreamProcessor()
 	defer sp.db.Clean()

+ 0 - 36
internal/server/rest.go

@@ -141,7 +141,6 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 	r.HandleFunc("/config/uploads", fileUploadHandler).Methods(http.MethodPost, http.MethodGet)
 	r.HandleFunc("/config/uploads/{name}", fileDeleteHandler).Methods(http.MethodDelete)
 	r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
-	//r.HandleFunc("/data/partial/import", configurationPartialImportHandler).Methods(http.MethodPost)
 	r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
 	r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
 	// Register extended routes
@@ -921,41 +920,6 @@ func configurationImportHandler(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 }
 
-//func configurationPartialImportHandler(w http.ResponseWriter, r *http.Request) {
-//	rsi := &configurationInfo{}
-//	err := json.NewDecoder(r.Body).Decode(rsi)
-//	if err != nil {
-//		handleError(w, err, "Invalid body: Error decoding json", logger)
-//		return
-//	}
-//	if rsi.Content != "" && rsi.FilePath != "" {
-//		handleError(w, errors.New("bad request"), "Invalid body: Cannot specify both content and file", logger)
-//		return
-//	} else if rsi.Content == "" && rsi.FilePath == "" {
-//		handleError(w, errors.New("bad request"), "Invalid body: must specify content or file", logger)
-//		return
-//	}
-//	content := []byte(rsi.Content)
-//	if rsi.FilePath != "" {
-//		reader, err := httpx.ReadFile(rsi.FilePath)
-//		if err != nil {
-//			handleError(w, err, "Fail to read file", logger)
-//			return
-//		}
-//		defer reader.Close()
-//		buf := new(bytes.Buffer)
-//		_, err = io.Copy(buf, reader)
-//		if err != nil {
-//			handleError(w, err, "fail to convert file", logger)
-//			return
-//		}
-//		content = buf.Bytes()
-//	}
-//	result := configurationPartialImport(content)
-//
-//	jsonResponse(result, w, logger)
-//}
-
 func configurationStatusExport() Configuration {
 	conf := Configuration{
 		Streams:          make(map[string]string),

+ 1 - 1
internal/service/manager.go

@@ -201,7 +201,7 @@ func (m *Manager) HasFunctionSet(_ string) bool {
 	return false
 }
 
-func (m *Manager) GetFunctionPlugin(funcName string) (plugin.EXTENSION_TYPE, string, string) {
+func (m *Manager) FunctionPluginInfo(funcName string) (plugin.EXTENSION_TYPE, string, string) {
 	funcContainer, ok := m.getFunction(funcName)
 	if ok {
 		var installScript = ""

+ 1 - 1
internal/topo/node/lookup_node_test.go

@@ -108,7 +108,7 @@ func (m *mockFac) Source(_ string) (api.Source, error) {
 	return nil, nil
 }
 
-func (m *mockFac) GetSourcePlugin(_ string) (plugin.EXTENSION_TYPE, string, string) {
+func (m *mockFac) SourcePluginInfo(_ string) (plugin.EXTENSION_TYPE, string, string) {
 	return plugin.INTERNAL, "", ""
 }