浏览代码

test(portable): unit test for python plugins

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

chore(CI): setup python env for test case workflows

Signed-off-by: zhanghongtong <rory-z@outlook.com>

test(portable): unit test for python plugins

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 年之前
父节点
当前提交
172d9a522f

+ 11 - 4
.github/workflows/run_test_case.yaml

@@ -11,11 +11,16 @@ jobs:
     run_test_case:
         runs-on: ubuntu-latest
 
-        container:
-            image: golang:1.15.1
-
         steps:
         - uses: actions/checkout@v2
+        - uses: actions/setup-go@v2
+          with:
+            go-version: '1.15.1'
+        - uses: actions/setup-python@v2
+          with:
+            python-version: '3.x'
+            architecture: 'x64'
+        - run: pip3 install pynng
         - name: run code static check
           run : |
             if [ ! -z "$(gofmt -l .)" ];then
@@ -24,7 +29,7 @@ jobs:
               exit 1
             fi
         - name: install lib
-          run: apt-get update && apt-get install libzmq3-dev -y
+          run: sudo apt-get update && sudo apt-get install libzmq3-dev -y
         - name: run test case
           run: |
             set -e -u -x
@@ -41,6 +46,8 @@ jobs:
             go build -o ../../../../plugins/portable/mirror/mirror .
             cp mirror.json ../../../../plugins/portable/mirror
             cd ../../../../
+            cp -r sdk/python/example/pysam plugins/portable/pysam
+            cp -r sdk/python/ekuiper plugins/portable/pysam/
             go test --tags="edgex test" ./...
         - uses: actions/upload-artifact@v1
           if: failure()

+ 1 - 1
internal/plugin/portable/manager_test.go

@@ -106,7 +106,7 @@ func TestManager_Read(t *testing.T) {
 		},
 	}
 	result := manager.List()
-	if len(result) != 2 {
+	if len(result) != 3 {
 		t.Errorf("list result mismatch:\n  exp=%v\n  got=%v\n\n", expPlugins, result)
 	}
 

+ 1 - 1
internal/plugin/portable/runtime/connection.go

@@ -180,7 +180,7 @@ func CreateControlChannel(pluginName string) (ControlChannel, error) {
 	if err = listenWithRetry(sock, url); err != nil {
 		return nil, fmt.Errorf("can't listen on rep socket: %s", err.Error())
 	}
-	conf.Log.Infof("sink channel created: %s", url)
+	conf.Log.Infof("control channel created: %s", url)
 	return &NanomsgReqChannel{sock: sock}, nil
 }
 

+ 4 - 1
internal/plugin/portable/runtime/sink.go

@@ -91,5 +91,8 @@ func (ps *PortableSink) Collect(ctx api.StreamContext, item interface{}) error {
 }
 
 func (ps *PortableSink) Close(ctx api.StreamContext) error {
-	return ps.clean()
+	if ps.clean != nil {
+		return ps.clean()
+	}
+	return nil
 }

+ 60 - 11
internal/plugin/portable/test/portable_rule_test.go

@@ -25,6 +25,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/plugin/portable"
 	"github.com/lf-edge/ekuiper/internal/plugin/portable/runtime"
 	"github.com/lf-edge/ekuiper/internal/processor"
+	"github.com/lf-edge/ekuiper/internal/topo"
 	"github.com/lf-edge/ekuiper/internal/topo/planner"
 	"github.com/lf-edge/ekuiper/internal/topo/topotest"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -51,10 +52,8 @@ func init() {
 	}
 }
 
