Преглед изворни кода

feat(build): support to split the components and services

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang пре 3 година
родитељ
комит
0f6a2efd8c
38 измењених фајлова са 1792 додато и 1108 уклоњено
  1. 21 0
      Makefile
  2. 16 1
      internal/binder/factory.go
  3. 30 1
      internal/binder/factory_test.go
  4. 19 0
      internal/conf/funcMap.go
  5. 0 23
      internal/processor/rule.go
  6. 44 0
      internal/processor/rule_query.go
  7. 38 0
      internal/server/component.go
  8. 235 0
      internal/server/meta_init.go
  9. 122 0
      internal/server/meta_plugin_init.go
  10. 5 1
      internal/server/rest_test.go
  11. 192 0
      internal/server/plugin_init.go
  12. 99 0
      internal/server/portable_init.go
  13. 44 0
      internal/server/pprof_init.go
  14. 72 0
      internal/server/prome_init.go
  15. 6 582
      internal/server/rest.go
  16. 52 238
      internal/server/rpc.go
  17. 102 0
      internal/server/rpc_plugin.go
  18. 55 0
      internal/server/rpc_plugin_both.go
  19. 107 0
      internal/server/rpc_plugin_native.go
  20. 51 0
      internal/server/rpc_plugin_portable.go
  21. 118 0
      internal/server/rpc_service.go
  22. 0 0
      internal/server/rule_manager.go
  23. 20 88
      internal/server/server.go
  24. 140 0
      internal/server/service_init.go
  25. 37 0
      internal/server/tpl_init.go
  26. 0 73
      internal/template/funcs_test.go
  27. 2 2
      internal/topo/node/join_align_node.go
  28. 2 2
      internal/topo/node/operations.go
  29. 4 9
      internal/topo/node/prometheus.go
  30. 1 1
      internal/topo/node/sink_node.go
  31. 5 0
      internal/topo/node/sink_node_test.go
  32. 2 2
      internal/topo/node/source_node.go
  33. 17 72
      internal/topo/node/stats_manager.go
  34. 32 0
      internal/topo/node/stats_mem.go
  35. 87 0
      internal/topo/node/stats_prom.go
  36. 2 2
      internal/topo/node/window_op.go
  37. 10 8
      internal/template/funcs.go
  38. 3 3
      internal/topo/transform/template.go

+ 21 - 0
Makefile

@@ -55,6 +55,27 @@ build_with_edgex: build_prepare
 	@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
 	@echo "Build successfully"
 
+.PHONY: build_core
+build_core: build_prepare
+	GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags core -o kuiperd cmd/kuiperd/main.go
+	@if [ ! -z $$(which upx) ]; then upx ./kuiperd; fi
+	@mv ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
+	@echo "Build successfully"
+
+.PHONY: build_pprof
+build_pprof: build_prepare
+	GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags pprof -o kuiperd cmd/kuiperd/main.go
+	@if [ ! -z $$(which upx) ]; then upx ./kuiperd; fi
+	@mv ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
+	@echo "Build successfully"
+
+.PHONY: build_with_plugin
+build_with_plugin: build_prepare
+	GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags plugin -o kuiperd cmd/kuiperd/main.go
+	@if [ ! -z $$(which upx) ]; then upx ./kuiperd; fi
+	@mv ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
+	@echo "Build successfully"
+
 .PHONY: pkg_with_edgex
 pkg_with_edgex: build_with_edgex
 	@make real_pkg

+ 16 - 1
internal/binder/factory.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -37,4 +37,19 @@ type FuncFactory interface {
 type FactoryEntry struct {
 	Name    string
 	Factory interface{}
+	Weight  int // bigger weight will be initialized first
+}
+
+type Entries []FactoryEntry
+
+func (e Entries) Len() int {
+	return len(e)
+}
+
+func (e Entries) Less(i, j int) bool {
+	return e[i].Weight > e[j].Weight
+}
+
+func (e Entries) Swap(i, j int) {
+	e[i], e[j] = e[j], e[i]
 }

+ 30 - 1
internal/binder/factory_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@ package binder
 
 import (
 	"github.com/lf-edge/ekuiper/internal/binder/mock"
+	"reflect"
+	"sort"
 	"testing"
 )
 
@@ -29,3 +31,30 @@ func TestEntry(t *testing.T) {
 		t.Errorf("cannot instantiate FactoryEntry")
 	}
 }
+
+func TestEntriesSort(t *testing.T) {
+	m := mock.NewMockFactory()
+	e := FactoryEntry{
+		Name:    "mock",
+		Factory: m,
+		Weight:  10,
+	}
+	e2 := FactoryEntry{
+		Name:    "mock2",
+		Factory: m,
+		Weight:  5,
+	}
+	e3 := FactoryEntry{
+		Name:    "mock3",
+		Factory: m,
+		Weight:  8,
+	}
+	entries := Entries{e, e2, e3}
+	sort.Sort(entries)
+
+	expect := Entries{e, e3, e2}
+
+	if reflect.DeepEqual(entries, expect) == false {
+		t.Errorf("sort error, expect: %v, actual: %v", expect, entries)
+	}
+}

+ 19 - 0
internal/conf/funcMap.go

@@ -0,0 +1,19 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package conf
+
+import "text/template"
+
+var FuncMap template.FuncMap

+ 0 - 23
internal/processor/rule.go

@@ -20,9 +20,6 @@ import (
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
-	"github.com/lf-edge/ekuiper/internal/topo"
-	"github.com/lf-edge/ekuiper/internal/topo/node"
-	"github.com/lf-edge/ekuiper/internal/topo/planner"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
@@ -171,26 +168,6 @@ func (p *RuleProcessor) getRuleByJson(name, ruleJson string) (*api.Rule, error)
 	return rule, nil
 }
 
-func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*topo.Topo, error) {
-	if tp, err := planner.PlanWithSourcesAndSinks(p.getDefaultRule(ruleid, sql), nil, []*node.SinkNode{node.NewSinkNode("sink_memory_log", "logToMemory", nil)}); err != nil {
-		return nil, err
-	} else {
-		go func() {
-			select {
-			case err := <-tp.Open():
-				if err != nil {
-					log.Infof("closing query for error: %v", err)
-					tp.GetContext().SetError(err)
-					tp.Cancel()
-				} else {
-					log.Info("closing query")
-				}
-			}
-		}()
-		return tp, nil
-	}
-}
-
 func (p *RuleProcessor) ExecDesc(name string) (string, error) {
 	var s1 string
 	f, _ := p.db.Get(name, &s1)

+ 44 - 0
internal/processor/rule_query.go

@@ -0,0 +1,44 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build rpc || !core
+// +build rpc !core
+
+package processor
+
+import (
+	"github.com/lf-edge/ekuiper/internal/topo"
+	"github.com/lf-edge/ekuiper/internal/topo/node"
+	"github.com/lf-edge/ekuiper/internal/topo/planner"
+)
+
+func (p *RuleProcessor) ExecQuery(ruleid, sql string) (*topo.Topo, error) {
+	if tp, err := planner.PlanWithSourcesAndSinks(p.getDefaultRule(ruleid, sql), nil, []*node.SinkNode{node.NewSinkNode("sink_memory_log", "logToMemory", nil)}); err != nil {
+		return nil, err
+	} else {
+		go func() {
+			select {
+			case err := <-tp.Open():
+				if err != nil {
+					log.Infof("closing query for error: %v", err)
+					tp.GetContext().SetError(err)
+					tp.Cancel()
+				} else {
+					log.Info("closing query")
+				}
+			}
+		}()
+		return tp, nil
+	}
+}

+ 38 - 0
internal/server/component.go

@@ -0,0 +1,38 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/binder"
+)
+
+type component interface {
+	register()
+	rest(r *mux.Router)
+}
+
+type compServer interface {
+	serve()
+	close()
+}
+
+type restEndpoint func(r *mux.Router)
+
+var (
+	components = make(map[string]component)
+	servers    = make(map[string]compServer)
+	entries    binder.Entries
+)

+ 235 - 0
internal/server/meta_init.go

@@ -0,0 +1,235 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build ui || !core
+// +build ui !core
+
+package server
+
+import (
+	"fmt"
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/meta"
+	"io/ioutil"
+	"net/http"
+)
+
+func init() {
+	components["meta"] = metaComp{}
+}
+
+var metaEndpoints []restEndpoint
+
+type metaComp struct {
+}
+
+func (m metaComp) register() {
+	// do nothing
+}
+
+func (m metaComp) rest(r *mux.Router) {
+	r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
+
+	r.HandleFunc("/metadata/connections", connectionsMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/connections/{name}", connectionMetaHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/connections/yaml/{name}", connectionConfHandler).Methods(http.MethodGet)
+	r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}", connectionConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
+
+	for _, endpoint := range metaEndpoints {
+		endpoint(r)
+	}
+}
+
+//list sink plugin
+func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	sinks := meta.GetSinks()
+	jsonResponse(sinks, w, logger)
+	return
+}
+
+//Get sink metadata when creating rules
+func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+
+	language := getLanguage(r)
+	ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	}
+	jsonResponse(ptrMetadata, w, logger)
+}
+
+//list functions
+func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	sinks := meta.GetFunctions()
+	jsonResponse(sinks, w, logger)
+	return
+}
+
+//list source plugin
+func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	ret := meta.GetSourcesPlugins()
+	if nil != ret {
+		jsonResponse(ret, w, logger)
+		return
+	}
+}
+
+//list shareMeta
+func connectionsMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	ret := meta.GetConnectionPlugins()
+	if nil != ret {
+		jsonResponse(ret, w, logger)
+		return
+	}
+}
+
+//Get source metadata when creating stream
+func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	language := getLanguage(r)
+	ret, err := meta.GetSourceMeta(pluginName, language)
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	}
+	if nil != ret {
+		jsonResponse(ret, w, logger)
+		return
+	}
+}
+
+//Get source metadata when creating stream
+func connectionMetaHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	language := getLanguage(r)
+	ret, err := meta.GetConnectionMeta(pluginName, language)
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	}
+	if nil != ret {
+		jsonResponse(ret, w, logger)
+		return
+	}
+}
+
+//Get source yaml
+func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	language := getLanguage(r)
+	configOperatorKey := fmt.Sprintf(meta.SourceCfgOperatorKeyTemplate, pluginName)
+	ret, err := meta.GetYamlConf(configOperatorKey, language)
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	} else {
+		w.Write(ret)
+	}
+}
+
+//Get share yaml
+func connectionConfHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	language := getLanguage(r)
+	configOperatorKey := fmt.Sprintf(meta.ConnectionCfgOperatorKeyTemplate, pluginName)
+	ret, err := meta.GetYamlConf(configOperatorKey, language)
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	} else {
+		w.Write(ret)
+	}
+}
+
+//Add  del confkey
+func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
+
+	defer r.Body.Close()
+	var err error
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	confKey := vars["confKey"]
+	language := getLanguage(r)
+	switch r.Method {
+	case http.MethodDelete:
+		err = meta.DelSourceConfKey(pluginName, confKey, language)
+	case http.MethodPut:
+		v, err1 := ioutil.ReadAll(r.Body)
+		if err1 != nil {
+			handleError(w, err, "Invalid body", logger)
+			return
+		}
+		err = meta.AddSourceConfKey(pluginName, confKey, language, v)
+	}
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	}
+}
+
+//Add  del confkey
+func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
+
+	defer r.Body.Close()
+	var err error
+	vars := mux.Vars(r)
+	pluginName := vars["name"]
+	confKey := vars["confKey"]
+	language := getLanguage(r)
+	switch r.Method {
+	case http.MethodDelete:
+		err = meta.DelConnectionConfKey(pluginName, confKey, language)
+	case http.MethodPut:
+		v, err1 := ioutil.ReadAll(r.Body)
+		if err1 != nil {
+			handleError(w, err1, "Invalid body", logger)
+			return
+		}
+		err = meta.AddConnectionConfKey(pluginName, confKey, language, v)
+	}
+	if err != nil {
+		handleError(w, err, "", logger)
+		return
+	}
+}
+
+func getLanguage(r *http.Request) string {
+	language := r.Header.Get("Content-Language")
+	if 0 == len(language) {
+		language = "en_US"
+	}
+	return language
+}

