Browse Source

Merge pull request #177 from emqx/edgex

Edgex
jinfahua 5 years atrás
parent
commit
9ce1eb22a6

+ 1 - 1
README.md

@@ -112,7 +112,7 @@ It can be run at various IoT edge use scenarios, such as real-time processing of
 | Raspberry Pi 3B+                               | 12k                  | sys+user: 70% | 20M          |
 | AWS t2.micro( 1 Core * 1 GB) <br />Ubuntu18.04 | 10k                  | sys+user: 25% | 20M          |
 
-### Max support rule support
+### Max number of rules support
 
 - 8000 rules with 800 message/second
 - Configurations

+ 1 - 0
common/plugin_manager/manager.go

@@ -33,6 +33,7 @@ func GetPlugin(t string, ptype string) (plugin.Symbol, error) {
 		if err != nil {
 			return nil, fmt.Errorf("cannot find symbol %s, please check if it is exported", t)
 		}
+		registry[key] = nf
 	}
 	return nf, nil
 }

+ 2 - 2
docs/en_US/edgex/edgex_rule_engine_tutorial.md

@@ -78,7 +78,7 @@ There are two approaches to manage stream, you can use your preferred approach.
 
 #### Option 1: Use Rest API
 
-The next step is to create a stream that can consuming data from EdgeX message bus. Please change ``127.0.0.1`` to your local Kuiper docker IP address.
+The next step is to create a stream that can consume data from EdgeX message bus. Please change ``$your_server`` to Kuiper docker instance IP address.
 
 ```shell
 curl -X POST \
@@ -211,7 +211,7 @@ Since all of the analysis result are published to  ``tcp://broker.emqx.io:1883``
 
 You'll find that only those randomnumber larger than 30 will be published to ``result`` topic.
 
-You can also type below command to look at the rule execution status. The corresponding REST API is also available for getting rule status, please check [related docuement](../restapi/overview.md).
+You can also type below command to look at the rule execution status. The corresponding REST API is also available for getting rule status, please check [related document](../restapi/overview.md).
 
 ```shell
 # bin/cli getstatus rule rule1

+ 334 - 0
plugins/manager.go