-const CACHE_FILE = "cache2"
-
 func TestSourceAndFunc(t *testing.T) {
-	streamList := []string{"ext"}
+	streamList := []string{"ext", "extpy"}
 	topotest.HandleStream(false, streamList, t)
 	var tests = []struct {
 		Name string
@@ -64,7 +63,7 @@ func TestSourceAndFunc(t *testing.T) {
 	}{
 		{
 			Name: `TestPortableRule1`,
-			Rule: `{"sql":"SELECT echo(count) as ee FROM ext","actions":[{"file":{"path":"` + CACHE_FILE + `"}}]}`,
+			Rule: `{"sql":"SELECT echo(count) as ee FROM ext","actions":[{"file":{"path":"cache1"}}]}`,
 			R: [][]map[string]interface{}{
 				{{
 					"ee": float64(50),
@@ -82,13 +81,35 @@ func TestSourceAndFunc(t *testing.T) {
 				"source_ext_0_records_out_total":  int64(3),
 				"sink_file_0_0_records_out_total": int64(3),
 			},
+		}, {
+			Name: `TestPythonRule`,
+			Rule: `{"sql":"SELECT revert(name) as ee FROM extpy","actions":[{"file":{"path":"cache2"}},{"print":{}}]}`,
+			R: [][]map[string]interface{}{
+				{{
+					"ee": "nosjyp",
+				}},
+				{{
+					"ee": "nosjyp",
+				}},
+				{{
+					"ee": "nosjyp",
+				}},
+			},
+			M: map[string]interface{}{
+				"source_extpy_0_exceptions_total":  int64(0),
+				"source_extpy_0_records_in_total":  int64(3),
+				"source_extpy_0_records_out_total": int64(3),
+				"sink_file_0_0_records_out_total":  int64(3),
+				"sink_print_1_0_records_out_total": int64(3),
+			},
 		},
 	}
-	topotest.HandleStream(true, streamList, t)
+
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 	defer runtime.GetPluginInsManager().KillAll()
 	for i, tt := range tests {
-		_ = os.Remove(CACHE_FILE)
+		_ = os.Remove(fmt.Sprintf("cache%d", i+1))
+		topotest.HandleStream(true, streamList[i:i+1], t)
 		rs, err := CreateRule(tt.Name, tt.Rule)
 		if err != nil {
 			t.Errorf("failed to create rule: %s.", err)
@@ -111,17 +132,18 @@ func TestSourceAndFunc(t *testing.T) {
 			}
 			fmt.Println("all exit")
 		}(ctx)
+		topotest.HandleStream(false, streamList[i:i+1], t)
 		for {
 			if ctx.Err() != nil {
 				t.Errorf("Exiting with error %v", ctx.Err())
 				break
 			}
 			time.Sleep(10 * time.Millisecond)
-			if err := topotest.CompareMetrics(tp, tt.M); err == nil {
+			if compareMetrics(tp, tt.M) {
 				cancel()
 				// need to wait for file results
 				time.Sleep(10 * time.Millisecond)
-				results := getResults()
+				results := getResults(fmt.Sprintf("cache%d", i+1))
 				fmt.Printf("get results %v\n", results)
 				time.Sleep(10 * time.Millisecond)
 				var mm [][]map[string]interface{}
@@ -144,13 +166,40 @@ func TestSourceAndFunc(t *testing.T) {
 			}
 		}
 	}
-	topotest.HandleStream(false, streamList, t)
 	// wait for rule clean up
 	time.Sleep(1 * time.Second)
 }
 
-func getResults() []string {
-	f, err := os.Open(CACHE_FILE)
+func compareMetrics(tp *topo.Topo, m map[string]interface{}) bool {
+	keys, values := tp.GetMetrics()
+	for k, v := range m {
+		var (
+			index   int
+			key     string
+			matched bool
+		)
+		for index, key = range keys {
+			if k == key {
+				va, ok := values[index].(int64)
+				if !ok {
+					continue
+				}
+				ve := v.(int64)
+				if va < ve {
+					return false
+				}
+				matched = true
+			}
+		}
+		if !matched {
+			return false
+		}
+	}
+	return true
+}
+
+func getResults(fileName string) []string {
+	f, err := os.Open(fileName)
 	if err != nil {
 		panic(err)
 	}

+ 2 - 0
internal/topo/topotest/mock_topo.go

@@ -334,6 +334,8 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 				sql = "CREATE STREAM ext (count bigint) WITH (DATASOURCE=\"ext\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"ext\")"
 			case "ext2":
 				sql = "CREATE STREAM ext2 (count bigint) WITH (DATASOURCE=\"ext2\", FORMAT=\"JSON\", TYPE=\"random\", CONF_KEY=\"dedup\")"
+			case "extpy":
+				sql = "CREATE STREAM extpy (name string, value bigint) WITH (FORMAT=\"JSON\", TYPE=\"pyjson\", CONF_KEY=\"ext\")"
 			case "text":
 				sql = "CREATE STREAM text (slogan string, brand string) WITH (DATASOURCE=\"text\", TYPE=\"mock\", FORMAT=\"JSON\")"
 			case "binDemo":