+ 122 - 0
internal/server/meta_plugin_init.go

@@ -0,0 +1,122 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (plugin || !core) && (ui || !core)
+// +build plugin !core
+// +build ui !core
+
+package server
+
+import (
+	"fmt"
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/internal/plugin"
+	"net/http"
+	"os"
+	"runtime"
+	"strings"
+)
+
+// This must be and will be run after meta_init.go init()
+func init() {
+	metaEndpoints = append(metaEndpoints, func(r *mux.Router) {
+		r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
+		r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
+		r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
+	})
+}
+
+func prebuildSourcePlugins(w http.ResponseWriter, r *http.Request) {
+	prebuildPluginsHandler(w, r, plugin.SOURCE)
+}
+
+func prebuildSinkPlugins(w http.ResponseWriter, r *http.Request) {
+	prebuildPluginsHandler(w, r, plugin.SINK)
+}
+
+func prebuildFuncsPlugins(w http.ResponseWriter, r *http.Request) {
+	prebuildPluginsHandler(w, r, plugin.FUNCTION)
+}
+
+func isOffcialDockerImage() bool {
+	if !strings.EqualFold(os.Getenv("MAINTAINER"), "emqx.io") {
+		return false
+	}
+	return true
+}
+
+func prebuildPluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
+	emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
+	if !isOffcialDockerImage() {
+		handleError(w, fmt.Errorf(emsg), "", logger)
+		return
+	} else if runtime.GOOS == "linux" {
+		osrelease, err := Read()
+		if err != nil {
+			logger.Infof("")
+			return
+		}
+		prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
+		os := "debian"
+		if strings.Contains(prettyName, "DEBIAN") {
+			hosts := conf.Config.Basic.PluginHosts
+
+			if err, plugins := fetchPluginList(t, hosts, os, runtime.GOARCH); err != nil {
+				handleError(w, err, "", logger)
+			} else {
+				jsonResponse(plugins, w, logger)
+			}
+		} else {
+			handleError(w, fmt.Errorf(emsg), "", logger)
+			return
+		}
+	} else {
+		handleError(w, fmt.Errorf(emsg), "", logger)
+	}
+}
+
+var NativeSourcePlugin = []string{"random", "zmq"}
+var NativeSinkPlugin = []string{"file", "image", "influx", "redis", "tdengine", "zmq"}
+var NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage"}
+
+func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (err error, result map[string]string) {
+	ptype := "sources"
+	plugins := NativeSourcePlugin
+	if t == plugin.SINK {
+		ptype = "sinks"
+		plugins = NativeSinkPlugin
+	} else if t == plugin.FUNCTION {
+		ptype = "functions"
+		plugins = NativeFunctionPlugin
+	}
+
+	if hosts == "" || ptype == "" || os == "" {
+		logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
+		return fmt.Errorf("invalid configruation for plugin host in kuiper.yaml"), nil
+	}
+	result = make(map[string]string)
+	hostsArr := strings.Split(hosts, ",")
+	for _, host := range hostsArr {
+		host := strings.Trim(host, " ")
+		tmp := []string{host, "kuiper-plugins", version, os, ptype}
+		//The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
+		url := strings.Join(tmp, "/")
+
+		for _, p := range plugins {
+			result[p] = url + "/" + p + "_" + arch + ".zip"
+		}
+	}
+	return
+}

+ 5 - 1
internal/server/rest_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,6 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:build (plugin || !core) && (ui || !core)
+// +build plugin !core
+// +build ui !core
+
 package server
 
 import (

+ 192 - 0
internal/server/plugin_init.go

@@ -0,0 +1,192 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build plugin || !core
+// +build plugin !core
+
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/binder"
+	"github.com/lf-edge/ekuiper/internal/plugin"
+	"github.com/lf-edge/ekuiper/internal/plugin/native"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
+	"net/http"
+)
+
+var nativeManager *native.Manager
+
+func init() {
+	components["plugin"] = pluginComp{}
+}
+
+type pluginComp struct{}
+
+func (p pluginComp) register() {
+	var err error
+	nativeManager, err = native.InitManager()
+	if err != nil {
+		panic(err)
+	}
+	entries = append(entries, binder.FactoryEntry{Name: "native plugin", Factory: nativeManager, Weight: 9})
+}
+
+func (p pluginComp) rest(r *mux.Router) {
+	r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
+	r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
+	r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
+	r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
+	r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
+	r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
+}
+
+func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
+	defer r.Body.Close()
+	switch r.Method {
+	case http.MethodGet:
+		content := nativeManager.List(t)
+		jsonResponse(content, w, logger)
+	case http.MethodPost:
+		sd := plugin.NewPluginByType(t)
+		err := json.NewDecoder(r.Body).Decode(sd)
+		// Problems decoding
+		if err != nil {
+			handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
+			return
+		}
+		err = nativeManager.Register(t, sd)
+		if err != nil {
+			handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
+			return
+		}
+		w.WriteHeader(http.StatusCreated)
+		w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())))
+	}
+}
+
+func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	name := vars["name"]
+	cb := r.URL.Query().Get("stop")
+	switch r.Method {
+	case http.MethodDelete:
+		r := cb == "1"
+		err := nativeManager.Delete(t, name, r)
+		if err != nil {
+			handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
+		if r {
+			result = fmt.Sprintf("%s and Kuiper will be stopped", result)
+		} else {
+			result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
+		}
+		w.Write([]byte(result))
+	case http.MethodGet:
+		j, ok := nativeManager.GetPluginInfo(t, name)
+		if !ok {
+			handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
+			return
+		}
+		jsonResponse(j, w, logger)
+	}
+}
+
+//list or create source plugin
+func sourcesHandler(w http.ResponseWriter, r *http.Request) {
+	pluginsHandler(w, r, plugin.SOURCE)
+}
+
+//delete a source plugin
+func sourceHandler(w http.ResponseWriter, r *http.Request) {
+	pluginHandler(w, r, plugin.SOURCE)
+}
+
+//list or create sink plugin
+func sinksHandler(w http.ResponseWriter, r *http.Request) {
+	pluginsHandler(w, r, plugin.SINK)
+}
+
+//delete a sink plugin
+func sinkHandler(w http.ResponseWriter, r *http.Request) {
+	pluginHandler(w, r, plugin.SINK)
+}
+
+//list or create function plugin
+func functionsHandler(w http.ResponseWriter, r *http.Request) {
+	pluginsHandler(w, r, plugin.FUNCTION)
+}
+
+//list all user defined functions in all function plugins
+func functionsListHandler(w http.ResponseWriter, _ *http.Request) {
+	content := nativeManager.ListSymbols()
+	jsonResponse(content, w, logger)
+}
+
+func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	name := vars["name"]
+	j, ok := nativeManager.GetPluginBySymbol(plugin.FUNCTION, name)
+	if !ok {
+		handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
+		return
+	}
+	jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
+}
+
+//delete a function plugin
+func functionHandler(w http.ResponseWriter, r *http.Request) {
+	pluginHandler(w, r, plugin.FUNCTION)
+}
+
+type functionList struct {
+	Functions []string `json:"functions,omitempty"`
+}
+
+// register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
+// either by create or register. If the function plugin has been loaded because of auto load through so file, the function
+// list MUST be registered by this API or only the function with the same name as the plugin can be used.
+func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	name := vars["name"]
+	_, ok := nativeManager.GetPluginInfo(plugin.FUNCTION, name)
+	if !ok {
+		handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
+		return
+	}
+	sd := functionList{}
+	err := json.NewDecoder(r.Body).Decode(&sd)
+	// Problems decoding
+	if err != nil {
+		handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
+		return
+	}
+	err = nativeManager.RegisterFuncs(name, sd.Functions)
+	if err != nil {
+		handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+	w.Write([]byte(fmt.Sprintf("function plugin %s function list is registered", name)))
+}

+ 99 - 0
internal/server/portable_init.go

@@ -0,0 +1,99 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build portable || !core
+// +build portable !core
+
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/binder"
+	"github.com/lf-edge/ekuiper/internal/plugin"
+	"github.com/lf-edge/ekuiper/internal/plugin/portable"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
+	"net/http"
+)
+
+var portableManager *portable.Manager
+
+func init() {
+	components["portable"] = portableComp{}
+}
+
+type portableComp struct{}
+
+func (p portableComp) register() {
+	var err error
+	portableManager, err = portable.InitManager()
+	if err != nil {
+		panic(err)
+	}
+	entries = append(entries, binder.FactoryEntry{Name: "portable plugin", Factory: portableManager, Weight: 8})
+}
+
+func (p portableComp) rest(r *mux.Router) {
+	r.HandleFunc("/plugins/portables", portablesHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/plugins/portables/{name}", portableHandler).Methods(http.MethodGet, http.MethodDelete)
+}
+
+func portablesHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	switch r.Method {
+	case http.MethodGet:
+		content := portableManager.List()
+		jsonResponse(content, w, logger)
+	case http.MethodPost:
+		sd := plugin.NewPluginByType(plugin.PORTABLE)
+		err := json.NewDecoder(r.Body).Decode(sd)
+		// Problems decoding
+		if err != nil {
+			handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
+			return
+		}
+		err = portableManager.Register(sd)
+		if err != nil {
+			handleError(w, err, "portable plugin create command error", logger)
+			return
+		}
+		w.WriteHeader(http.StatusCreated)
+		w.Write([]byte(fmt.Sprintf("portable plugin %s is created", sd.GetName())))
+	}
+}
+
+func portableHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	name := vars["name"]
+	switch r.Method {
+	case http.MethodDelete:
+		err := portableManager.Delete(name)
+		if err != nil {
+			handleError(w, err, fmt.Sprintf("delete portable plugin %s error", name), logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		result := fmt.Sprintf("portable plugin %s is deleted", name)
+		w.Write([]byte(result))
+	case http.MethodGet:
+		j, ok := portableManager.GetPluginInfo(name)
+		if !ok {
+			handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe portable plugin %s error", name), logger)
+			return
+		}
+		jsonResponse(j, w, logger)
+	}
+}

+ 44 - 0
internal/server/pprof_init.go

@@ -0,0 +1,44 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build pprof
+// +build pprof
+
+package server
+
+import (
+	"log"
+	"net/http"
+	_ "net/http/pprof"
+	"os"
+)
+
+func init() {
+	servers["pprof"] = pprofComp{}
+}
+
+type pprofComp struct {
+	s *http.Server
+}
+
+func (p pprofComp) serve() {
+	if err := http.ListenAndServe(":6060", nil); err != nil {
+		log.Fatal(err)
+	}
+	os.Exit(0)
+}
+
+func (p pprofComp) close() {
+	// do nothing
+}

+ 72 - 0
internal/server/prome_init.go

@@ -0,0 +1,72 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build prometheus || !core
+// +build prometheus !core
+
+package server
+
+import (
+	"context"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"net/http"
+	"time"
+)
+
+func init() {
+	servers["prometheus"] = promeComp{}
+}
+
+type promeComp struct {
+	s *http.Server
+}
+
+func (p promeComp) serve() {
+	//Start prometheus service
+	if conf.Config.Basic.Prometheus {
+		portPrometheus := conf.Config.Basic.PrometheusPort
+		if portPrometheus <= 0 {
+			logger.Fatal("Miss configuration prometheusPort")
+		}
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", promhttp.Handler())
+		srvPrometheus := &http.Server{
+			Addr:         fmt.Sprintf("0.0.0.0:%d", portPrometheus),
+			WriteTimeout: time.Second * 15,
+			ReadTimeout:  time.Second * 15,
+			IdleTimeout:  time.Second * 60,
+			Handler:      mux,
+		}
+		go func() {
+			if err := srvPrometheus.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+				logger.Fatal("Listen prometheus error: ", err)
+			}
+		}()
+		p.s = srvPrometheus
+		msg := fmt.Sprintf("Serving prometheus metrics on port http://localhost:%d/metrics", portPrometheus)
+		logger.Infof(msg)
+		fmt.Println(msg)
+	}
+}
+
+func (p promeComp) close() {
+	if p.s != nil {
+		if err := p.s.Shutdown(context.TODO()); err != nil {
+			logger.Errorf("prometheus server shutdown error: %v", err)
+		}
+		logger.Info("prometheus server successfully shutdown.")
+	}
+}