@@ -0,0 +1,334 @@
+package plugins
+
+import (
+	"archive/zip"
+	"errors"
+	"fmt"
+	"github.com/emqx/kuiper/common"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+	"sync"
+	"unicode"
+)
+
+type Plugin struct {
+	Name     string `json:"name"`
+	File     string `json:"file"`
+	Callback string `json:"callback"`
+}
+
+type PluginType int
+
+const (
+	SOURCE PluginType = iota
+	SINK
+	FUNCTION
+)
+
+var (
+	PluginTypes = []string{"sources", "sinks", "functions"}
+	once        sync.Once
+	singleton   *Manager
+)
+
+//Registry is append only because plugin cannot delete or reload. To delete a plugin, restart the server to reindex
+type Registry struct {
+	sync.RWMutex
+	internal [][]string
+}
+
+func (rr *Registry) Store(t PluginType, value string) {
+	rr.Lock()
+	rr.internal[t] = append(rr.internal[t], value)
+	rr.Unlock()
+}
+
+func (rr *Registry) List(t PluginType) (values []string) {
+	rr.RLock()
+	result := rr.internal[t]
+	rr.RUnlock()
+	return result
+}
+
+//func (rr *Registry) Delete(t PluginType, value string) {
+//	rr.Lock()
+//	s := rr.internal[t]
+//	for i, f := range s{
+//		if f == value{
+//			s[len(s)-1], s[i] = s[i], s[len(s)-1]
+//			rr.internal[t] = s
+//			break
+//		}
+//	}
+//	rr.Unlock()
+//}
+
+type Manager struct {
+	pluginDir string
+	etcDir    string
+	registry  *Registry
+}
+
+func NewPluginManager() (*Manager, error) {
+	var err error
+	once.Do(func() {
+		dir, err := common.GetLoc("/plugins")
+		if err != nil {
+			err = fmt.Errorf("cannot find plugins folder: %s", err)
+			return
+		}
+		etcDir, err := common.GetLoc("/etc")
+		if err != nil {
+			err = fmt.Errorf("cannot find etc folder: %s", err)
+			return
+		}
+
+		plugins := make([][]string, 3)
+		for i := 0; i < 3; i++ {
+			names, err := findAll(PluginType(i), dir)
+			if err != nil {
+				err = fmt.Errorf("fail to find existing plugins: %s", err)
+				return
+			}
+			plugins[i] = names
+		}
+		registry := &Registry{internal: plugins}
+
+		singleton = &Manager{
+			pluginDir: dir,
+			etcDir:    etcDir,
+			registry:  registry,
+		}
+	})
+	return singleton, err
+}
+
+func findAll(t PluginType, pluginDir string) (result []string, err error) {
+	dir := path.Join(pluginDir, PluginTypes[t])
+	files, err := ioutil.ReadDir(dir)
+	if err != nil {
+		return
+	}
+
+	for _, file := range files {
+		baseName := filepath.Base(file.Name())
+		if strings.HasSuffix(baseName, ".so") {
+			result = append(result, lcFirst(baseName[0:len(baseName)-3]))
+		}
+	}
+	return
+}
+
+func (m *Manager) List(t PluginType) (result []string, err error) {
+	return m.registry.List(t), nil
+}
+
+func (m *Manager) Register(t PluginType, j *Plugin) error {
+	name, uri, cb := j.Name, j.File, j.Callback
+	//Validation
+	name = strings.Trim(name, " ")
+	if name == "" {
+		return fmt.Errorf("invalid name %s: should not be empty", name)
+	}
+	if !isValidUrl(uri) || !strings.HasSuffix(uri, ".zip") {
+		return fmt.Errorf("invalid uri %s", uri)
+	}
+
+	for _, n := range m.registry.List(t) {
+		if n == name {
+			return fmt.Errorf("invalid name %s: duplicate", name)
+		}
+	}
+	zipPath := path.Join(m.pluginDir, name+".zip")
+	var unzipFiles []string
+	//clean up: delete zip file and unzip files in error
+	defer os.Remove(zipPath)
+	//download
+	err := downloadFile(zipPath, uri)
+	if err != nil {
+		return fmt.Errorf("fail to download file %s: %s", uri, err)
+	}
+	//unzip and copy to destination
+	unzipFiles, err = m.unzipAndCopy(t, name, zipPath)
+	if err != nil {
+		if t == SOURCE && len(unzipFiles) == 1 { //source that only copy so file
+			os.Remove(unzipFiles[0])
+		}
+		return fmt.Errorf("fail to unzip file %s: %s", uri, err)
+	}
+
+	m.registry.Store(t, name)
+	return callback(cb)
+}
+
+func (m *Manager) Delete(t PluginType, name string) (result error) {
+	name = strings.Trim(name, " ")
+	if name == "" {
+		return fmt.Errorf("invalid name %s: should not be empty", name)
+	}
+	found := false
+	for _, n := range m.registry.List(t) {
+		if n == name {
+			found = true
+		}
+	}
+	if !found {
+		return fmt.Errorf("invalid name %s: not exist", name)
+	}
+	var results []string
+	paths := []string{
+		path.Join(m.pluginDir, PluginTypes[t], ucFirst(name)+".so"),
+	}
+	if t == SOURCE {
+		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], name+".yaml"))
+	}
+	for _, p := range paths {
+		_, err := os.Stat(p)
+		if err == nil {
+			err = os.Remove(p)
+			if err != nil {
+				results = append(results, err.Error())
+			}
+		} else {
+			results = append(results, fmt.Sprintf("can't find %s", p))
+		}
+	}
+
+	if len(results) > 0 {
+		return errors.New(strings.Join(results, "\n"))
+	} else {
+		return nil
+	}
+}
+
+func (m *Manager) unzipAndCopy(t PluginType, name string, src string) ([]string, error) {
+	var filenames []string
+	r, err := zip.OpenReader(src)
+	if err != nil {
+		return filenames, err
+	}
+	defer r.Close()
+
+	files := []string{
+		ucFirst(name) + ".so",
+	}
+	paths := []string{
+		path.Join(m.pluginDir, PluginTypes[t], files[0]),
+	}
+	if t == SOURCE {
+		files = append(files, name+".yaml")
+		paths = append(paths, path.Join(m.etcDir, PluginTypes[t], files[1]))
+	}
+	for i, d := range files {
+		var z *zip.File
+		for _, file := range r.File {
+			fileName := file.Name
+			if fileName == d {
+				z = file
+			}
+		}
+		if z == nil {
+			return filenames, fmt.Errorf("invalid zip file: so file or conf file is missing")
+		}
+
+		err = unzipTo(z, paths[i])
+		if err != nil {
+			return filenames, err
+		}
+		filenames = append(filenames, paths[i])
+	}
+	return filenames, nil
+}
+
+func unzipTo(f *zip.File, fpath string) error {
+	_, err := os.Stat(fpath)
+	if err == nil || !os.IsNotExist(err) {
+		return fmt.Errorf("%s already exist", fpath)
+	}
+
+	if f.FileInfo().IsDir() {
+		return fmt.Errorf("%s: not a file, but a directory", fpath)
+	}
+
+	if err := os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
+		return err
+	}
+
+	outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
+	if err != nil {
+		return err
+	}
+
+	rc, err := f.Open()
+	if err != nil {
+		return err
+	}
+
+	_, err = io.Copy(outFile, rc)
+
+	outFile.Close()
+	rc.Close()
+	return err
+}
+
+func isValidUrl(uri string) bool {
+	_, err := url.ParseRequestURI(uri)
+	if err != nil {
+		return false
+	}
+
+	u, err := url.Parse(uri)
+	if err != nil || u.Scheme == "" || u.Host == "" {
+		return false
+	}
+
+	return true
+}
+
+func downloadFile(filepath string, url string) error {
+
+	// Get the data
+	resp, err := http.Get(url)
+	if err != nil {
+		return err
+	}
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("cannot download the file with status: %d %s", resp.StatusCode, resp.Status)
+	}
+	defer resp.Body.Close()
+
+	// Create the file
+	out, err := os.Create(filepath)
+	if err != nil {
+		return err
+	}
+	defer out.Close()
+
+	// Write the body to file
+	_, err = io.Copy(out, resp.Body)
+	return err
+}
+
+func ucFirst(str string) string {
+	for i, v := range str {
+		return string(unicode.ToUpper(v)) + str[i+1:]
+	}
+	return ""
+}
+
+func lcFirst(str string) string {
+	for i, v := range str {
+		return string(unicode.ToLower(v)) + str[i+1:]
+	}
+	return ""
+}
+
+func callback(u string) error {
+	return nil
+}

