Browse Source

feat(graph): support source node refer to created stream

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 1 year ago
parent
commit
5a12638c49

+ 2 - 1
.github/workflows/run_test_case.yaml

@@ -56,7 +56,8 @@ jobs:
         go build --buildmode=plugin -o plugins/functions/CountPlusOne@v1.0.0.so extensions/functions/countPlusOne/countPlusOne.go
         go build --buildmode=plugin -o plugins/functions/AccumulateWordCount@v1.0.0.so extensions/functions/accumulateWordCount/accumulateWordCount.go
         go build --buildmode=plugin -o data/test/helloworld.so internal/converter/protobuf/test/*.go
-        go build --buildmode=plugin -o data/test/myFormat.so internal/converter/custom/test/*.go
+        go build --buildmode=plugin -o data/myFormat.so internal/converter/custom/test/*.go
+        cp data/myFormat.so data/test/myFormat.so
         mkdir -p plugins/portable/mirror
         cd sdk/go/example/mirror
         go build -o ../../../../plugins/portable/mirror/mirror .

+ 2 - 1
internal/processor/stream_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -280,6 +280,7 @@ func TestAll(t *testing.T) {
 func TestInferredStream(t *testing.T) {
 	// init schema
 	// Prepare test schema file
+	conf.IsTesting = false
 	dataDir, err := conf.GetDataLoc()
 	if err != nil {
 		t.Fatal(err)

+ 18 - 0
internal/schema/inferer.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"strings"
 
+	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 )
 
@@ -32,6 +33,23 @@ func InferFromSchemaFile(schemaType string, schemaId string) (ast.StreamFields,
 		return nil, fmt.Errorf("invalid schemaId: %s", schemaId)
 	}
 	if c, ok := inferes[schemaType]; ok {
+		// mock result for testing
+		if conf.IsTesting {
+			return ast.StreamFields{
+				{
+					Name: "field1",
+					FieldType: &ast.BasicType{
+						Type: ast.BIGINT,
+					},
+				},
+				{
+					Name: "field2",
+					FieldType: &ast.BasicType{
+						Type: ast.STRINGS,
+					},
+				},
+			}, nil
+		}
 		return c(r[0], r[1])
 	} else {
 		return nil, fmt.Errorf("unsupported type: %s", schemaType)

+ 21 - 12
internal/topo/planner/analyzer.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -32,27 +32,21 @@ type streamInfo struct {
 
 // Analyze the select statement by decorating the info from stream statement.
 // Typically, set the correct stream name for fieldRefs
-func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]streamInfo, []*ast.Call, error) {
+func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*ast.Call, error) {
 	streamsFromStmt := xsql.GetStreams(s)
-	streamStmts := make([]streamInfo, len(streamsFromStmt))
+	streamStmts := make([]*streamInfo, len(streamsFromStmt))
 	isSchemaless := false
 	for i, s := range streamsFromStmt {
 		streamStmt, err := xsql.GetDataSource(store, s)
 		if err != nil {
 			return nil, nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s)
 		}
-		ss := streamStmt.StreamFields
-		if streamStmt.Options.SCHEMAID != "" {
-			ss, err = schema.InferFromSchemaFile(streamStmt.Options.FORMAT, streamStmt.Options.SCHEMAID)
-		}
+		si, err := convertStreamInfo(streamStmt)
 		if err != nil {
 			return nil, nil, err
 		}
-		streamStmts[i] = streamInfo{
-			stmt:   streamStmt,
-			schema: ss,
-		}
-		if ss == nil {
+		streamStmts[i] = si
+		if si.schema == nil {
 			isSchemaless = true
 		}
 	}
@@ -241,6 +235,21 @@ func allAggregate(expr ast.Expr) (r bool) {
 	return
 }
 
+func convertStreamInfo(streamStmt *ast.StreamStmt) (*streamInfo, error) {
+	ss := streamStmt.StreamFields
+	var err error
+	if streamStmt.Options.SCHEMAID != "" {
+		ss, err = schema.InferFromSchemaFile(streamStmt.Options.FORMAT, streamStmt.Options.SCHEMAID)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return &streamInfo{
+		stmt:   streamStmt,
+		schema: ss,
+	}, nil
+}
+
 type fieldsMap struct {
 	content       map[string]streamFieldStore
 	isSchemaless  bool

+ 170 - 1
internal/topo/planner/analyzer_test.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -250,3 +250,172 @@ func Test_validationSchemaless(t *testing.T) {
 		}
 	}
 }
+
+// TestConvertStreamInfo verifies which schema convertStreamInfo attaches to a stream
+// statement: the inferred one whenever a SCHEMAID is set, otherwise the declared
+// fields, and nil when neither is present.
+func TestConvertStreamInfo(t *testing.T) {
+	// twoFields builds a fresh field1/field2 pair. field2 is always a string,
+	// field1 takes the given type.
+	twoFields := func(first *ast.BasicType) ast.StreamFields {
+		return ast.StreamFields{
+			{Name: "field1", FieldType: first},
+			{Name: "field2", FieldType: &ast.BasicType{Type: ast.STRINGS}},
+		}
+	}
+	basic := func(b ast.BasicType) *ast.BasicType { return &b }
+	// protobufOpts references a schema file, jsonOpts does not.
+	protobufOpts := func() *ast.Options {
+		return &ast.Options{
+			FORMAT:    "protobuf",
+			SCHEMAID:  "myschema.schema1",
+			TIMESTAMP: "ts",
+		}
+	}
+	jsonOpts := func() *ast.Options {
+		return &ast.Options{
+			FORMAT:    "json",
+			TIMESTAMP: "ts",
+		}
+	}
+	// inferred is the schema expected for "myschema.schema1".
+	// NOTE(review): presumably this relies on the mocked inference result used in testing mode - confirm.
+	inferred := func() ast.StreamFields {
+		return twoFields(basic(ast.BasicType{Type: ast.BIGINT}))
+	}
+
+	cases := []struct {
+		name       string
+		streamStmt *ast.StreamStmt
+		expected   ast.StreamFields
+	}{
+		{
+			name: "with match fields & schema",
+			streamStmt: &ast.StreamStmt{
+				StreamFields: twoFields(basic(ast.BasicType{Type: ast.BIGINT})),
+				Options:      protobufOpts(),
+			},
+			expected: inferred(),
+		},
+		{
+			name: "with unmatch fields & schema",
+			streamStmt: &ast.StreamStmt{
+				StreamFields: twoFields(basic(ast.BasicType{Type: ast.STRINGS})),
+				Options:      protobufOpts(),
+			},
+			expected: inferred(),
+		},
+		{
+			name: "without schema",
+			streamStmt: &ast.StreamStmt{
+				StreamFields: twoFields(basic(ast.BasicType{Type: ast.FLOAT})),
+				Options:      jsonOpts(),
+			},
+			expected: twoFields(basic(ast.BasicType{Type: ast.FLOAT})),
+		},
+		{
+			name: "without fields",
+			streamStmt: &ast.StreamStmt{
+				Options: protobufOpts(),
+			},
+			expected: inferred(),
+		},
+		{
+			name: "schemaless",
+			streamStmt: &ast.StreamStmt{
+				Options: jsonOpts(),
+			},
+			expected: nil,
+		},
+	}
+
+	for i := range cases {
+		tc := cases[i]
+		t.Run(tc.name, func(t *testing.T) {
+			info, err := convertStreamInfo(tc.streamStmt)
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+				return
+			}
+			if got := info.schema; !reflect.DeepEqual(got, tc.expected) {
+				t.Errorf("unexpected result: got %v, want %v", got, tc.expected)
+			}
+		})
+	}
+}

+ 48 - 41
internal/topo/planner/planner.go

@@ -120,48 +120,13 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	)
 	switch t := lp.(type) {
 	case *DataSourcePlan:
-		isSchemaless := t.isSchemaless
-		switch t.streamStmt.StreamType {
-		case ast.TypeStream:
-			var (
-				pp  node.UnOperation
-				err error
-			)
-			if t.iet || (!isSchemaless && (t.streamStmt.Options.STRICT_VALIDATION || t.isBinary)) {
-				pp, err = operator.NewPreprocessor(isSchemaless, t.streamFields, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary, t.streamStmt.Options.STRICT_VALIDATION)
-				if err != nil {
-					return nil, 0, err
-				}
-			}
-			var srcNode *node.SourceNode
-			if len(sources) == 0 {
-				sourceNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError)
-				srcNode = sourceNode
-			} else {
-				srcNode = getMockSource(sources, string(t.name))
-				if srcNode == nil {
-					return nil, 0, fmt.Errorf("can't find predefined source %s", t.name)
-				}
-			}
-			tp.AddSrc(srcNode)
-			inputs = []api.Emitter{srcNode}
-			op = srcNode
-		case ast.TypeTable:
-			pp, err := operator.NewTableProcessor(isSchemaless, string(t.name), t.streamFields, t.streamStmt.Options)
-			if err != nil {
-				return nil, 0, err
-			}
-			var srcNode *node.SourceNode
-			if len(sources) > 0 {
-				srcNode = getMockSource(sources, string(t.name))
-			}
-			if srcNode == nil {
-				srcNode = node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options.SendError)
-			}
-			tp.AddSrc(srcNode)
-			inputs = []api.Emitter{srcNode}
-			op = srcNode
+		srcNode, err := transformSourceNode(t, sources, options)
+		if err != nil {
+			return nil, 0, err
 		}
+		tp.AddSrc(srcNode)
+		inputs = []api.Emitter{srcNode}
+		op = srcNode
 	case *AnalyticFuncsPlan:
 		op = Transform(&operator.AnalyticFuncsOp{Funcs: t.funcs}, fmt.Sprintf("%d_analytic", newIndex), options)
 	case *WindowPlan:
@@ -213,6 +178,48 @@ func buildOps(lp LogicalPlan, tp *topo.Topo, options *api.RuleOption, sources []
 	return op, newIndex, nil
 }
 
+// transformSourceNode turns a data source plan into the source node that feeds the topology.
+//
+// For a stream, a preprocessor is built only when event time is enabled, or when the
+// stream is not schemaless and either strict validation is requested or the data is binary.
+// For a table, a table processor is always built.
+//
+// sources is an optional list of predefined source nodes looked up by name. A stream
+// with a non-empty list must find its node there, otherwise an error is returned. A
+// table falls back to a newly created node when no match exists.
+// Any stream type other than stream or table yields an error.
+func transformSourceNode(t *DataSourcePlan, sources []*node.SourceNode, options *api.RuleOption) (*node.SourceNode, error) {
+	stmt := t.streamStmt
+	switch stmt.StreamType {
+	case ast.TypeStream:
+		// Stays nil when no preprocessing is required.
+		var preprocessor node.UnOperation
+		if t.iet || (!t.isSchemaless && (stmt.Options.STRICT_VALIDATION || t.isBinary)) {
+			built, err := operator.NewPreprocessor(t.isSchemaless, t.streamFields, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary, stmt.Options.STRICT_VALIDATION)
+			if err != nil {
+				return nil, err
+			}
+			preprocessor = built
+		}
+		if len(sources) == 0 {
+			return node.NewSourceNode(string(t.name), stmt.StreamType, preprocessor, stmt.Options, options.SendError), nil
+		}
+		predefined := getMockSource(sources, string(t.name))
+		if predefined == nil {
+			return nil, fmt.Errorf("can't find predefined source %s", t.name)
+		}
+		return predefined, nil
+	case ast.TypeTable:
+		tableProcessor, err := operator.NewTableProcessor(t.isSchemaless, string(t.name), t.streamFields, stmt.Options)
+		if err != nil {
+			return nil, err
+		}
+		if len(sources) > 0 {
+			if predefined := getMockSource(sources, string(t.name)); predefined != nil {
+				return predefined, nil
+			}
+		}
+		// No predefined node matched: create a real one.
+		return node.NewSourceNode(string(t.name), stmt.StreamType, tableProcessor, stmt.Options, options.SendError), nil
+	default:
+		return nil, fmt.Errorf("unknown stream type %d", stmt.StreamType)
+	}
+}
+
 func getMockSource(sources []*node.SourceNode, name string) *node.SourceNode {
 	for _, source := range sources {
 		if name == source.GetName() {

+ 67 - 21
internal/topo/planner/planner_graph.go

@@ -20,6 +20,7 @@ import (
 	"strings"
 
 	"github.com/lf-edge/ekuiper/internal/binder/function"
+	store2 "github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/topo"
 	"github.com/lf-edge/ekuiper/internal/topo/graph"
 	"github.com/lf-edge/ekuiper/internal/topo/node"
@@ -28,6 +29,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"github.com/lf-edge/ekuiper/pkg/kv"
 	"github.com/lf-edge/ekuiper/pkg/message"
 )
 
@@ -49,6 +51,7 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 		nodeMap = make(map[string]api.TopNode)
 		sinks   = make(map[string]bool)
 		sources = make(map[string]bool)
+		store   kv.KeyValue
 	)
 	for nodeName, gn := range ruleGraph.Nodes {
 		switch gn.Type {
@@ -56,36 +59,79 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 			if _, ok := ruleGraph.Topo.Edges[nodeName]; !ok {
 				return nil, fmt.Errorf("no edge defined for source node %s", nodeName)
 			}
-			sourceType, ok := gn.Props["source_type"]
-			if !ok {
-				sourceType = "stream"
-			}
-			st, ok := sourceType.(string)
-			if !ok {
-				return nil, fmt.Errorf("source_type %v is not string", sourceType)
+			sourceMeta := &api.SourceMeta{
+				SourceType: "stream",
 			}
-			st = strings.ToLower(st)
-			sourceOption := &ast.Options{}
-			err := cast.MapToStruct(gn.Props, sourceOption)
+			err = cast.MapToStruct(gn.Props, sourceMeta)
 			if err != nil {
 				return nil, err
 			}
-			sourceOption.TYPE = gn.NodeType
-			switch st {
-			case "stream":
-				// TODO deal with conf key
-				pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)
+			if sourceMeta.SourceType != "stream" && sourceMeta.SourceType != "table" {
+				return nil, fmt.Errorf("source type %s not supported", sourceMeta.SourceType)
+			}
+			// If source name is specified, find the created stream/table from store
+			if sourceMeta.SourceName != "" {
+				if store == nil {
+					store, err = store2.GetKV("stream")
+					if err != nil {
+						return nil, err
+					}
+				}
+				streamStmt, e := xsql.GetDataSource(store, sourceMeta.SourceName)
+				if e != nil {
+					return nil, fmt.Errorf("fail to get stream %s, please check if stream is created", sourceMeta.SourceName)
+				}
+				if streamStmt.StreamType == ast.TypeStream && sourceMeta.SourceType == "table" {
+					return nil, fmt.Errorf("stream %s is not a table", sourceMeta.SourceName)
+				} else if streamStmt.StreamType == ast.TypeTable && sourceMeta.SourceType == "stream" {
+					return nil, fmt.Errorf("table %s is not a stream", sourceMeta.SourceName)
+				}
+				st := streamStmt.Options.TYPE
+				if st == "" {
+					st = "mqtt"
+				}
+				if st != gn.NodeType {
+					return nil, fmt.Errorf("source type %s does not match the stream type %s", gn.NodeType, st)
+				}
+				sInfo, err := convertStreamInfo(streamStmt)
 				if err != nil {
 					return nil, err
 				}
-				srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError)
+				// Use the plan to calculate the schema and other meta info
+				p := DataSourcePlan{
+					name:         sInfo.stmt.Name,
+					streamStmt:   sInfo.stmt,
+					streamFields: sInfo.schema.ToJsonSchema(),
+					isSchemaless: sInfo.schema == nil,
+					iet:          rule.Options.IsEventTime,
+					allMeta:      rule.Options.SendMetaToSink,
+				}.Init()
+				err = p.PruneColumns(nil)
+				if err != nil {
+					return nil, err
+				}
+				srcNode, err := transformSourceNode(p, nil, rule.Options)
 				nodeMap[nodeName] = srcNode
 				tp.AddSrc(srcNode)
-			case "table":
-				// TODO add table
-				return nil, fmt.Errorf("table source is not supported yet")
-			default:
-				return nil, fmt.Errorf("unknown source type %s", st)
+			} else {
+				sourceOption := &ast.Options{}
+				err = cast.MapToStruct(gn.Props, sourceOption)
+				if err != nil {
+					return nil, err
+				}
+				sourceOption.TYPE = gn.NodeType
+				switch sourceMeta.SourceType {
+				case "stream":
+					pp, err := operator.NewPreprocessor(true, nil, true, nil, rule.Options.IsEventTime, sourceOption.TIMESTAMP, sourceOption.TIMESTAMP_FORMAT, strings.EqualFold(sourceOption.FORMAT, message.FormatBinary), sourceOption.STRICT_VALIDATION)
+					if err != nil {
+						return nil, err
+					}
+					srcNode := node.NewSourceNode(nodeName, ast.TypeStream, pp, sourceOption, rule.Options.SendError)
+					nodeMap[nodeName] = srcNode
+					tp.AddSrc(srcNode)
+				case "table":
+					return nil, fmt.Errorf("anonymouse table source is not supported, please create it prior to the rule")
+				}
 			}
 			sources[nodeName] = true
 		case "sink":

+ 199 - 2
internal/topo/planner/planner_graph_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -20,8 +20,11 @@ import (
 	"reflect"
 	"testing"
 
+	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/testx"
+	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
+	"github.com/lf-edge/ekuiper/pkg/ast"
 )
 
 func TestPlannerGraphValidate(t *testing.T) {
@@ -493,7 +496,7 @@ func TestPlannerGraphValidate(t *testing.T) {
 		},
 	}
 
-	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	t.Logf("The test bucket size is %d.\n\n", len(tests))
 	for i, tt := range tests {
 		rg := &api.RuleGraph{}
 		err := json.Unmarshal([]byte(tt.graph), rg)
@@ -522,3 +525,197 @@ func TestPlannerGraphValidate(t *testing.T) {
 		}
 	}
 }
+
+// TestPlannerGraphWithStream checks that a graph rule whose source node refers to an
+// already created stream/table (via the sourceName prop) is planned against that
+// definition, and that mismatches are rejected with the expected error message.
+//
+// The stream and table definitions are written into the "stream" KV store up front.
+// Any failure during that setup aborts the test, since every case depends on it.
+func TestPlannerGraphWithStream(t *testing.T) {
+	// Named streamStore so the imported store package is not shadowed.
+	streamStore, err := store.GetKV("stream")
+	if err != nil {
+		t.Fatal(err)
+	}
+	definitions := []struct {
+		name       string
+		streamType ast.StreamType
+		sql        string
+	}{
+		{
+			name:       "src1",
+			streamType: ast.TypeStream,
+			sql: `CREATE STREAM src1 (
+					id1 BIGINT,
+					temp BIGINT,
+					name string,
+					myarray array(string)
+				) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`,
+		},
+		{
+			name:       "src2",
+			streamType: ast.TypeStream,
+			sql: `CREATE STREAM src2 (
+					id2 BIGINT,
+					hum BIGINT
+				) WITH (DATASOURCE="src2", FORMAT="json", KEY="ts", TIMESTAMP_FORMAT="YYYY-MM-dd HH:mm:ss");`,
+		},
+		{
+			name:       "tableInPlanner",
+			streamType: ast.TypeTable,
+			sql: `CREATE TABLE tableInPlanner (
+					id BIGINT,
+					name STRING,
+					value STRING,
+					hum BIGINT
+				) WITH (TYPE="file");`,
+		},
+	}
+	for _, def := range definitions {
+		payload, err := json.Marshal(&xsql.StreamInfo{
+			StreamType: def.streamType,
+			Statement:  def.sql,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if err = streamStore.Set(def.name, string(payload)); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// graphFor renders a minimal source -> log graph whose source node has the given
+	// nodeType and refers to the created stream/table called sourceName.
+	graphFor := func(nodeType, sourceName string) string {
+		return fmt.Sprintf(`{
+    "nodes": {
+      "demo": {
+        "type": "source",
+        "nodeType": "%s",
+        "props": {
+          "sourceType": "stream",
+          "sourceName": "%s"
+        }
+      },
+      "log": {
+        "type": "sink",
+        "nodeType": "log",
+        "props": {}
+      }
+    },
+    "topo": {
+      "sources": ["demo"],
+      "edges": {
+        "demo": ["log"]
+      }
+    }
+}`, nodeType, sourceName)
+	}
+
+	testCases := []struct {
+		name    string
+		graph   string
+		wantErr string // empty means planning must succeed
+	}{
+		{
+			name:  "test stream",
+			graph: graphFor("mqtt", "src1"),
+		},
+		{
+			name:    "stream type wrong",
+			graph:   graphFor("file", "src1"),
+			wantErr: "source type file does not match the stream type mqtt",
+		},
+		{
+			name:    "non exist stream",
+			graph:   graphFor("mqtt", "unknown"),
+			wantErr: "fail to get stream unknown, please check if stream is created",
+		},
+		{
+			name:    "wrong source type",
+			graph:   graphFor("mqtt", "tableInPlanner"),
+			wantErr: "table tableInPlanner is not a stream",
+		},
+	}
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			rg := &api.RuleGraph{}
+			if err := json.Unmarshal([]byte(tc.graph), rg); err != nil {
+				t.Fatal(err)
+			}
+			_, err := PlanByGraph(&api.Rule{
+				Triggered: false,
+				Id:        "test",
+				Graph:     rg,
+				Options: &api.RuleOption{
+					IsEventTime:        false,
+					LateTol:            1000,
+					Concurrency:        1,
+					BufferLength:       1024,
+					SendMetaToSink:     false,
+					SendError:          true,
+					Qos:                api.AtMostOnce,
+					CheckpointInterval: 300000,
+				},
+			})
+			switch {
+			case tc.wantErr == "":
+				if err != nil {
+					t.Errorf("error mismatch:\n  exp=<nil>\n  got=%s\n\n", err)
+				}
+			case err == nil:
+				// Guard before calling err.Error(): a missing error must fail the test, not panic it.
+				t.Errorf("error mismatch:\n  exp=%s\n  got=<nil>\n\n", tc.wantErr)
+			case err.Error() != tc.wantErr:
+				t.Errorf("error mismatch:\n  exp=%s\n  got=%s\n\n", tc.wantErr, err)
+			}
+		})
+	}
+}

+ 42 - 0
internal/topo/planner/planner_test.go

@@ -25,6 +25,7 @@ import (
 
 	"github.com/lf-edge/ekuiper/internal/pkg/store"
 	"github.com/lf-edge/ekuiper/internal/testx"
+	"github.com/lf-edge/ekuiper/internal/topo/node"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
@@ -3103,3 +3104,44 @@ func Test_createLogicalPlan4Lookup(t *testing.T) {
 		}
 	}
 }
+
+func TestTransformSourceNode(t *testing.T) {
+	testCases := []struct {
+		name string
+		plan *DataSourcePlan
+		node *node.SourceNode
+	}{
+		{
+			name: "normal source node",
+			plan: &DataSourcePlan{
+				name: "test",
+				streamStmt: &ast.StreamStmt{
+					StreamType: ast.TypeStream,
+					Options: &ast.Options{
+						TYPE: "file",
+					},
+				},
+				streamFields: nil,
+				allMeta:      false,
+				metaFields:   []string{},
+				iet:          false,
+				isBinary:     false,
+			},
+			node: node.NewSourceNode("test", ast.TypeStream, nil, &ast.Options{
+				TYPE: "file",
+			}, false),
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			node, err := transformSourceNode(tc.plan, nil, &api.RuleOption{})
+			if err != nil {
+				t.Errorf("unexpected error: %v", err)
+				return
+			}
+			if !reflect.DeepEqual(node, tc.node) {
+				t.Errorf("unexpected result: got %v, want %v", node, tc.node)
+			}
+		})
+	}
+}

+ 7 - 0
pkg/api/stream.go

@@ -167,6 +167,13 @@ type GraphNode struct {
 	UI map[string]interface{} `json:"ui"`
 }
 
+// SourceMeta is the metadata of a source node. It describes which existing stream/table
+// the node refers to.
+// It is an optional part of the Props in the GraphNode.
+type SourceMeta struct {
+	SourceName string `json:"sourceName"` // the name of the stream or table to refer to
+	SourceType string `json:"sourceType"` // the kind of source: "stream" or "table"
+}
+
 type RuleGraph struct {
 	Nodes map[string]*GraphNode `json:"nodes"`
 	Topo  *PrintableTopo        `json:"topo"`