+ 6 - 582
internal/server/rest.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -19,20 +19,13 @@ import (
 	"fmt"
 	"github.com/gorilla/handlers"
 	"github.com/gorilla/mux"
-	"github.com/lf-edge/ekuiper/internal/conf"
-	"github.com/lf-edge/ekuiper/internal/meta"
-	"github.com/lf-edge/ekuiper/internal/plugin"
-	"github.com/lf-edge/ekuiper/internal/plugin/native"
-	"github.com/lf-edge/ekuiper/internal/plugin/portable"
 	"github.com/lf-edge/ekuiper/internal/server/middleware"
-	"github.com/lf-edge/ekuiper/internal/service"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
 	"io"
 	"io/ioutil"
 	"net/http"
-	"os"
 	"runtime"
 	"strings"
 	"time"
@@ -106,39 +99,11 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
 	r.HandleFunc("/rules/{name}/restart", restartRuleHandler).Methods(http.MethodPost)
 	r.HandleFunc("/rules/{name}/topo", getTopoRuleHandler).Methods(http.MethodGet)
 
-	r.HandleFunc("/plugins/sources", sourcesHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/plugins/sources/prebuild", prebuildSourcePlugins).Methods(http.MethodGet)
-	r.HandleFunc("/plugins/sources/{name}", sourceHandler).Methods(http.MethodDelete, http.MethodGet)
-	r.HandleFunc("/plugins/sinks", sinksHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/plugins/sinks/prebuild", prebuildSinkPlugins).Methods(http.MethodGet)
-	r.HandleFunc("/plugins/sinks/{name}", sinkHandler).Methods(http.MethodDelete, http.MethodGet)
-	r.HandleFunc("/plugins/functions", functionsHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/plugins/functions/prebuild", prebuildFuncsPlugins).Methods(http.MethodGet)
-	r.HandleFunc("/plugins/functions/{name}", functionHandler).Methods(http.MethodDelete, http.MethodGet)
-	r.HandleFunc("/plugins/functions/{name}/register", functionRegisterHandler).Methods(http.MethodPost)
-	r.HandleFunc("/plugins/udfs", functionsListHandler).Methods(http.MethodGet)
-	r.HandleFunc("/plugins/udfs/{name}", functionsGetHandler).Methods(http.MethodGet)
-
-	r.HandleFunc("/plugins/portables", portablesHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/plugins/portables/{name}", portableHandler).Methods(http.MethodGet, http.MethodDelete)
-
-	r.HandleFunc("/metadata/functions", functionsMetaHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/sinks", sinksMetaHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/sinks/{name}", newSinkMetaHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/sources", sourcesMetaHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/sources/{name}", sourceMetaHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/sources/yaml/{name}", sourceConfHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/sources/{name}/confKeys/{confKey}", sourceConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
-
-	r.HandleFunc("/metadata/connections", connectionsMetaHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/connections/{name}", connectionMetaHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/connections/yaml/{name}", connectionConfHandler).Methods(http.MethodGet)
-	r.HandleFunc("/metadata/connections/{name}/confKeys/{confKey}", connectionConfKeyHandler).Methods(http.MethodDelete, http.MethodPut)
-
-	r.HandleFunc("/services", servicesHandler).Methods(http.MethodGet, http.MethodPost)
-	r.HandleFunc("/services/functions", serviceFunctionsHandler).Methods(http.MethodGet)
-	r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
-	r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
+	// Register extended routes
+	for k, v := range components {
+		logger.Infof("register rest endpoint for component %s", k)
+		v.rest(r)
+	}
 
 	if needToken {
 		r.Use(middleware.Auth)
@@ -432,544 +397,3 @@ func getTopoRuleHandler(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set(ContentType, ContentTypeJSON)
 	w.Write([]byte(content))
 }
-
-func pluginsHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
-	pluginManager := native.GetManager()
-	defer r.Body.Close()
-	switch r.Method {
-	case http.MethodGet:
-		content := pluginManager.List(t)
-		jsonResponse(content, w, logger)
-	case http.MethodPost:
-		sd := plugin.NewPluginByType(t)
-		err := json.NewDecoder(r.Body).Decode(sd)
-		// Problems decoding
-		if err != nil {
-			handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the %s plugin json", plugin.PluginTypes[t]), logger)
-			return
-		}
-		err = pluginManager.Register(t, sd)
-		if err != nil {
-			handleError(w, err, fmt.Sprintf("%s plugins create command error", plugin.PluginTypes[t]), logger)
-			return
-		}
-		w.WriteHeader(http.StatusCreated)
-		w.Write([]byte(fmt.Sprintf("%s plugin %s is created", plugin.PluginTypes[t], sd.GetName())))
-	}
-}
-
-func pluginHandler(w http.ResponseWriter, r *http.Request, t plugin.PluginType) {
-	defer r.Body.Close()
-	vars := mux.Vars(r)
-	name := vars["name"]
-	cb := r.URL.Query().Get("stop")
-	pluginManager := native.GetManager()
-	switch r.Method {
-	case http.MethodDelete:
-		r := cb == "1"
-		err := pluginManager.Delete(t, name, r)
-		if err != nil {
-			handleError(w, err, fmt.Sprintf("delete %s plugin %s error", plugin.PluginTypes[t], name), logger)
-			return
-		}
-		w.WriteHeader(http.StatusOK)
-		result := fmt.Sprintf("%s plugin %s is deleted", plugin.PluginTypes[t], name)
-		if r {
-			result = fmt.Sprintf("%s and Kuiper will be stopped", result)
-		} else {
-			result = fmt.Sprintf("%s and Kuiper must restart for the change to take effect.", result)
-		}
-		w.Write([]byte(result))
-	case http.MethodGet:
-		j, ok := pluginManager.GetPluginInfo(t, name)
-		if !ok {
-			handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe %s plugin %s error", plugin.PluginTypes[t], name), logger)
-			return
-		}
-		jsonResponse(j, w, logger)
-	}
-}
-
-//list or create source plugin
-func sourcesHandler(w http.ResponseWriter, r *http.Request) {
-	pluginsHandler(w, r, plugin.SOURCE)
-}
-
-//delete a source plugin
-func sourceHandler(w http.ResponseWriter, r *http.Request) {
-	pluginHandler(w, r, plugin.SOURCE)
-}
-
-//list or create sink plugin
-func sinksHandler(w http.ResponseWriter, r *http.Request) {
-	pluginsHandler(w, r, plugin.SINK)
-}
-
-//delete a sink plugin
-func sinkHandler(w http.ResponseWriter, r *http.Request) {
-	pluginHandler(w, r, plugin.SINK)
-}
-
-//list or create function plugin
-func functionsHandler(w http.ResponseWriter, r *http.Request) {
-	pluginsHandler(w, r, plugin.FUNCTION)
-}
-
-//list all user defined functions in all function plugins
-func functionsListHandler(w http.ResponseWriter, _ *http.Request) {
-	pluginManager := native.GetManager()
-	content := pluginManager.ListSymbols()
-	jsonResponse(content, w, logger)
-}
-
-func functionsGetHandler(w http.ResponseWriter, r *http.Request) {
-	vars := mux.Vars(r)
-	name := vars["name"]
-	pluginManager := native.GetManager()
-	j, ok := pluginManager.GetPluginBySymbol(plugin.FUNCTION, name)
-	if !ok {
-		handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
-		return
-	}
-	jsonResponse(map[string]string{"name": name, "plugin": j}, w, logger)
-}
-
-//delete a function plugin
-func functionHandler(w http.ResponseWriter, r *http.Request) {
-	pluginHandler(w, r, plugin.FUNCTION)
-}
-
-type functionList struct {
-	Functions []string `json:"functions,omitempty"`
-}
-
-// register function list for function plugin. If a plugin exports multiple functions, the function list must be registered
-// either by create or register. If the function plugin has been loaded because of auto load through so file, the function
-// list MUST be registered by this API or only the function with the same name as the plugin can be used.
-func functionRegisterHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	vars := mux.Vars(r)
-	name := vars["name"]
-	pluginManager := native.GetManager()
-	_, ok := pluginManager.GetPluginInfo(plugin.FUNCTION, name)
-	if !ok {
-		handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("register %s plugin %s error", plugin.PluginTypes[plugin.FUNCTION], name), logger)
-		return
-	}
-	sd := functionList{}
-	err := json.NewDecoder(r.Body).Decode(&sd)
-	// Problems decoding
-	if err != nil {
-		handleError(w, err, fmt.Sprintf("Invalid body: Error decoding the function list json %s", r.Body), logger)
-		return
-	}
-	err = pluginManager.RegisterFuncs(name, sd.Functions)
-	if err != nil {
-		handleError(w, err, fmt.Sprintf("function plugins %s regiser functions error", name), logger)
-		return
-	}
-	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(fmt.Sprintf("function plugin %s function list is registered", name)))
-}
-
-func portablesHandler(w http.ResponseWriter, r *http.Request) {
-	m := portable.GetManager()
-	defer r.Body.Close()
-	switch r.Method {
-	case http.MethodGet:
-		content := m.List()
-		jsonResponse(content, w, logger)
-	case http.MethodPost:
-		sd := plugin.NewPluginByType(plugin.PORTABLE)
-		err := json.NewDecoder(r.Body).Decode(sd)
-		// Problems decoding
-		if err != nil {
-			handleError(w, err, "Invalid body: Error decoding the portable plugin json", logger)
-			return
-		}
-		err = m.Register(sd)
-		if err != nil {
-			handleError(w, err, "portable plugin create command error", logger)
-			return
-		}
-		w.WriteHeader(http.StatusCreated)
-		w.Write([]byte(fmt.Sprintf("portable plugin %s is created", sd.GetName())))
-	}
-}
-
-func portableHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	vars := mux.Vars(r)
-	name := vars["name"]
-	m := portable.GetManager()
-	switch r.Method {
-	case http.MethodDelete:
-		err := m.Delete(name)
-		if err != nil {
-			handleError(w, err, fmt.Sprintf("delete portable plugin %s error", name), logger)
-			return
-		}
-		w.WriteHeader(http.StatusOK)
-		result := fmt.Sprintf("portable plugin %s is deleted", name)
-		w.Write([]byte(result))
-	case http.MethodGet:
-		j, ok := m.GetPluginInfo(name)
-		if !ok {
-			handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe portable plugin %s error", name), logger)
-			return
-		}
-		jsonResponse(j, w, logger)
-	}
-}
-
-func prebuildSourcePlugins(w http.ResponseWriter, _ *http.Request) {
-	prebuildPluginsHandler(w, plugin.SOURCE)
-}
-
-func prebuildSinkPlugins(w http.ResponseWriter, _ *http.Request) {
-	prebuildPluginsHandler(w, plugin.SINK)
-}
-
-func prebuildFuncsPlugins(w http.ResponseWriter, _ *http.Request) {
-	prebuildPluginsHandler(w, plugin.FUNCTION)
-}
-
-func isOffcialDockerImage() bool {
-	if !strings.EqualFold(os.Getenv("MAINTAINER"), "emqx.io") {
-		return false
-	}
-	return true
-}
-
-func prebuildPluginsHandler(w http.ResponseWriter, t plugin.PluginType) {
-	emsg := "It's strongly recommended to install plugins at official released Debian Docker images. If you choose to proceed to install plugin, please make sure the plugin is already validated in your own build."
-	if !isOffcialDockerImage() {
-		handleError(w, fmt.Errorf(emsg), "", logger)
-		return
-	} else if runtime.GOOS == "linux" {
-		osrelease, err := Read()
-		if err != nil {
-			logger.Infof("")
-			return
-		}
-		prettyName := strings.ToUpper(osrelease["PRETTY_NAME"])
-		o := "debian"
-		if strings.Contains(prettyName, "DEBIAN") {
-			hosts := conf.Config.Basic.PluginHosts
-
-			if err, plugins := fetchPluginList(t, hosts, o, runtime.GOARCH); err != nil {
-				handleError(w, err, "", logger)
-			} else {
-				jsonResponse(plugins, w, logger)
-			}
-		} else {
-			handleError(w, fmt.Errorf(emsg), "", logger)
-			return
-		}
-	} else {
-		handleError(w, fmt.Errorf(emsg), "", logger)
-	}
-}
-
-var NativeSourcePlugin = []string{"random", "zmq"}
-var NativeSinkPlugin = []string{"file", "image", "influx", "redis", "tdengine", "zmq"}
-var NativeFunctionPlugin = []string{"accumulateWordCount", "countPlusOne", "echo", "geohash", "image", "labelImage"}
-
-func fetchPluginList(t plugin.PluginType, hosts, os, arch string) (err error, result map[string]string) {
-	ptype := "sources"
-	plugins := NativeSourcePlugin
-	if t == plugin.SINK {
-		ptype = "sinks"
-		plugins = NativeSinkPlugin
-	} else if t == plugin.FUNCTION {
-		ptype = "functions"
-		plugins = NativeFunctionPlugin
-	}
-
-	if hosts == "" || ptype == "" || os == "" {
-		logger.Errorf("Invalid parameter value: hosts %s, ptype %s or os: %s should not be empty.", hosts, ptype, os)
-		return fmt.Errorf("invalid configruation for plugin host in kuiper.yaml"), nil
-	}
-	result = make(map[string]string)
-	hostsArr := strings.Split(hosts, ",")
-	for _, host := range hostsArr {
-		host := strings.Trim(host, " ")
-		tmp := []string{host, "kuiper-plugins", version, os, ptype}
-		//The url is similar to http://host:port/kuiper-plugins/0.9.1/debian/sinks/
-		url := strings.Join(tmp, "/")
-
-		for _, p := range plugins {
-			result[p] = url + "/" + p + "_" + arch + ".zip"
-		}
-	}
-	return
-}
-
-//list sink plugin
-func sinksMetaHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	sinks := meta.GetSinks()
-	jsonResponse(sinks, w, logger)
-	return
-}
-
-//Get sink metadata when creating rules
-func newSinkMetaHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	vars := mux.Vars(r)
-	pluginName := vars["name"]
-
-	language := getLanguage(r)
-	ptrMetadata, err := meta.GetSinkMeta(pluginName, language)
-	if err != nil {
-		handleError(w, err, "", logger)
-		return
-	}
-	jsonResponse(ptrMetadata, w, logger)
-}
-
-//list functions
-func functionsMetaHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	sinks := meta.GetFunctions()
-	jsonResponse(sinks, w, logger)
-	return
-}
-
-//list source plugin
-func sourcesMetaHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	ret := meta.GetSourcesPlugins()
-	if nil != ret {
-		jsonResponse(ret, w, logger)
-		return
-	}
-}
-
-//list shareMeta
-func connectionsMetaHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	ret := meta.GetConnectionPlugins()
-	if nil != ret {
-		jsonResponse(ret, w, logger)
-		return
-	}
-}
-
-//Get source metadata when creating stream
-func sourceMetaHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	vars := mux.Vars(r)
-	pluginName := vars["name"]
-	language := getLanguage(r)
-	ret, err := meta.GetSourceMeta(pluginName, language)
-	if err != nil {
-		handleError(w, err, "", logger)
-		return
-	}
-	if nil != ret {
-		jsonResponse(ret, w, logger)
-		return
-	}
-}
-
-//Get source metadata when creating stream
-func connectionMetaHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	vars := mux.Vars(r)
-	pluginName := vars["name"]
-	language := getLanguage(r)
-	ret, err := meta.GetConnectionMeta(pluginName, language)
-	if err != nil {
-		handleError(w, err, "", logger)
-		return
-	}
-	if nil != ret {
-		jsonResponse(ret, w, logger)
-		return
-	}
-}
-
-//Get source yaml
-func sourceConfHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	vars := mux.Vars(r)
-	pluginName := vars["name"]
-	language := getLanguage(r)
-	configOperatorKey := fmt.Sprintf(meta.SourceCfgOperatorKeyTemplate, pluginName)
-	ret, err := meta.GetYamlConf(configOperatorKey, language)
-	if err != nil {
-		handleError(w, err, "", logger)
-		return
-	} else {
-		w.Write(ret)
-	}
-}
-
-//Get share yaml
-func connectionConfHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	vars := mux.Vars(r)
-	pluginName := vars["name"]
-	language := getLanguage(r)
-	configOperatorKey := fmt.Sprintf(meta.ConnectionCfgOperatorKeyTemplate, pluginName)
-	ret, err := meta.GetYamlConf(configOperatorKey, language)
-	if err != nil {
-		handleError(w, err, "", logger)
-		return
-	} else {
-		w.Write(ret)
-	}
-}
-
-//Add  del confkey
-func sourceConfKeyHandler(w http.ResponseWriter, r *http.Request) {
-
-	defer r.Body.Close()
-	var err error
-	vars := mux.Vars(r)
-	pluginName := vars["name"]
-	confKey := vars["confKey"]
-	language := getLanguage(r)
-	switch r.Method {
-	case http.MethodDelete:
-		err = meta.DelSourceConfKey(pluginName, confKey, language)
-	case http.MethodPut:
-		v, err1 := ioutil.ReadAll(r.Body)
-		if err1 != nil {
-			handleError(w, err, "Invalid body", logger)
-			return
-		}
-		err = meta.AddSourceConfKey(pluginName, confKey, language, v)
-	}
-	if err != nil {
-		handleError(w, err, "", logger)
-		return
-	}
-}
-
-//Add  del confkey
-func connectionConfKeyHandler(w http.ResponseWriter, r *http.Request) {
-
-	defer r.Body.Close()
-	var err error
-	vars := mux.Vars(r)
-	pluginName := vars["name"]
-	confKey := vars["confKey"]
-	language := getLanguage(r)
-	switch r.Method {
-	case http.MethodDelete:
-		err = meta.DelConnectionConfKey(pluginName, confKey, language)
-	case http.MethodPut:
-		v, err1 := ioutil.ReadAll(r.Body)
-		if err1 != nil {
-			handleError(w, err1, "Invalid body", logger)
-			return
-		}
-		err = meta.AddConnectionConfKey(pluginName, confKey, language, v)
-	}
-	if err != nil {
-		handleError(w, err, "", logger)
-		return
-	}
-}
-
-func getLanguage(r *http.Request) string {
-	language := r.Header.Get("Content-Language")
-	if 0 == len(language) {
-		language = "en_US"
-	}
-	return language
-}
-
-func servicesHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	serviceManager := service.GetManager()
-	switch r.Method {
-	case http.MethodGet:
-		content, err := serviceManager.List()
-		if err != nil {
-			handleError(w, err, "service list command error", logger)
-			return
-		}
-		jsonResponse(content, w, logger)
-	case http.MethodPost:
-		sd := &service.ServiceCreationRequest{}
-		err := json.NewDecoder(r.Body).Decode(sd)
-		// Problems decoding
-		if err != nil {
-			handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
-			return
-		}
-		err = serviceManager.Create(sd)
-		if err != nil {
-			handleError(w, err, "service create command error", logger)
-			return
-		}
-		w.WriteHeader(http.StatusCreated)
-		w.Write([]byte(fmt.Sprintf("service %s is created", sd.Name)))
-	}
-}
-
-func serviceHandler(w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-	vars := mux.Vars(r)
-	name := vars["name"]
-	serviceManager := service.GetManager()
-	switch r.Method {
-	case http.MethodDelete:
-		err := serviceManager.Delete(name)
-		if err != nil {
-			handleError(w, err, fmt.Sprintf("delete service %s error", name), logger)
-			return
-		}
-		w.WriteHeader(http.StatusOK)
-		result := fmt.Sprintf("service %s is deleted", name)
-		w.Write([]byte(result))
-	case http.MethodGet:
-		j, err := serviceManager.Get(name)
-		if err != nil {
-			handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
-			return
-		}
-		jsonResponse(j, w, logger)
-	case http.MethodPut:
-		sd := &service.ServiceCreationRequest{}
-		err := json.NewDecoder(r.Body).Decode(sd)
-		// Problems decoding
-		if err != nil {
-			handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
-			return
-		}
-		sd.Name = name
-		err = serviceManager.Update(sd)
-		if err != nil {
-			handleError(w, err, "service update command error", logger)
-			return
-		}
-		w.WriteHeader(http.StatusOK)
-		w.Write([]byte(fmt.Sprintf("service %s is updated", sd.Name)))
-	}
-}
-
-func serviceFunctionsHandler(w http.ResponseWriter, _ *http.Request) {
-	serviceManager := service.GetManager()
-	content, err := serviceManager.ListFunctions()
-	if err != nil {
-		handleError(w, err, "service list command error", logger)
-		return
-	}
-	jsonResponse(content, w, logger)
-}
-
-func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
-	vars := mux.Vars(r)
-	name := vars["name"]
-	serviceManager := service.GetManager()
-	j, err := serviceManager.GetFunction(name)
-	if err != nil {
-		handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
-		return
-	}
-	jsonResponse(j, w, logger)
-}