+ 140 - 0
plugins/manager_test.go

@@ -0,0 +1,140 @@
+package plugins
+
+import (
+	"errors"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path"
+	"reflect"
+	"testing"
+)
+
+func TestManager_Register(t *testing.T) {
+	s := httptest.NewServer(
+		http.FileServer(http.Dir("testzips")),
+	)
+	defer s.Close()
+	endpoint := s.URL
+
+	data := []struct {
+		t   PluginType
+		n   string
+		u   string
+		err error
+	}{
+		{
+			t:   SOURCE,
+			n:   "",
+			u:   "",
+			err: errors.New("invalid name : should not be empty"),
+		}, {
+			t:   SOURCE,
+			n:   "zipMissConf",
+			u:   endpoint + "/sources/zipMissConf.zip",
+			err: errors.New("fail to unzip file " + endpoint + "/sources/zipMissConf.zip: invalid zip file: so file or conf file is missing"),
+		}, {
+			t:   SINK,
+			n:   "urlerror",
+			u:   endpoint + "/sinks/nozip",
+			err: errors.New("invalid uri " + endpoint + "/sinks/nozip"),
+		}, {
+			t:   SINK,
+			n:   "zipWrongname",
+			u:   endpoint + "/sinks/zipWrongName.zip",
+			err: errors.New("fail to unzip file " + endpoint + "/sinks/zipWrongName.zip: invalid zip file: so file or conf file is missing"),
+		}, {
+			t:   FUNCTION,
+			n:   "zipMissSo",
+			u:   endpoint + "/functions/zipMissSo.zip",
+			err: errors.New("fail to unzip file " + endpoint + "/functions/zipMissSo.zip: invalid zip file: so file or conf file is missing"),
+		}, {
+			t: SOURCE,
+			n: "random2",
+			u: endpoint + "/sources/random2.zip",
+		}, {
+			t: SINK,
+			n: "file2",
+			u: endpoint + "/sinks/file2.zip",
+		}, {
+			t: FUNCTION,
+			n: "echo2",
+			u: endpoint + "/functions/echo2.zip",
+		}, {
+			t:   FUNCTION,
+			n:   "echo2",
+			u:   endpoint + "/functions/echo2.zip",
+			err: errors.New("invalid name echo2: duplicate"),
+		},
+	}
+	manager, err := NewPluginManager()
+	if err != nil {
+		t.Error(err)
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(data))
+	for i, tt := range data {
+		err = manager.Register(tt.t, &Plugin{
+			Name:     tt.n,
+			File:     tt.u,
+			Callback: "",
+		})
+		if !reflect.DeepEqual(tt.err, err) {
+			t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
+		} else if tt.err == nil {
+			err := checkFile(manager.pluginDir, manager.etcDir, tt.t, tt.n)
+			if err != nil {
+				t.Errorf("%d: error : %s\n\n", i, err)
+			}
+		}
+	}
+
+}
+
+func TestManager_Delete(t *testing.T) {
+	data := []struct {
+		t   PluginType
+		n   string
+		err error
+	}{
+		{
+			t: SOURCE,
+			n: "random2",
+		}, {
+			t: SINK,
+			n: "file2",
+		}, {
+			t: FUNCTION,
+			n: "echo2",
+		},
+	}
+	manager, err := NewPluginManager()
+	if err != nil {
+		t.Error(err)
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(data))
+
+	for i, p := range data {
+		err = manager.Delete(p.t, p.n)
+		if err != nil {
+			t.Errorf("%d: delete error : %s\n\n", i, err)
+		}
+	}
+}
+
+func checkFile(pluginDir string, etcDir string, t PluginType, name string) error {
+	soPath := path.Join(pluginDir, PluginTypes[t], ucFirst(name)+".so")
+	_, err := os.Stat(soPath)
+	if err != nil {
+		return err
+	}
+	if t == SOURCE {
+		etcPath := path.Join(etcDir, PluginTypes[t], name+".yaml")
+		_, err = os.Stat(etcPath)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 38 - 0
plugins/plugins.http

@@ -0,0 +1,38 @@
+###
+
+POST http://127.0.0.1:9081/plugins/sources
+Content-Type: application/json
+
+{"name":"random2","file":"http://127.0.0.1/plugins/sources/random2.zip","callback":""}
+
+###
+GET http://127.0.0.1:9081/plugins/sources
+
+###
+DELETE http://127.0.0.1:9081/plugins/sources/random2
+
+###
+POST http://127.0.0.1:9081/plugins/sinks
+Content-Type: application/json
+
+{"name":"random2","file":"http://127.0.0.1/plugins/sources/random2.zip","callback":""}
+
+###
+GET http://127.0.0.1:9081/plugins/sinks
+
+###
+DELETE http://127.0.0.1:9081/plugins/sinks/random2
+
+###
+POST http://127.0.0.1:9081/plugins/functions
+Content-Type: application/json
+
+{"name":"random2","file":"http://127.0.0.1/plugins/sources/random2.zip","callback":""}
+
+###
+GET http://127.0.0.1:9081/plugins/functions
+
+###
+DELETE http://127.0.0.1:9081/plugins/functions/random2
+
+###2217

BIN
plugins/testzips/functions/echo2.zip


BIN
plugins/testzips/functions/zipMissSo.zip


BIN
plugins/testzips/sinks/file2.zip


BIN
plugins/testzips/sinks/zipWrongName.zip


BIN
plugins/testzips/sources/random2.zip


BIN
plugins/testzips/sources/zipMissConf.zip


+ 1 - 1
xsql/functions.go

@@ -77,7 +77,7 @@ func (*FunctionValuer) Call(name string, args []interface{}) (interface{}, bool)
 	} else {
 		common.Log.Debugf("run func %s", name)
 		if nf, err := plugin_manager.GetPlugin(name, "functions"); err != nil {
-			return nil, false
+			return err, false
 		} else {
 			f, ok := nf.(api.Function)
 			if !ok {

+ 2 - 0
xsql/processors/extension_test.go

@@ -1,3 +1,5 @@
+// +build !windows
+
 package processors
 
 import (

+ 3 - 3
xsql/processors/xsql_processor_test.go

@@ -2948,9 +2948,9 @@ func getMetric(tp *xstream.TopologyNew, name string) int {
 
 func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
 	keys, values := tp.GetMetrics()
-	for i, k := range keys {
-		log.Printf("%s:%v", k, values[i])
-	}
+	//for i, k := range keys {
+	//	log.Printf("%s:%v", k, values[i])
+	//}
 	for k, v := range m {
 		var (
 			index   int

+ 83 - 0
xstream/server/server/rest.go

@@ -3,6 +3,7 @@ package server
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/gorilla/mux"
 	"io"
@@ -60,6 +61,13 @@ func createRestServer(port int) *http.Server {
 	r.HandleFunc("/rules/{name}/stop", stopRuleHandler).Methods(http.MethodPost)
 	r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
 
+	r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete)
+	r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete)
+	r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete)
+
 	server := &http.Server{
 		Addr: fmt.Sprintf("0.0.0.0:%d", port),
 		// Good practice to set timeouts to avoid Slowloris attacks.
@@ -248,3 +256,78 @@ func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(fmt.Sprintf("Rule %s was restarted", name)))
 }
+
+func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
+	defer r.Body.Close()
+	switch r.Method {
+	case http.MethodGet:
+		content, err := pluginManager.List(t)
+		if err != nil {
+			handleError(w, fmt.Errorf("%s plugins list command error: %s", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
+			return
+		}
+		jsonResponse(content, w, logger)
+	case http.MethodPost:
+		sd := plugins.Plugin{}
+		err := json.NewDecoder(r.Body).Decode(&sd)
+		// Problems decoding
+		if err != nil {
+			handleError(w, fmt.Errorf("Invalid body: Error decoding the %s plugin json: %v", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
+			return
+		}
+		err = pluginManager.Register(t, &sd)
+		if err != nil {
+			handleError(w, fmt.Errorf("%s plugins create command error: %s", plugins.PluginTypes[t], err), http.StatusBadRequest, logger)
+			return
+		}
+		w.WriteHeader(http.StatusCreated)
+		w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugins.PluginTypes[t], sd.Name)))
+	}
+}
+
+func pluginHandler(w http.ResponseWriter, r *http.Request, t plugins.PluginType) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	name := vars["name"]
+
+	switch r.Method {
+	case http.MethodDelete:
+		err := pluginManager.Delete(t, name)
+		if err != nil {
+			handleError(w, fmt.Errorf("delete %s plugin %s error: %s", plugins.PluginTypes[t], name, err), http.StatusBadRequest, logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(fmt.Sprintf("%s plugin %s is deleted", plugins.PluginTypes[t], name)))
+	}
+}
+
+//list or create source plugin
+func sourcesHandler(w http.ResponseWriter, r *http.Request) {
+	pluginsHandler(w, r, plugins.SOURCE)
+}
+
+//delete a source plugin
+func sourceHandler(w http.ResponseWriter, r *http.Request) {
+	pluginHandler(w, r, plugins.SOURCE)
+}
+
+//list or create sink plugin
+func sinksHandler(w http.ResponseWriter, r *http.Request) {
+	pluginsHandler(w, r, plugins.SINK)
+}
+
+//delete a sink plugin
+func sinkHandler(w http.ResponseWriter, r *http.Request) {
+	pluginHandler(w, r, plugins.SINK)
+}
+
+//list or create function plugin
+func functionsHandler(w http.ResponseWriter, r *http.Request) {
+	pluginsHandler(w, r, plugins.FUNCTION)
+}
+
+//delete a function plugin
+func functionHandler(w http.ResponseWriter, r *http.Request) {
+	pluginHandler(w, r, plugins.FUNCTION)
+}

+ 6 - 0
xstream/server/server/server.go

@@ -3,6 +3,7 @@ package server
 import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/plugins"
 	"github.com/emqx/kuiper/xsql/processors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"net"
@@ -17,6 +18,7 @@ var (
 
 	ruleProcessor   *processors.RuleProcessor
 	streamProcessor *processors.StreamProcessor
+	pluginManager   *plugins.Manager
 )
 
 func StartUp(Version string) {
@@ -31,6 +33,10 @@ func StartUp(Version string) {
 	}
 	ruleProcessor = processors.NewRuleProcessor(path.Dir(dataDir))
 	streamProcessor = processors.NewStreamProcessor(path.Join(path.Dir(dataDir), "stream"))
+	pluginManager, err = plugins.NewPluginManager()
+	if err != nil {
+		logger.Panic(err)
+	}
 
 	registry = &RuleRegistry{internal: make(map[string]*RuleState)}
 

+ 3 - 3
xstream/sinks/rest_sink.go

@@ -280,13 +280,13 @@ func (ms *RestSink) send(v interface{}, logger api.Logger) error {
 	}
 	logger.Debugf("do request: %s %s with %s", ms.method, ms.url, req.Body)
 	resp, err := ms.client.Do(req)
-	if resp.StatusCode < 200 || resp.StatusCode > 299 {
-		return fmt.Errorf("rest sink fails to err http return code: %d.", resp.StatusCode)
-	}
 	if err != nil {
 		return fmt.Errorf("rest sink fails to send out the data")
 	} else {
 		logger.Debugf("rest sink got response %v", resp)
+		if resp.StatusCode < 200 || resp.StatusCode > 299 {
+			return fmt.Errorf("rest sink fails to err http return code: %d.", resp.StatusCode)
+		}
 	}
 	return nil
 }

+ 9 - 0
xstream/util_test.go

@@ -36,3 +36,12 @@ func TestConf(t *testing.T) {
 	}
 
 }
+
+func TestConf2(t *testing.T) {
+	var file = "test/testconf.json"
+
+	if v, e := GetConfAsString(file, "conf_string"); e != nil || (v != "test") {
+		t.Errorf("Expect %s, actual %s; error is %s. \n", "test", v, e)
+	}
+
+}