Browse Source

build(graph): move script node as an optional node

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 years ago
parent
commit
1a712cd2dd

+ 1 - 1
.github/workflows/run_test_case.yaml

@@ -59,7 +59,7 @@ jobs:
         cd ../../../../
         cp -r sdk/python/example/pysam plugins/portable/pysam
         cp -r sdk/python/ekuiper plugins/portable/pysam/
-        go test --tags="edgex test" ./...
+        go test --tags="edgex script test" ./...
     - name: run plugins test case
       run: |
         mkdir -p data/test/uploads

+ 8 - 0
Makefile

@@ -56,6 +56,14 @@ build_with_edgex: build_prepare
 	@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
 	@echo "Build successfully"
 
+.PHONY: build_with_edgex_and_script
+build_with_edgex_and_script: build_prepare
+	GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiper cmd/kuiper/main.go
+	GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging script" -o kuiperd cmd/kuiperd/main.go
+	@if [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi
+	@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
+	@echo "Build successfully"
+
 .PHONY: build_core
 build_core: build_prepare
 	GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags core -o kuiperd cmd/kuiperd/main.go

+ 1 - 1
deploy/docker/Dockerfile-dev

@@ -22,7 +22,7 @@ WORKDIR /go/kuiper
 RUN apt-get clean \
     && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
 
-RUN make build_with_edgex \
+RUN make build_with_edgex_and_script \
     && ln -s /go/kuiper/_build/kuiper-$(git describe --tags --always)-$(go env GOOS)-$(go env GOARCH) /kuiper
 
 RUN ln -s /go/kuiper/deploy/docker/docker-entrypoint.sh /usr/bin/docker-entrypoint.sh

+ 1 - 1
deploy/docker/Dockerfile-slim

@@ -19,7 +19,7 @@ COPY . /go/kuiper
 
 WORKDIR /go/kuiper
 
-RUN make build_with_edgex
+RUN make build_with_edgex_and_script
 
 FROM debian:buster-20211011 
 

+ 1 - 1
deploy/docker/Dockerfile-slim-python

@@ -19,7 +19,7 @@ COPY . /go/kuiper
 
 WORKDIR /go/kuiper
 
-RUN make build_with_edgex
+RUN make build_with_edgex_and_script
 
 FROM python:3.8.12-slim-bullseye
 

+ 2 - 0
internal/topo/operator/script_operator.go

@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:build script
+
 package operator
 
 import (

+ 48 - 0
internal/topo/planner/ext_graph_node.go

@@ -0,0 +1,48 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build script
+
+package planner
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/graph"
+	"github.com/lf-edge/ekuiper/internal/topo/operator"
+	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/cast"
+)
+
+func init() {
+	extNodes["script"] = func(name string, props map[string]interface{}, options *api.RuleOption) (api.TopNode, error) {
+		sop, err := parseScript(props)
+		if err != nil {
+			return nil, err
+		}
+		op := Transform(sop, name, options)
+		return op, nil
+	}
+}
+
+func parseScript(props map[string]interface{}) (*operator.ScriptOp, error) {
+	n := &graph.Script{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if n.Script == "" {
+		return nil, fmt.Errorf("script node must have script")
+	}
+	return operator.NewScriptOp(n.Script)
+}

+ 12 - 18
internal/topo/planner/planner_graph.go

@@ -30,6 +30,10 @@ import (
 	"strings"
 )
 
+type genNodeFunc func(name string, props map[string]interface{}, options *api.RuleOption) (api.TopNode, error)
+
+var extNodes = map[string]genNodeFunc{}
+
 // PlanByGraph returns a topo.Topo object by a graph
 func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 	ruleGraph := rule.Graph
@@ -93,7 +97,8 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 			if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
 				return nil, fmt.Errorf("no edge defined for operator node %s", nodeName)
 			}
-			switch strings.ToLower(gn.NodeType) {
+			nt := strings.ToLower(gn.NodeType)
+			switch nt {
 			case "function":
 				fop, err := parseFunc(gn.Props)
 				if err != nil {
@@ -164,15 +169,16 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 					return nil, fmt.Errorf("create switch %s error: %v", nodeName, err)
 				}
 				nodeMap[nodeName] = op
-			case "script":
-				sop, err := parseScript(gn.Props)
+			default:
+				gnf, ok := extNodes[nt]
+				if !ok {
+					return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
+				}
+				op, err := gnf(nodeName, gn.Props, rule.Options)
 				if err != nil {
 					return nil, err
 				}
-				op := Transform(sop, nodeName, rule.Options)
 				nodeMap[nodeName] = op
-			default: // TODO other node type
-				return nil, fmt.Errorf("unknown operator type %s", gn.NodeType)
 			}
 		default:
 			return nil, fmt.Errorf("unknown node type %s", gn.Type)
@@ -597,15 +603,3 @@ func parseSwitch(props map[string]interface{}) (*node.SwitchConfig, error) {
 		StopAtFirstMatch: n.StopAtFirstMatch,
 	}, nil
 }
-
-func parseScript(props map[string]interface{}) (*operator.ScriptOp, error) {
-	n := &graph.Script{}
-	err := cast.MapToStruct(props, n)
-	if err != nil {
-		return nil, err
-	}
-	if n.Script == "" {
-		return nil, fmt.Errorf("script node must have script")
-	}
-	return operator.NewScriptOp(n.Script)
-}