+ 52 - 238
internal/server/rpc.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,24 +12,71 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:build rpc || !core
+// +build rpc !core
+
 package server
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/model"
-	"github.com/lf-edge/ekuiper/internal/plugin"
-	"github.com/lf-edge/ekuiper/internal/plugin/native"
-	"github.com/lf-edge/ekuiper/internal/plugin/portable"
-	"github.com/lf-edge/ekuiper/internal/service"
 	"github.com/lf-edge/ekuiper/internal/topo/sink"
+	"net/http"
+	"net/rpc"
 	"strings"
 	"time"
 )
 
 const QueryRuleId = "internal-ekuiper_query_rule"
 
+func init() {
+	servers["rpc"] = rpcComp{}
+}
+
+type rpcComp struct {
+	s *http.Server
+}
+
+func (r rpcComp) register() {}
+
+func (r rpcComp) serve() {
+	// Start rpc service
+	server := new(Server)
+	portRpc := conf.Config.Basic.Port
+	ipRpc := conf.Config.Basic.Ip
+	rpcSrv := rpc.NewServer()
+	err := rpcSrv.Register(server)
+	if err != nil {
+		logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
+	}
+	srvRpc := &http.Server{
+		Addr:         fmt.Sprintf("%s:%d", ipRpc, portRpc),
+		WriteTimeout: time.Second * 15,
+		ReadTimeout:  time.Second * 15,
+		IdleTimeout:  time.Second * 60,
+		Handler:      rpcSrv,
+	}
+	r.s = srvRpc
+	go func() {
+		if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			logger.Fatal("Error serving rpc service:", err)
+		}
+	}()
+}
+
+func (r rpcComp) close() {
+	if r.s != nil {
+		if err := r.s.Shutdown(context.TODO()); err != nil {
+			logger.Errorf("rpc server shutdown error: %v", err)
+		}
+		logger.Info("rpc server shutdown.")
+	}
+}
+
 type Server int
 
 func (t *Server) CreateQuery(sql string, reply *string) error {
@@ -203,228 +250,6 @@ func (t *Server) DropRule(name string, reply *string) error {
 	return nil
 }
 
-func (t *Server) CreatePlugin(arg *model.PluginDesc, reply *string) error {
-	pt := plugin.PluginType(arg.Type)
-	p, err := getPluginByJson(arg, pt)
-	if err != nil {
-		return fmt.Errorf("Create plugin error: %s", err)
-	}
-	if p.GetFile() == "" {
-		return fmt.Errorf("Create plugin error: Missing plugin file url.")
-	}
-	if pt == plugin.PORTABLE {
-		err = portable.GetManager().Register(p)
-	} else {
-		err = native.GetManager().Register(pt, p)
-	}
-	if err != nil {
-		return fmt.Errorf("Create plugin error: %s", err)
-	} else {
-		*reply = fmt.Sprintf("Plugin %s is created.", p.GetName())
-	}
-	return nil
-}
-
-func (t *Server) RegisterPlugin(arg *model.PluginDesc, reply *string) error {
-	p, err := getPluginByJson(arg, plugin.FUNCTION)
-	if err != nil {
-		return fmt.Errorf("Register plugin functions error: %s", err)
-	}
-	if len(p.GetSymbols()) == 0 {
-		return fmt.Errorf("Register plugin functions error: Missing function list.")
-	}
-	err = native.GetManager().RegisterFuncs(p.GetName(), p.GetSymbols())
-	if err != nil {
-		return fmt.Errorf("Create plugin error: %s", err)
-	} else {
-		*reply = fmt.Sprintf("Plugin %s is created.", p.GetName())
-	}
-	return nil
-}
-
-func (t *Server) DropPlugin(arg *model.PluginDesc, reply *string) error {
-	pt := plugin.PluginType(arg.Type)
-	p, err := getPluginByJson(arg, pt)
-	if err != nil {
-		return fmt.Errorf("Drop plugin error: %s", err)
-	}
-	if pt == plugin.PORTABLE {
-		err = portable.GetManager().Delete(p.GetName())
-		if err != nil {
-			return fmt.Errorf("Drop plugin error: %s", err)
-		} else {
-			*reply = fmt.Sprintf("Plugin %s is dropped .", p.GetName())
-		}
-	} else {
-		err = native.GetManager().Delete(pt, p.GetName(), arg.Stop)
-		if err != nil {
-			return fmt.Errorf("Drop plugin error: %s", err)
-		} else {
-			if arg.Stop {
-				*reply = fmt.Sprintf("Plugin %s is dropped and Kuiper will be stopped.", p.GetName())
-			} else {
-				*reply = fmt.Sprintf("Plugin %s is dropped and Kuiper must restart for the change to take effect.", p.GetName())
-			}
-		}
-	}
-	return nil
-}
-
-func (t *Server) ShowPlugins(arg int, reply *string) error {
-	pt := plugin.PluginType(arg)
-	l := native.GetManager().List(pt)
-	if len(l) == 0 {
-		l = append(l, "No plugin is found.")
-	}
-	*reply = strings.Join(l, "\n")
-	return nil
-}
-
-func (t *Server) ShowUdfs(_ int, reply *string) error {
-	l := native.GetManager().ListSymbols()
-	if len(l) == 0 {
-		l = append(l, "No udf is found.")
-	}
-	*reply = strings.Join(l, "\n")
-	return nil
-}
-
-func (t *Server) DescPlugin(arg *model.PluginDesc, reply *string) error {
-	pt := plugin.PluginType(arg.Type)
-	p, err := getPluginByJson(arg, pt)
-	if err != nil {
-		return fmt.Errorf("Describe plugin error: %s", err)
-	}
-	var m interface{}
-	var ok bool
-	if pt == plugin.PORTABLE {
-		m, ok = portable.GetManager().GetPluginInfo(p.GetName())
-	} else {
-		m, ok = native.GetManager().GetPluginInfo(pt, p.GetName())
-	}
-	if !ok {
-		return fmt.Errorf("Describe plugin error: not found")
-	} else {
-		r, err := marshalDesc(m)
-		if err != nil {
-			return fmt.Errorf("Describe plugin error: %v", err)
-		}
-		*reply = r
-	}
-	return nil
-}
-
-func (t *Server) DescUdf(arg string, reply *string) error {
-	m, ok := native.GetManager().GetPluginBySymbol(plugin.FUNCTION, arg)
-	if !ok {
-		return fmt.Errorf("Describe udf error: not found")
-	} else {
-		j := map[string]string{
-			"name":   arg,
-			"plugin": m,
-		}
-		r, err := marshalDesc(j)
-		if err != nil {
-			return fmt.Errorf("Describe udf error: %v", err)
-		}
-		*reply = r
-	}
-	return nil
-}
-
-func (t *Server) CreateService(arg *model.RPCArgDesc, reply *string) error {
-	sd := &service.ServiceCreationRequest{}
-	if arg.Json != "" {
-		if err := json.Unmarshal([]byte(arg.Json), sd); err != nil {
-			return fmt.Errorf("Parse service %s error : %s.", arg.Json, err)
-		}
-	}
-	if sd.Name != arg.Name {
-		return fmt.Errorf("Create service error: name mismatch.")
-	}
-	if sd.File == "" {
-		return fmt.Errorf("Create service error: Missing service file url.")
-	}
-	err := service.GetManager().Create(sd)
-	if err != nil {
-		return fmt.Errorf("Create service error: %s", err)
-	} else {
-		*reply = fmt.Sprintf("Service %s is created.", arg.Name)
-	}
-	return nil
-}
-
-func (t *Server) DescService(name string, reply *string) error {
-	s, err := service.GetManager().Get(name)
-	if err != nil {
-		return fmt.Errorf("Desc service error : %s.", err)
-	} else {
-		r, err := marshalDesc(s)
-		if err != nil {
-			return fmt.Errorf("Describe service error: %v", err)
-		}
-		*reply = r
-	}
-	return nil
-}
-
-func (t *Server) DescServiceFunc(name string, reply *string) error {
-	s, err := service.GetManager().GetFunction(name)
-	if err != nil {
-		return fmt.Errorf("Desc service func error : %s.", err)
-	} else {
-		r, err := marshalDesc(s)
-		if err != nil {
-			return fmt.Errorf("Describe service func error: %v", err)
-		}
-		*reply = r
-	}
-	return nil
-}
-
-func (t *Server) DropService(name string, reply *string) error {
-	err := service.GetManager().Delete(name)
-	if err != nil {
-		return fmt.Errorf("Drop service error : %s.", err)
-	}
-	*reply = fmt.Sprintf("Service %s is dropped", name)
-	return nil
-}
-
-func (t *Server) ShowServices(_ int, reply *string) error {
-	s, err := service.GetManager().List()
-	if err != nil {
-		return fmt.Errorf("Show service error: %s.", err)
-	}
-	if len(s) == 0 {
-		*reply = "No service definitions are found."
-	} else {
-		r, err := marshalDesc(s)
-		if err != nil {
-			return fmt.Errorf("Show service error: %v", err)
-		}
-		*reply = r
-	}
-	return nil
-}
-
-func (t *Server) ShowServiceFuncs(_ int, reply *string) error {
-	s, err := service.GetManager().ListFunctions()
-	if err != nil {
-		return fmt.Errorf("Show service funcs error: %s.", err)
-	}
-	if len(s) == 0 {
-		*reply = "No service definitions are found."
-	} else {
-		r, err := marshalDesc(s)
-		if err != nil {
-			return fmt.Errorf("Show service funcs error: %v", err)
-		}
-		*reply = r
-	}
-	return nil
-}
-
 func marshalDesc(m interface{}) (string, error) {
 	s, err := json.Marshal(m)
 	if err != nil {
@@ -437,17 +262,6 @@ func marshalDesc(m interface{}) (string, error) {
 	return dst.String(), nil
 }
 
-func getPluginByJson(arg *model.PluginDesc, pt plugin.PluginType) (plugin.Plugin, error) {
-	p := plugin.NewPluginByType(pt)
-	if arg.Json != "" {
-		if err := json.Unmarshal([]byte(arg.Json), p); err != nil {
-			return nil, fmt.Errorf("Parse plugin %s error : %s.", arg.Json, err)
-		}
-	}
-	p.SetName(arg.Name)
-	return p, nil
-}
-
 func init() {
 	ticker := time.NewTicker(time.Second * 5)
 	go func() {

+ 102 - 0
internal/server/rpc_plugin.go

@@ -0,0 +1,102 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build (rpc || !core) && (plugin || portable || !core)
+// +build rpc !core
+// +build plugin portable !core
+
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/model"
+	"github.com/lf-edge/ekuiper/internal/plugin"
+)
+
+func (t *Server) CreatePlugin(arg *model.PluginDesc, reply *string) error {
+	pt := plugin.PluginType(arg.Type)
+	p, err := getPluginByJson(arg, pt)
+	if err != nil {
+		return fmt.Errorf("Create plugin error: %s", err)
+	}
+	if p.GetFile() == "" {
+		return fmt.Errorf("Create plugin error: Missing plugin file url.")
+	}
+	// define according to the build tag
+	err = t.doRegister(pt, p)
+	if err != nil {
+		return err
+	}
+	if err != nil {
+		return fmt.Errorf("Create plugin error: %s", err)
+	} else {
+		*reply = fmt.Sprintf("Plugin %s is created.", p.GetName())
+	}
+	return nil
+}
+
+func (t *Server) DropPlugin(arg *model.PluginDesc, reply *string) error {
+	pt := plugin.PluginType(arg.Type)
+	p, err := getPluginByJson(arg, pt)
+	if err != nil {
+		return fmt.Errorf("Drop plugin error: %s", err)
+	}
+	err = t.doDelete(pt, p.GetName(), arg.Stop)
+	if err != nil {
+		return fmt.Errorf("Drop plugin error: %s", err)
+	} else {
+		if pt == plugin.PORTABLE {
+			*reply = fmt.Sprintf("Plugin %s is dropped .", p.GetName())
+		} else {
+			if arg.Stop {
+				*reply = fmt.Sprintf("Plugin %s is dropped and Kuiper will be stopped.", p.GetName())
+			} else {
+				*reply = fmt.Sprintf("Plugin %s is dropped and Kuiper must restart for the change to take effect.", p.GetName())
+			}
+		}
+	}
+
+	return nil
+}
+
+func (t *Server) DescPlugin(arg *model.PluginDesc, reply *string) error {
+	pt := plugin.PluginType(arg.Type)
+	p, err := getPluginByJson(arg, pt)
+	if err != nil {
+		return fmt.Errorf("Describe plugin error: %s", err)
+	}
+	m, err := t.doDesc(pt, p.GetName())
+	if err != nil {
+		return fmt.Errorf("Describe plugin error: %s", err)
+	} else {
+		r, err := marshalDesc(m)
+		if err != nil {
+			return fmt.Errorf("Describe plugin error: %v", err)
+		}
+		*reply = r
+	}
+	return nil
+}
+
+func getPluginByJson(arg *model.PluginDesc, pt plugin.PluginType) (plugin.Plugin, error) {
+	p := plugin.NewPluginByType(pt)
+	if arg.Json != "" {
+		if err := json.Unmarshal([]byte(arg.Json), p); err != nil {
+			return nil, fmt.Errorf("Parse plugin %s error : %s.", arg.Json, err)
+		}
+	}
+	p.SetName(arg.Name)
+	return p, nil
+}

+ 55 - 0
internal/server/rpc_plugin_both.go

@@ -0,0 +1,55 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !core || (rpc && portable && plugin)
+// +build !core rpc,portable,plugin
+
+package server
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/plugin"
+)
+
+func (t *Server) doRegister(pt plugin.PluginType, p plugin.Plugin) error {
+	if pt == plugin.PORTABLE {
+		return portableManager.Register(p)
+	} else {
+		return nativeManager.Register(pt, p)
+	}
+}
+
+func (t *Server) doDelete(pt plugin.PluginType, name string, stopRun bool) error {
+	if pt == plugin.PORTABLE {
+		return portableManager.Delete(name)
+	} else {
+		return nativeManager.Delete(pt, name, stopRun)
+	}
+}
+
+func (t *Server) doDesc(pt plugin.PluginType, name string) (interface{}, error) {
+	var (
+		result interface{}
+		ok     bool
+	)
+	if pt == plugin.PORTABLE {
+		result, ok = portableManager.GetPluginInfo(name)
+	} else {
+		result, ok = nativeManager.GetPluginInfo(pt, name)
+	}
+	if !ok {
+		return nil, fmt.Errorf("not found")
+	}
+	return result, nil
+}

+ 107 - 0
internal/server/rpc_plugin_native.go

@@ -0,0 +1,107 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build plugin && rpc && core && !portable
+// +build plugin,rpc,core,!portable
+
+package server
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/model"
+	"github.com/lf-edge/ekuiper/internal/plugin"
+	"strings"
+)
+
+func (t *Server) doRegister(pt plugin.PluginType, p plugin.Plugin) error {
+	if pt == plugin.PORTABLE {
+		return fmt.Errorf("portable plugin support is disabled")
+	} else {
+		return nativeManager.Register(pt, p)
+	}
+}
+
+func (t *Server) doDelete(pt plugin.PluginType, name string, stopRun bool) error {
+	if pt == plugin.PORTABLE {
+		return fmt.Errorf("portable plugin support is disabled")
+	} else {
+		return nativeManager.Delete(pt, name, stopRun)
+	}
+}
+
+func (t *Server) doDesc(pt plugin.PluginType, name string) (interface{}, error) {
+	if pt == plugin.PORTABLE {
+		return nil, fmt.Errorf("portable plugin support is disabled")
+	} else {
+		r, ok := nativeManager.GetPluginInfo(pt, name)
+		if !ok {
+			return nil, fmt.Errorf("not found")
+		}
+		return r, nil
+	}
+}
+
+func (t *Server) RegisterPlugin(arg *model.PluginDesc, reply *string) error {
+	p, err := getPluginByJson(arg, plugin.FUNCTION)
+	if err != nil {
+		return fmt.Errorf("Register plugin functions error: %s", err)
+	}
+	if len(p.GetSymbols()) == 0 {
+		return fmt.Errorf("Register plugin functions error: Missing function list.")
+	}
+	err = nativeManager.RegisterFuncs(p.GetName(), p.GetSymbols())
+	if err != nil {
+		return fmt.Errorf("Create plugin error: %s", err)
+	} else {
+		*reply = fmt.Sprintf("Plugin %s is created.", p.GetName())
+	}
+	return nil
+}
+
+func (t *Server) ShowPlugins(arg int, reply *string) error {
+	pt := plugin.PluginType(arg)
+	l := nativeManager.List(pt)
+	if len(l) == 0 {
+		l = append(l, "No plugin is found.")
+	}
+	*reply = strings.Join(l, "\n")
+	return nil
+}
+
+func (t *Server) ShowUdfs(_ int, reply *string) error {
+	l := nativeManager.ListSymbols()
+	if len(l) == 0 {
+		l = append(l, "No udf is found.")
+	}
+	*reply = strings.Join(l, "\n")
+	return nil
+}
+
+func (t *Server) DescUdf(arg string, reply *string) error {
+	m, ok := nativeManager.GetPluginBySymbol(plugin.FUNCTION, arg)
+	if !ok {
+		return fmt.Errorf("Describe udf error: not found")
+	} else {
+		j := map[string]string{
+			"name":   arg,
+			"plugin": m,
+		}
+		r, err := marshalDesc(j)
+		if err != nil {
+			return fmt.Errorf("Describe udf error: %v", err)
+		}
+		*reply = r
+	}
+	return nil
+}

+ 51 - 0
internal/server/rpc_plugin_portable.go

@@ -0,0 +1,51 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build portable && rpc && core && !plugin
+// +build portable,rpc,core,!plugin
+
+package server
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/plugin"
+)
+
+func (t *Server) doRegister(pt plugin.PluginType, p plugin.Plugin) error {
+	if pt == plugin.PORTABLE {
+		return portableManager.Register(p)
+	} else {
+		return fmt.Errorf("native plugin support is disabled")
+	}
+}
+
+func (t *Server) doDelete(pt plugin.PluginType, name string, stopRun bool) error {
+	if pt == plugin.PORTABLE {
+		return portableManager.Delete(name)
+	} else {
+		return fmt.Errorf("native plugin support is disabled")
+	}
+}
+
+func (t *Server) doDesc(pt plugin.PluginType, name string) (interface{}, error) {
+	if pt == plugin.PORTABLE {
+		r, ok := portableManager.GetPluginInfo(name)
+		if !ok {
+			return nil, fmt.Errorf("not found")
+		}
+		return r, nil
+	} else {
+		return nil, fmt.Errorf("native plugin support is disabled")
+	}
+}

+ 118 - 0
internal/server/rpc_service.go

@@ -0,0 +1,118 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !core || (rpc && service)
+// +build !core rpc,service
+
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/pkg/model"
+	"github.com/lf-edge/ekuiper/internal/service"
+)
+
+func (t *Server) CreateService(arg *model.RPCArgDesc, reply *string) error {
+	sd := &service.ServiceCreationRequest{}
+	if arg.Json != "" {
+		if err := json.Unmarshal([]byte(arg.Json), sd); err != nil {
+			return fmt.Errorf("Parse service %s error : %s.", arg.Json, err)
+		}
+	}
+	if sd.Name != arg.Name {
+		return fmt.Errorf("Create service error: name mismatch.")
+	}
+	if sd.File == "" {
+		return fmt.Errorf("Create service error: Missing service file url.")
+	}
+	err := serviceManager.Create(sd)
+	if err != nil {
+		return fmt.Errorf("Create service error: %s", err)
+	} else {
+		*reply = fmt.Sprintf("Service %s is created.", arg.Name)
+	}
+	return nil
+}
+
+func (t *Server) DescService(name string, reply *string) error {
+	s, err := serviceManager.Get(name)
+	if err != nil {
+		return fmt.Errorf("Desc service error : %s.", err)
+	} else {
+		r, err := marshalDesc(s)
+		if err != nil {
+			return fmt.Errorf("Describe service error: %v", err)
+		}
+		*reply = r
+	}
+	return nil
+}
+
+func (t *Server) DescServiceFunc(name string, reply *string) error {
+	s, err := serviceManager.GetFunction(name)
+	if err != nil {
+		return fmt.Errorf("Desc service func error : %s.", err)
+	} else {
+		r, err := marshalDesc(s)
+		if err != nil {
+			return fmt.Errorf("Describe service func error: %v", err)
+		}
+		*reply = r
+	}
+	return nil
+}
+
+func (t *Server) DropService(name string, reply *string) error {
+	err := serviceManager.Delete(name)
+	if err != nil {
+		return fmt.Errorf("Drop service error : %s.", err)
+	}
+	*reply = fmt.Sprintf("Service %s is dropped", name)
+	return nil
+}
+
+func (t *Server) ShowServices(_ int, reply *string) error {
+	s, err := serviceManager.List()
+	if err != nil {
+		return fmt.Errorf("Show service error: %s.", err)
+	}
+	if len(s) == 0 {
+		*reply = "No service definitions are found."
+	} else {
+		r, err := marshalDesc(s)
+		if err != nil {
+			return fmt.Errorf("Show service error: %v", err)
+		}
+		*reply = r
+	}
+	return nil
+}
+
+func (t *Server) ShowServiceFuncs(_ int, reply *string) error {
+	s, err := serviceManager.ListFunctions()
+	if err != nil {
+		return fmt.Errorf("Show service funcs error: %s.", err)
+	}
+	if len(s) == 0 {
+		*reply = "No service definitions are found."
+	} else {
+		r, err := marshalDesc(s)
+		if err != nil {
+			return fmt.Errorf("Show service funcs error: %v", err)
+		}
+		*reply = r
+	}
+	return nil
+}

