Преглед на файлове

Merge pull request #47 from emqx/jiyong

Refactor and bug fixes
jinfahua преди 5 години
родител
ревизия
c502bf1d2e

+ 1 - 1
docs/en_US/rules/sources/mqtt.md

@@ -1,6 +1,6 @@
 # MQTT source 
 
-Kuiper provides built-in support for MQTT source stream, which can subscribe the message from MQTT broker and feed into the Kuiper processing pipeline.  The configuration file of MQTT source is at ``$kuiper/etc/sources/mqtt.yaml``. Below is the file format.
+Kuiper provides built-in support for MQTT source stream, which can subscribe the message from MQTT broker and feed into the Kuiper processing pipeline.  The configuration file of MQTT source is at ``$kuiper/etc/mqtt_source.yaml``. Below is the file format.
 
 ```yaml
 #Global MQTT configurations

etc/sources/mqtt.yaml → etc/mqtt_source.yaml


+ 5 - 2
xsql/plans/aggregate_test.go

@@ -1,9 +1,10 @@
 package plans
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -257,6 +258,8 @@ func TestAggregatePlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestFilterPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -265,7 +268,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 		}
 
 		pp := &AggregatePlan{Dimensions:stmt.Dimensions.GetGroups()}
-		result := pp.Apply(nil, tt.data)
+		result := pp.Apply(ctx, tt.data)
 		gr, ok := result.(xsql.GroupedTuplesSet)
 		if !ok {
 			t.Errorf("result is not GroupedTuplesSet")

+ 5 - 2
xsql/plans/filter_test.go

@@ -1,9 +1,10 @@
 package plans
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -218,6 +219,8 @@ func TestFilterPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -226,7 +229,7 @@ func TestFilterPlan_Apply(t *testing.T) {
 		}
 
 		pp := &FilterPlan{Condition:stmt.Condition}
-		result := pp.Apply(nil, tt.data)
+		result := pp.Apply(ctx, tt.data)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 6 - 2
xsql/plans/having_test.go

@@ -1,8 +1,10 @@
 package plans
 
 import (
-	"github.com/emqx/kuiper/xsql"
 	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -140,6 +142,8 @@ func TestHavingPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -148,7 +152,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 		}
 
 		pp := &HavingPlan{Condition:stmt.Having}
-		result := pp.Apply(nil, tt.data)
+		result := pp.Apply(ctx, tt.data)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 6 - 2
xsql/plans/join_multi_test.go

@@ -1,8 +1,10 @@
 package plans
 
 import (
-	"github.com/emqx/kuiper/xsql"
 	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -382,6 +384,8 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestMultiJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -393,7 +397,7 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}

+ 17 - 6
xsql/plans/join_test.go

@@ -2,9 +2,10 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -597,6 +598,8 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestLeftJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -608,7 +611,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1023,6 +1026,8 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestInnerJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -1034,7 +1039,7 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1151,6 +1156,8 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestRightJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -1162,7 +1169,7 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1405,6 +1412,8 @@ func TestFullJoinPlan_Apply(t *testing.T) {
 
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestFullJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -1416,7 +1425,7 @@ func TestFullJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1535,6 +1544,8 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -1546,7 +1557,7 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}

+ 7 - 2
xsql/plans/math_func_test.go

@@ -2,8 +2,10 @@ package plans
 
 import (
 	"encoding/json"
-	"github.com/emqx/kuiper/xsql"
 	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -453,6 +455,8 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestMathAndConversionFunc_Apply1")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		//fmt.Println("Running test " + strconv.Itoa(i))
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
@@ -463,7 +467,8 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 			continue
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 11 - 3
xsql/plans/misc_func_test.go

@@ -2,8 +2,10 @@ package plans
 
 import (
 	"encoding/json"
-	"github.com/emqx/kuiper/xsql"
 	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -130,13 +132,16 @@ func TestHashFunc_Apply1(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestHashFunc_Apply1")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -179,13 +184,16 @@ func TestMqttFunc_Apply2(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestMqttFunc_Apply2")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 5 - 2
xsql/plans/order_test.go

@@ -1,9 +1,10 @@
 package plans
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -365,6 +366,8 @@ func TestOrderPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestOrderPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -373,7 +376,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 		}
 
 		pp := &OrderPlan{SortFields:stmt.SortFields}
-		result := pp.Apply(nil, tt.data)
+		result := pp.Apply(ctx, tt.data)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 12 - 5
xsql/plans/preprocessor_test.go

@@ -2,9 +2,10 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"log"
 	"reflect"
 	"testing"
@@ -157,6 +158,8 @@ func TestPreprocessor_Apply(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 
 		pp := &Preprocessor{streamStmt: tt.stmt}
@@ -167,7 +170,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message:dm}
-			result := pp.Apply(nil, tuple)
+			result := pp.Apply(ctx, tuple)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 			}
@@ -276,6 +279,8 @@ func TestPreprocessorTime_Apply(t *testing.T){
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 
 		pp := &Preprocessor{streamStmt: tt.stmt}
@@ -286,7 +291,7 @@ func TestPreprocessorTime_Apply(t *testing.T){
 			return
 		} else {
 			tuple := &xsql.Tuple{Message:dm}
-			result := pp.Apply(nil, tuple)
+			result := pp.Apply(ctx, tuple)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok{
 				if rtt, ok := rt.Message["abc"].(time.Time); ok{
@@ -424,9 +429,11 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 
-		pp, err := NewPreprocessor(tt.stmt, true)
+		pp, err := NewPreprocessor(tt.stmt, nil,true)
 		if err != nil{
 			t.Error(err)
 		}
@@ -437,7 +444,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message:dm}
-			result := pp.Apply(nil, tuple)
+			result := pp.Apply(ctx, tuple)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok{
 				if rtt, ok := rt.Message["abc"].(time.Time); ok{

+ 10 - 8
xsql/plans/project_operator.go

@@ -2,9 +2,9 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
-	"fmt"
 	"strconv"
 	"strings"
 )
@@ -12,6 +12,8 @@ import (
 type ProjectPlan struct {
 	Fields xsql.Fields
 	IsAggregate bool
+
+	isTest bool
 }
 
 /**
@@ -25,7 +27,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	switch input := data.(type) {
 	case *xsql.Tuple:
 		ve := pp.getVE(input, input)
-		results = append(results, project(pp.Fields, ve))
+		results = append(results, project(pp.Fields, ve, pp.isTest))
 	case xsql.WindowTuplesSet:
 		if len(input) != 1 {
 			log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
@@ -34,7 +36,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		ms := input[0].Tuples
 		for _, v := range ms {
 			ve := pp.getVE(&v, input)
-			results = append(results, project(pp.Fields, ve))
+			results = append(results, project(pp.Fields, ve, pp.isTest))
 			if pp.IsAggregate{
 				break
 			}
@@ -43,7 +45,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		ms := input
 		for _, v := range ms {
 			ve := pp.getVE(&v, input)
-			results = append(results, project(pp.Fields, ve))
+			results = append(results, project(pp.Fields, ve, pp.isTest))
 			if pp.IsAggregate{
 				break
 			}
@@ -51,7 +53,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	case xsql.GroupedTuplesSet:
 		for _, v := range input{
 			ve := pp.getVE(v[0], v)
-			results = append(results, project(pp.Fields, ve))
+			results = append(results, project(pp.Fields, ve, pp.isTest))
 		}
 	default:
 		log.Errorf("Expect xsql.Valuer or its array type")
@@ -74,13 +76,13 @@ func (pp *ProjectPlan) getVE(tuple xsql.DataValuer, agg xsql.AggregateData) *xsq
 	}
 }
 
-func project(fs xsql.Fields, ve *xsql.ValuerEval) map[string]interface{} {
+func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) map[string]interface{} {
 	result := make(map[string]interface{})
 	for _, f := range fs {
 		//Avoid to re-evaluate for non-agg field has alias name, which was already evaluated in pre-processor operator.
-		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)){
+		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) && !isTest{
 			fr := &xsql.FieldRef{StreamName:"", Name:f.AName}
-			v := ve.Eval(fr);
+			v := ve.Eval(fr)
 			result[f.AName] = v
 		} else {
 			v := ve.Eval(f.Expr)

+ 18 - 8
xsql/plans/project_test.go

@@ -2,9 +2,10 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -276,11 +277,14 @@ func TestProjectPlan_Apply1(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestProjectPlan_Apply1")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -706,12 +710,14 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestProjectPlan_MultiInput")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
 		pp := &ProjectPlan{Fields:stmt.Fields}
-
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -836,14 +842,16 @@ func TestProjectPlan_Funcs(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestProjectPlan_Funcs")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil{
 			t.Error(err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -1055,14 +1063,16 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestProjectPlan_AggFuncs")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil{
 			t.Error(err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields, IsAggregate: true}
-
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 6 - 2
xsql/plans/str_func_test.go

@@ -2,9 +2,10 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -420,13 +421,16 @@ func TestStrFunc_Apply1(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestStrFunc_Apply1")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 5 - 1
xstream/nodes/source_node.go

@@ -74,7 +74,11 @@ func getConf(t string, confkey string, ctx api.StreamContext) map[string]interfa
 	if t == ""{
 		t = "mqtt"
 	}
-	conf, err := common.LoadConf("sources/" + t + ".yaml")
+	confPath := "sources/" + t + ".yaml"
+	if t == "mqtt"{
+		confPath = "mqtt_source.yaml"
+	}
+	conf, err := common.LoadConf(confPath)
 	props := make(map[string]interface{})
 	if err == nil {
 		cfg := make(map[string]map[string]interface{})