Browse Source

feat(meta): metadata/sources add kind parameter

Distinguish between scan source and lookup source

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 years ago
parent
commit
9765a3075a

+ 4 - 1
internal/binder/meta/bind.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -25,6 +25,9 @@ func Bind() {
 	if err := meta.ReadSourceMetaDir(func(name string) bool {
 		s, _ := io.Source(name)
 		return s != nil
+	}, func(name string) bool {
+		s, _ := io.LookupSource(name)
+		return s != nil
 	}); nil != err {
 		conf.Log.Errorf("readSourceMetaDir:%v", err)
 	}

+ 2 - 2
internal/meta/connectionMeta_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@ func TestGetMqttConnectionMeta(t *testing.T) {
 		return
 	}
 
-	if err = ReadSourceMetaFile(path.Join(confDir, "mqtt_source.json"), true); nil != err {
+	if err = ReadSourceMetaFile(path.Join(confDir, "mqtt_source.json"), true, false); nil != err {
 		return
 	}
 

+ 41 - 16
internal/meta/sourceMeta.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@ package meta
 
 import (
 	"fmt"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"os"
 	"path"
 	"strings"
@@ -37,10 +38,12 @@ type (
 		Libs     []string           `json:"libs"`
 		ConfKeys map[string][]field `json:"properties"`
 		Node     interface{}        `json:"node"`
+		isScan   bool
+		isLookup bool
 	}
 )
 
-func newUiSource(fi *fileSource) (*uiSource, error) {
+func newUiSource(fi *fileSource, isScan bool, isLookup bool) (*uiSource, error) {
 	if nil == fi {
 		return nil, nil
 	}
@@ -55,6 +58,8 @@ func newUiSource(fi *fileSource) (*uiSource, error) {
 			return nil, err
 		}
 	}
+	ui.isScan = isScan
+	ui.isLookup = isLookup
 	return ui, nil
 }
 
@@ -74,7 +79,7 @@ func UninstallSource(name string) {
 	delYamlConf(fmt.Sprintf(SourceCfgOperatorKeyTemplate, name))
 }
 
-func ReadSourceMetaFile(filePath string, installed bool) error {
+func ReadSourceMetaFile(filePath string, isScan bool, isLookup bool) error {
 	fileName := path.Base(filePath)
 	if "mqtt_source.json" == fileName {
 		fileName = "mqtt.json"
@@ -84,10 +89,11 @@ func ReadSourceMetaFile(filePath string, installed bool) error {
 	if nil == ptrMeta.About {
 		return fmt.Errorf("not found about of %s", filePath)
 	} else {
-		ptrMeta.About.Installed = installed
+		// TODO currently, only show installed source in ui
+		ptrMeta.About.Installed = true
 	}
 
-	meta, err := newUiSource(ptrMeta)
+	meta, err := newUiSource(ptrMeta, isScan, isLookup)
 	if nil != err {
 		return err
 	}
@@ -101,7 +107,7 @@ func ReadSourceMetaFile(filePath string, installed bool) error {
 	return err
 }
 
-func ReadSourceMetaDir(checker InstallChecker) error {
+func ReadSourceMetaDir(scanChecker InstallChecker, lookupChecker InstallChecker) error {
 	//load etc/sources meta data
 	confDir, err := conf.GetConfLoc()
 	if nil != err {
@@ -114,7 +120,7 @@ func ReadSourceMetaDir(checker InstallChecker) error {
 		return err
 	}
 
-	if err = ReadSourceMetaFile(path.Join(confDir, "mqtt_source.json"), true); nil != err {
+	if err = ReadSourceMetaFile(path.Join(confDir, "mqtt_source.json"), true, false); nil != err {
 		return err
 	}
 	conf.Log.Infof("Loading metadata file for source : %s", "mqtt_source.json")
@@ -122,11 +128,18 @@ func ReadSourceMetaDir(checker InstallChecker) error {
 	for _, entry := range dirEntries {
 		fileName := entry.Name()
 		if strings.HasSuffix(fileName, ".json") {
-			filePath := path.Join(dir, fileName)
-			if err = ReadSourceMetaFile(filePath, checker(strings.TrimSuffix(fileName, ".json"))); nil != err {
-				return err
+			name := strings.TrimSuffix(fileName, ".json")
+			isScan := scanChecker(name)
+			isLookup := lookupChecker(name)
+			if isScan || isLookup {
+				filePath := path.Join(dir, fileName)
+				if err = ReadSourceMetaFile(filePath, isScan, isLookup); nil != err {
+					return err
+				}
+				conf.Log.Infof("Loading metadata file for source : %s", fileName)
+			} else {
+				conf.Log.Warnf("Find source metadata file but not installed : %s", fileName)
 			}
-			conf.Log.Infof("Loading metadata file for source : %s", fileName)
 		}
 	}
 
@@ -145,11 +158,18 @@ func ReadSourceMetaDir(checker InstallChecker) error {
 	for _, entry := range dirEntries {
 		fileName := entry.Name()
 		if strings.HasSuffix(fileName, ".json") {
-			filePath := path.Join(dir, fileName)
-			if err = ReadSourceMetaFile(filePath, checker(strings.TrimSuffix(fileName, ".json"))); nil != err {
-				return err
+			name := strings.TrimSuffix(fileName, ".json")
+			isScan := scanChecker(name)
+			isLookup := lookupChecker(name)
+			if isScan || isLookup {
+				filePath := path.Join(dir, fileName)
+				if err = ReadSourceMetaFile(filePath, isScan, isLookup); nil != err {
+					return err
+				}
+				conf.Log.Infof("Loading metadata file for source : %s", fileName)
+			} else {
+				conf.Log.Warnf("Find source metadata file but not installed : %s", fileName)
 			}
-			conf.Log.Infof("Loading metadata file for source : %s", fileName)
 		}
 	}
 
@@ -171,11 +191,16 @@ func GetSourceMeta(sourceName, language string) (ptrSourceProperty *uiSource, er
 	return ui, nil
 }
 
-func GetSourcesPlugins() (sources []*pluginfo) {
+func GetSourcesPlugins(kind string) (sources []*pluginfo) {
 	gSourcemetaLock.RLock()
 	defer gSourcemetaLock.RUnlock()
 
 	for fileName, v := range gSourcemetadata {
+		if kind == ast.StreamKindLookup && !v.isLookup {
+			continue
+		} else if kind == ast.StreamKindScan && !v.isScan {
+			continue
+		}
 		node := new(pluginfo)
 		node.Name = strings.TrimSuffix(fileName, `.json`)
 		if nil == v {

+ 3 - 3
internal/meta/sourceMeta_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -27,7 +27,7 @@ func TestGetMqttSourceMeta(t *testing.T) {
 		return
 	}
 
-	if err = ReadSourceMetaFile(path.Join(confDir, "mqtt_source.json"), true); nil != err {
+	if err = ReadSourceMetaFile(path.Join(confDir, "mqtt_source.json"), true, false); nil != err {
 		t.Error(err)
 		return
 	}
@@ -51,7 +51,7 @@ func TestGetSqlSourceMeta(t *testing.T) {
 		return
 	}
 
-	if err = ReadSourceMetaFile(path.Join(confDir, "sources", "httppull.json"), true); nil != err {
+	if err = ReadSourceMetaFile(path.Join(confDir, "sources", "httppull.json"), true, false); nil != err {
 		t.Error(err)
 		return
 	}

+ 11 - 1
internal/plugin/native/manager.go

@@ -286,7 +286,17 @@ func (rr *Manager) Register(t plugin2.PluginType, j plugin2.Plugin) error {
 			conf.Log.Errorf("readSinkFile:%v", err)
 		}
 	case plugin2.SOURCE:
-		if err := meta.ReadSourceMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), true); nil != err {
+		isScan := true
+		isLookup := true
+		_, err := rr.Source(name)
+		if err != nil {
+			isScan = false
+		}
+		_, err = rr.LookupSource(name)
+		if err != nil {
+			isLookup = false
+		}
+		if err := meta.ReadSourceMetaFile(path.Join(rr.pluginConfDir, plugin2.PluginTypes[t], name+`.json`), isScan, isLookup); nil != err {
 			conf.Log.Errorf("readSourceFile:%v", err)
 		}
 	}

+ 1 - 1
internal/plugin/portable/manager.go

@@ -133,7 +133,7 @@ func (m *Manager) doRegister(name string, pi *PluginInfo, isInit bool) error {
 
 	if !isInit {
 		for _, s := range pi.Sources {
-			if err := meta.ReadSourceMetaFile(path.Join(m.pluginConfDir, plugin.PluginTypes[plugin.SOURCE], s+`.json`), true); nil != err {
+			if err := meta.ReadSourceMetaFile(path.Join(m.pluginConfDir, plugin.PluginTypes[plugin.SOURCE], s+`.json`), true, false); nil != err {
 				conf.Log.Errorf("read source json file:%v", err)
 			}
 		}

+ 9 - 1
internal/server/meta_init.go

@@ -21,6 +21,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/topo/node"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 	"io"
 	"net/http"
 	"strings"
@@ -109,7 +110,14 @@ func operatorsMetaHandler(w http.ResponseWriter, r *http.Request) {
 // list source plugin
 func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
 	defer r.Body.Close()
-	ret := meta.GetSourcesPlugins()
+	kind := r.URL.Query().Get("kind")
+	switch strings.ToLower(kind) {
+	case "lookup":
+		kind = ast.StreamKindLookup
+	default:
+		kind = ast.StreamKindScan
+	}
+	ret := meta.GetSourcesPlugins(kind)
 	if nil != ret {
 		jsonResponse(ret, w, logger)
 		return