internal/server/ruleManager.go → internal/server/rule_manager.go


+ 20 - 88
internal/server/server.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -15,26 +15,19 @@
 package server
 
 import (
-	"github.com/lf-edge/ekuiper/internal/binder"
+	"context"
+	"fmt"
 	"github.com/lf-edge/ekuiper/internal/binder/function"
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/binder/meta"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
-	"github.com/lf-edge/ekuiper/internal/plugin/native"
-	"github.com/lf-edge/ekuiper/internal/plugin/portable"
-	"github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
 	"github.com/lf-edge/ekuiper/internal/processor"
-	"github.com/lf-edge/ekuiper/internal/service"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/factory"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
-
-	"context"
-	"fmt"
 	"net/http"
-	"net/rpc"
 	"os"
 	"os/signal"
+	"sort"
 	"syscall"
 	"time"
 )
@@ -62,24 +55,14 @@ func StartUp(Version, LoadFileType string) {
 	ruleProcessor = processor.NewRuleProcessor()
 	streamProcessor = processor.NewStreamProcessor()
 
-	// Bind the source, function, sink
-	nativeManager, err := native.InitManager()
-	if err != nil {
-		panic(err)
-	}
-	portableManager, err := portable.InitManager()
-	if err != nil {
-		panic(err)
-	}
-	serviceManager, err := service.InitManager()
-	if err != nil {
-		panic(err)
-	}
-	entries := []binder.FactoryEntry{
-		{Name: "native plugin", Factory: nativeManager},
-		{Name: "portable plugin", Factory: portableManager},
-		{Name: "external service", Factory: serviceManager},
+	// register all extensions
+	for k, v := range components {
+		logger.Infof("register component %s", k)
+		v.register()
 	}
+
+	// Bind the source, function, sink
+	sort.Sort(entries)
 	err = function.Initialize(entries)
 	if err != nil {
 		panic(err)
@@ -91,8 +74,6 @@ func StartUp(Version, LoadFileType string) {
 	meta.Bind()
 
 	registry = &RuleRegistry{internal: make(map[string]*RuleState)}
-
-	server := new(Server)
 	//Start rules
 	if rules, err := ruleProcessor.GetAllRules(); err != nil {
 		logger.Infof("Start rules error: %s", err)
@@ -108,32 +89,6 @@ func StartUp(Version, LoadFileType string) {
 		}
 	}
 
-	//Start prometheus service
-	var srvPrometheus *http.Server = nil
-	if conf.Config.Basic.Prometheus {
-		portPrometheus := conf.Config.Basic.PrometheusPort
-		if portPrometheus <= 0 {
-			logger.Fatal("Miss configuration prometheusPort")
-		}
-		mux := http.NewServeMux()
-		mux.Handle("/metrics", promhttp.Handler())
-		srvPrometheus = &http.Server{
-			Addr:         fmt.Sprintf("0.0.0.0:%d", portPrometheus),
-			WriteTimeout: time.Second * 15,
-			ReadTimeout:  time.Second * 15,
-			IdleTimeout:  time.Second * 60,
-			Handler:      mux,
-		}
-		go func() {
-			if err := srvPrometheus.ListenAndServe(); err != nil && err != http.ErrServerClosed {
-				logger.Fatal("Listen prometheus error: ", err)
-			}
-		}()
-		msg := fmt.Sprintf("Serving prometheus metrics on port http://localhost:%d/metrics", portPrometheus)
-		logger.Infof(msg)
-		fmt.Println(msg)
-	}
-
 	//Start rest service
 	srvRest := createRestServer(conf.Config.Basic.RestIp, conf.Config.Basic.RestPort, conf.Config.Basic.Authentication)
 	go func() {
@@ -144,30 +99,15 @@ func StartUp(Version, LoadFileType string) {
 			err = srvRest.ListenAndServeTLS(conf.Config.Basic.RestTls.Certfile, conf.Config.Basic.RestTls.Keyfile)
 		}
 		if err != nil && err != http.ErrServerClosed {
-			logger.Fatal("Error serving rest service: ", err)
+			logger.Errorf("Error serving rest service: %s", err)
 		}
 	}()
 
-	// Start rpc service
-	portRpc := conf.Config.Basic.Port
-	ipRpc := conf.Config.Basic.Ip
-	rpcSrv := rpc.NewServer()
-	err = rpcSrv.Register(server)
-	if err != nil {
-		logger.Fatal("Format of service Server isn'restHttpType correct. ", err)
+	// Start extend services
+	for k, v := range servers {
+		logger.Infof("start service %s", k)
+		v.serve()
 	}
-	srvRpc := &http.Server{
-		Addr:         fmt.Sprintf("%s:%d", ipRpc, portRpc),
-		WriteTimeout: time.Second * 15,
-		ReadTimeout:  time.Second * 15,
-		IdleTimeout:  time.Second * 60,
-		Handler:      rpcSrv,
-	}
-	go func() {
-		if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed {
-			logger.Fatal("Error serving rpc service:", err)
-		}
-	}()
 
 	//Startup message
 	restHttpType := "http"
@@ -183,23 +123,15 @@ func StartUp(Version, LoadFileType string) {
 	signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
 	<-sigint
 
-	runtime.GetPluginInsManager().KillAll()
-
-	if err = srvRpc.Shutdown(context.TODO()); err != nil {
-		logger.Errorf("rpc server shutdown error: %v", err)
-	}
-	logger.Info("rpc server shutdown.")
-
 	if err = srvRest.Shutdown(context.TODO()); err != nil {
 		logger.Errorf("rest server shutdown error: %v", err)
 	}
 	logger.Info("rest server successfully shutdown.")
 
-	if srvPrometheus != nil {
-		if err = srvPrometheus.Shutdown(context.TODO()); err != nil {
-			logger.Errorf("prometheus server shutdown error: %v", err)
-		}
-		logger.Info("prometheus server successfully shutdown.")
+	// close extend services
+	for k, v := range servers {
+		logger.Infof("close service %s", k)
+		v.close()
 	}
 
 	os.Exit(0)

+ 140 - 0
internal/server/service_init.go

@@ -0,0 +1,140 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build service || !core
+// +build service !core
+
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/binder"
+	"github.com/lf-edge/ekuiper/internal/service"
+	"github.com/lf-edge/ekuiper/pkg/errorx"
+	"net/http"
+)
+
+var serviceManager *service.Manager
+
+func init() {
+	components["service"] = serviceComp{}
+}
+
+type serviceComp struct{}
+
+func (s serviceComp) register() {
+	var err error
+	serviceManager, err = service.InitManager()
+	if err != nil {
+		panic(err)
+	}
+	entries = append(entries, binder.FactoryEntry{Name: "external service", Factory: serviceManager, Weight: 1})
+}
+
+func (s serviceComp) rest(r *mux.Router) {
+	r.HandleFunc("/services", servicesHandler).Methods(http.MethodGet, http.MethodPost)
+	r.HandleFunc("/services/functions", serviceFunctionsHandler).Methods(http.MethodGet)
+	r.HandleFunc("/services/functions/{name}", serviceFunctionHandler).Methods(http.MethodGet)
+	r.HandleFunc("/services/{name}", serviceHandler).Methods(http.MethodDelete, http.MethodGet, http.MethodPut)
+}
+
+func servicesHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	switch r.Method {
+	case http.MethodGet:
+		content, err := serviceManager.List()
+		if err != nil {
+			handleError(w, err, "service list command error", logger)
+			return
+		}
+		jsonResponse(content, w, logger)
+	case http.MethodPost:
+		sd := &service.ServiceCreationRequest{}
+		err := json.NewDecoder(r.Body).Decode(sd)
+		// Problems decoding
+		if err != nil {
+			handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
+			return
+		}
+		err = serviceManager.Create(sd)
+		if err != nil {
+			handleError(w, err, "service create command error", logger)
+			return
+		}
+		w.WriteHeader(http.StatusCreated)
+		w.Write([]byte(fmt.Sprintf("service %s is created", sd.Name)))
+	}
+}
+
+func serviceHandler(w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+	vars := mux.Vars(r)
+	name := vars["name"]
+	switch r.Method {
+	case http.MethodDelete:
+		err := serviceManager.Delete(name)
+		if err != nil {
+			handleError(w, err, fmt.Sprintf("delete service %s error", name), logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		result := fmt.Sprintf("service %s is deleted", name)
+		w.Write([]byte(result))
+	case http.MethodGet:
+		j, err := serviceManager.Get(name)
+		if err != nil {
+			handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe service %s error", name), logger)
+			return
+		}
+		jsonResponse(j, w, logger)
+	case http.MethodPut:
+		sd := &service.ServiceCreationRequest{}
+		err := json.NewDecoder(r.Body).Decode(sd)
+		// Problems decoding
+		if err != nil {
+			handleError(w, err, "Invalid body: Error decoding the %s service request payload", logger)
+			return
+		}
+		sd.Name = name
+		err = serviceManager.Update(sd)
+		if err != nil {
+			handleError(w, err, "service update command error", logger)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(fmt.Sprintf("service %s is updated", sd.Name)))
+	}
+}
+
+func serviceFunctionsHandler(w http.ResponseWriter, r *http.Request) {
+	content, err := serviceManager.ListFunctions()
+	if err != nil {
+		handleError(w, err, "service list command error", logger)
+		return
+	}
+	jsonResponse(content, w, logger)
+}
+
+func serviceFunctionHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	name := vars["name"]
+	j, err := serviceManager.GetFunction(name)
+	if err != nil {
+		handleError(w, errorx.NewWithCode(errorx.NOT_FOUND, "not found"), fmt.Sprintf("describe function %s error", name), logger)
+		return
+	}
+	jsonResponse(j, w, logger)
+}

+ 37 - 0
internal/server/tpl_init.go

@@ -0,0 +1,37 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build template || !core
+// +build template !core
+
+package server
+
+import (
+	"github.com/gorilla/mux"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
+)
+
+func init() {
+	components["template"] = tplComp{}
+}
+
+type tplComp struct{}
+
+func (t tplComp) register() {
+	transform.RegisterAdditionalFuncs()
+}
+
+func (t tplComp) rest(_ *mux.Router) {
+	// do nothing
+}

+ 0 - 73
internal/template/funcs_test.go

@@ -1,73 +0,0 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package template
-
-import (
-	"encoding/base64"
-	"fmt"
-	"github.com/lf-edge/ekuiper/internal/testx"
-	"reflect"
-	"testing"
-)
-
-func TestBase64Encode(t *testing.T) {
-	var tests = []struct {
-		para   interface{}
-		expect string
-		err    string
-	}{
-		{
-			para:   1,
-			expect: "1",
-		},
-
-		{
-			para:   float32(3.14),
-			expect: "3.14",
-		},
-
-		{
-			para:   float64(3.1415),
-			expect: "3.1415",
-		},
-		{
-			para:   "hello",
-			expect: "hello",
-		},
-		{
-			para:   "{\"hello\" : 3}",
-			expect: "{\"hello\" : 3}",
-		},
-		{
-			para: map[string]interface{}{
-				"temperature": 30,
-				"humidity":    20,
-			},
-			expect: `{"humidity":20,"temperature":30}`,
-		},
-	}
-
-	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
-	for i, tt := range tests {
-		result, err := Base64Encode(tt.para)
-		r, _ := base64.StdEncoding.DecodeString(result)
-		if !reflect.DeepEqual(tt.err, testx.Errstring(err)) {
-			t.Errorf("%d. %q: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.para, tt.err, err)
-
-		} else if tt.err == "" && !reflect.DeepEqual(tt.expect, string(r)) {
-			t.Errorf("%d. %q\n\n mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.para, tt.expect, string(r))
-		}
-	}
-}

+ 2 - 2
internal/topo/node/join_align_node.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -60,7 +60,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 		go func() { errCh <- fmt.Errorf("no output channel found") }()
 		return
 	}
-	stats, err := NewStatManager("op", ctx)
+	stats, err := NewStatManager(ctx, "op")
 	if err != nil {
 		go func() { errCh <- err }()
 		return

+ 2 - 2
internal/topo/node/operations.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -98,7 +98,7 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 		cancel()
 	}()
 
-	stats, err := NewStatManager("op", ctx)
+	stats, err := NewStatManager(ctx, "op")
 	if err != nil {
 		o.drainError(errCh, err, ctx)
 		return

+ 4 - 9
internal/topo/node/prometheus.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:build prometheus || !core
+// +build prometheus !core
+
 package node
 
 import (
@@ -19,15 +22,7 @@ import (
 	"sync"
 )
 
-const RecordsInTotal = "records_in_total"
-const RecordsOutTotal = "records_out_total"
-const ExceptionsTotal = "exceptions_total"
-const ProcessLatencyUs = "process_latency_us"
-const LastInvocation = "last_invocation"
-const BufferLength = "buffer_length"
-
 var (
-	MetricNames        = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyUs, BufferLength, LastInvocation}
 	prometheuseMetrics *PrometheusMetrics
 	mutex              sync.RWMutex
 )

+ 1 - 1
internal/topo/node/sink_node.go

@@ -175,7 +175,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 					sink = m.sinks[instance]
 				}
 
-				stats, err := NewStatManager("sink", ctx)
+				stats, err := NewStatManager(ctx, "sink")
 				if err != nil {
 					m.drainError(result, err, ctx, logger)
 					return

+ 5 - 0
internal/topo/node/sink_node_test.go

@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:build template || !core
+// +build template !core
+
 package node
 
 import (
@@ -19,6 +22,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"reflect"
 	"testing"
@@ -27,6 +31,7 @@ import (
 
 func TestSinkTemplate_Apply(t *testing.T) {
 	conf.InitConf()
+	transform.RegisterAdditionalFuncs()
 	var tests = []struct {
 		config map[string]interface{}
 		data   []map[string]interface{}

+ 2 - 2
internal/topo/node/source_node.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -114,7 +114,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 					buffer.Close()
 				}()
 
-				stats, err := NewStatManager("source", ctx)
+				stats, err := NewStatManager(ctx, "source")
 				if err != nil {
 					m.drainError(errCh, err, ctx, logger)
 					return

+ 17 - 72
internal/topo/node/stats_manager.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -16,13 +16,19 @@ package node
 
 import (
 	"fmt"
-	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"github.com/prometheus/client_golang/prometheus"
-	"strconv"
 	"time"
 )
 
+const RecordsInTotal = "records_in_total"
+const RecordsOutTotal = "records_out_total"
+const ExceptionsTotal = "exceptions_total"
+const ProcessLatencyUs = "process_latency_us"
+const LastInvocation = "last_invocation"
+const BufferLength = "buffer_length"
+
+var MetricNames = []string{RecordsInTotal, RecordsOutTotal, ExceptionsTotal, ProcessLatencyUs, BufferLength, LastInvocation}
+
 type StatManager interface {
 	IncTotalRecordsIn()
 	IncTotalRecordsOut()
@@ -50,17 +56,7 @@ type DefaultStatManager struct {
 	instanceId       int
 }
 
-type PrometheusStatManager struct {
-	DefaultStatManager
-	//prometheus metrics
-	pTotalRecordsIn  prometheus.Counter
-	pTotalRecordsOut prometheus.Counter
-	pTotalExceptions prometheus.Counter
-	pProcessLatency  prometheus.Gauge
-	pBufferLength    prometheus.Gauge
-}
-
-func NewStatManager(opType string, ctx api.StreamContext) (StatManager, error) {
+func NewStatManager(ctx api.StreamContext, opType string) (StatManager, error) {
 	var prefix string
 	switch opType {
 	case "source":
@@ -73,35 +69,13 @@ func NewStatManager(opType string, ctx api.StreamContext) (StatManager, error) {
 		return nil, fmt.Errorf("invalid opType %s, must be \"source\", \"sink\" or \"op\"", opType)
 	}
 
-	var sm StatManager
-	if conf.Config != nil && conf.Config.Basic.Prometheus {
-		ctx.GetLogger().Debugf("Create prometheus stat manager")
-		psm := &PrometheusStatManager{
-			DefaultStatManager: DefaultStatManager{
-				opType:     opType,
-				prefix:     prefix,
-				opId:       ctx.GetOpId(),
-				instanceId: ctx.GetInstanceId(),
-			},
-		}
-		//assign prometheus
-		mg := GetPrometheusMetrics().GetMetricsGroup(opType)
-		strInId := strconv.Itoa(ctx.GetInstanceId())
-		psm.pTotalRecordsIn = mg.TotalRecordsIn.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
-		psm.pTotalRecordsOut = mg.TotalRecordsOut.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
-		psm.pTotalExceptions = mg.TotalExceptions.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
-		psm.pProcessLatency = mg.ProcessLatency.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
-		psm.pBufferLength = mg.BufferLength.WithLabelValues(ctx.GetRuleId(), opType, ctx.GetOpId(), strInId)
-		sm = psm
-	} else {
-		sm = &DefaultStatManager{
-			opType:     opType,
-			prefix:     prefix,
-			opId:       ctx.GetOpId(),
-			instanceId: ctx.GetInstanceId(),
-		}
+	ds := DefaultStatManager{
+		opType:     opType,
+		prefix:     prefix,
+		opId:       ctx.GetOpId(),
+		instanceId: ctx.GetInstanceId(),
 	}
-	return sm, nil
+	return getStatManager(ctx, ds)
 }
 
 func (sm *DefaultStatManager) IncTotalRecordsIn() {
@@ -133,35 +107,6 @@ func (sm *DefaultStatManager) SetBufferLength(l int64) {
 	sm.bufferLength = l
 }
 
-func (sm *PrometheusStatManager) IncTotalRecordsIn() {
-	sm.totalRecordsIn++
-	sm.pTotalRecordsIn.Inc()
-}
-
-func (sm *PrometheusStatManager) IncTotalRecordsOut() {
-	sm.totalRecordsOut++
-	sm.pTotalRecordsOut.Inc()
-}
-
-func (sm *PrometheusStatManager) IncTotalExceptions() {
-	sm.totalExceptions++
-	sm.pTotalExceptions.Inc()
-	var t time.Time
-	sm.processTimeStart = t
-}
-
-func (sm *PrometheusStatManager) ProcessTimeEnd() {
-	if !sm.processTimeStart.IsZero() {
-		sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
-		sm.pProcessLatency.Set(float64(sm.processLatency))
-	}
-}
-
-func (sm *PrometheusStatManager) SetBufferLength(l int64) {
-	sm.bufferLength = l
-	sm.pBufferLength.Set(float64(l))
-}
-
 func (sm *DefaultStatManager) GetMetrics() []interface{} {
 	result := []interface{}{
 		sm.totalRecordsIn, sm.totalRecordsOut, sm.totalExceptions, sm.processLatency, sm.bufferLength,

+ 32 - 0
internal/topo/node/stats_mem.go

@@ -0,0 +1,32 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !prometheus && core
+// +build !prometheus,core
+
+package node
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/api"
+)
+
+func getStatManager(_ api.StreamContext, sm DefaultStatManager) (StatManager, error) {
+	if conf.Config != nil && conf.Config.Basic.Prometheus {
+		return nil, fmt.Errorf("prometheus support is disabled")
+	} else {
+		return &sm, nil
+	}
+}

+ 87 - 0
internal/topo/node/stats_prom.go

@@ -0,0 +1,87 @@
+// Copyright 2022 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build prometheus || !core
+// +build prometheus !core
+
+package node
+
+import (
+	"github.com/lf-edge/ekuiper/internal/conf"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/prometheus/client_golang/prometheus"
+	"strconv"
+	"time"
+)
+
+func getStatManager(ctx api.StreamContext, dsm DefaultStatManager) (StatManager, error) {
+	ctx.GetLogger().Debugf("Create prometheus stat manager")
+	var sm StatManager
+	if conf.Config != nil && conf.Config.Basic.Prometheus {
+		psm := &PrometheusStatManager{
+			DefaultStatManager: dsm,
+		}
+		//assign prometheus
+		mg := GetPrometheusMetrics().GetMetricsGroup(dsm.opType)
+		strInId := strconv.Itoa(dsm.instanceId)
+		psm.pTotalRecordsIn = mg.TotalRecordsIn.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
+		psm.pTotalRecordsOut = mg.TotalRecordsOut.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
+		psm.pTotalExceptions = mg.TotalExceptions.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
+		psm.pProcessLatency = mg.ProcessLatency.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
+		psm.pBufferLength = mg.BufferLength.WithLabelValues(ctx.GetRuleId(), dsm.opType, dsm.opId, strInId)
+		sm = psm
+	} else {
+		sm = &dsm
+	}
+	return sm, nil
+}
+
+type PrometheusStatManager struct {
+	DefaultStatManager
+	//prometheus metrics
+	pTotalRecordsIn  prometheus.Counter
+	pTotalRecordsOut prometheus.Counter
+	pTotalExceptions prometheus.Counter
+	pProcessLatency  prometheus.Gauge
+	pBufferLength    prometheus.Gauge
+}
+
+func (sm *PrometheusStatManager) IncTotalRecordsIn() {
+	sm.totalRecordsIn++
+	sm.pTotalRecordsIn.Inc()
+}
+
+func (sm *PrometheusStatManager) IncTotalRecordsOut() {
+	sm.totalRecordsOut++
+	sm.pTotalRecordsOut.Inc()
+}
+
+func (sm *PrometheusStatManager) IncTotalExceptions() {
+	sm.totalExceptions++
+	sm.pTotalExceptions.Inc()
+	var t time.Time
+	sm.processTimeStart = t
+}
+
+func (sm *PrometheusStatManager) ProcessTimeEnd() {
+	if !sm.processTimeStart.IsZero() {
+		sm.processLatency = int64(time.Since(sm.processTimeStart) / time.Microsecond)
+		sm.pProcessLatency.Set(float64(sm.processLatency))
+	}
+}
+
+func (sm *PrometheusStatManager) SetBufferLength(l int64) {
+	sm.bufferLength = l
+	sm.pBufferLength.Set(float64(l))
+}

+ 2 - 2
internal/topo/node/window_op.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -95,7 +95,7 @@ func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error) {
 		go func() { errCh <- fmt.Errorf("no output channel found") }()
 		return
 	}
-	stats, err := NewStatManager("op", ctx)
+	stats, err := NewStatManager(ctx, "op")
 	if err != nil {
 		go func() { errCh <- err }()
 		return

+ 10 - 8
internal/template/funcs.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2022 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,24 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package template
+//go:build template || !core
+// +build template !core
+
+package transform
 
 import (
 	"encoding/base64"
 	"encoding/json"
 	"fmt"
 	"github.com/Masterminds/sprig/v3"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"reflect"
 	"strconv"
 	"text/template"
 )
 
-var FuncMap template.FuncMap
-
-func init() {
-	FuncMap = template.FuncMap(sprig.FuncMap())
-	FuncMap["json"] = FuncMap["toJson"]
-	FuncMap["base64"] = Base64Encode
+func RegisterAdditionalFuncs() {
+	conf.FuncMap = template.FuncMap(sprig.FuncMap())
+	conf.FuncMap["json"] = conf.FuncMap["toJson"]
+	conf.FuncMap["base64"] = Base64Encode
 }
 
 func Base64Encode(para interface{}) (string, error) {

+ 3 - 3
internal/topo/transform/template.go

@@ -18,7 +18,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
-	ct "github.com/lf-edge/ekuiper/internal/template"
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"text/template"
 )
 
@@ -27,7 +27,7 @@ type TransFunc func(interface{}) ([]byte, bool, error)
 func GenTransform(dt string) (TransFunc, error) {
 	var tp *template.Template = nil
 	if dt != "" {
-		temp, err := template.New("sink").Funcs(ct.FuncMap).Parse(dt)
+		temp, err := template.New("sink").Funcs(conf.FuncMap).Parse(dt)
 		if err != nil {
 			return nil, err
 		}
@@ -49,5 +49,5 @@ func GenTransform(dt string) (TransFunc, error) {
 }
 
 func GenTp(dt string) (*template.Template, error) {
-	return template.New("sink").Funcs(ct.FuncMap).Parse(dt)
+	return template.New("sink").Funcs(conf.FuncMap).Parse(dt)
 }