瀏覽代碼

Merge branch 'master' into add_chart

zhanghongtong 5 年之前
父節點
當前提交
a39ee7a441

File diff suppressed because it is too large
+ 86 - 0
README-CN.md


File diff suppressed because it is too large
+ 62 - 45
README.md


File diff suppressed because it is too large
+ 53 - 12
deploy/docker/README.md


+ 10 - 0
docs/en_US/cross-compile.md

@@ -0,0 +1,10 @@
+## Cross-compile binaries
+
+**Notice: Kuiper plugins bases on Golang, and due to Golang restrictions, ``CGO_ENABLED``  flag must be set to 0 to use the Golang cross-compile. But with this flag mode, the Golang plugins will not work. So if you want to use plugins in Kuiper, you can NOT use cross-compile to produce the binary packages.**
+
+- Preparation
+  - docker version >= 19.03
+  - Enable Docker CLI  experimental mode
+- Cross-compile binary files: ``$ make cross_build``
+- Cross-compile images for all platforms and push to registry:``$ make cross_docker``
+

+ 2 - 2
docs/en_US/plugins/overview.md

@@ -4,8 +4,8 @@ Kuiper implemented several plugins.
 
 | Name                  | Description                                                  |
 | --------------------- | ------------------------------------------------------------ |
-| [zmq](sources/zmq.md)   | The source will subscribe to a Zero Mq topic to import the messages into kuiper
-| [random](sources/random.md)   | The source will generate random inputs with a specified pattern
+| [zmq](sources/zmq.md)   | The source will subscribe to a Zero Mq topic to import the messages into kuiper |
+| [random](sources/random.md)   | The source will generate random inputs with a specified pattern |
 
 ## Sinks
 

+ 1 - 1
docs/en_US/rules/overview.md

@@ -45,7 +45,7 @@ The sql query to run for the rule.
 
 ### actions
 
-Currently, 2 kinds of actions are supported: [log](sinks/logs.md) and [mqtt](sinks/mqtt.md). Each action can define its own properties.
+Currently, 3 kinds of actions are supported: [log](sinks/logs.md), [mqtt](sinks/mqtt.md) and [rest](sinks/rest.md). Each action can define its own properties.
 
 Actions could be customized to support different kinds of outputs, see [extension](../extension/overview.md) for more detailed info.
 

File diff suppressed because it is too large
+ 23 - 0
docs/en_US/rules/sinks/rest.md


+ 1 - 1
docs/en_US/rules/sources/mqtt.md

@@ -1,6 +1,6 @@
 # MQTT source 
 
-Kuiper provides built-in support for MQTT source stream, which can subscribe the message from MQTT broker and feed into the Kuiper processing pipeline.  The configuration file of MQTT source is at ``$kuiper/etc/sources/mqtt.yaml``. Below is the file format.
+Kuiper provides built-in support for MQTT source stream, which can subscribe the message from MQTT broker and feed into the Kuiper processing pipeline.  The configuration file of MQTT source is at ``$kuiper/etc/mqtt_source.yaml``. Below is the file format.
 
 ```yaml
 #Global MQTT configurations

二進制
docs/resources/arch.png


+ 12 - 0
docs/zh_CN/cross-compile.md

@@ -0,0 +1,12 @@
+## 交叉编译二进制文件
+
+**注:Kuiper 插件基于 Golang 的方式实现,由于 Golang 本身的限制,使用了交叉编译的方式必须将编译参数 ``CGO_ENABLED`` 设置为0,而在该模式下,<u>插件将不可工作</u>。所以如果使用了 Kuiper 的插件的话,<u>不能以交叉编译的方式来生成二进制包。</u>**
+
+- 准备
+  - docker version >= 19.03
+  - 启用 Docker CLI 的 experimental 模式(experimental mode)
+- 交叉编译二进制文件:``$ make cross_build``
+- 交叉编译跨平台镜像,并推到库中:``$ make cross_docker``
+
+
+

+ 1 - 1
docs/zh_CN/getting_started.md

@@ -149,4 +149,4 @@ $ bin/cli stop rule ruleDemo
 
 
 
-如果您想了解更多有关该项目的信息,请参考[doc home]()。
+如果您想了解更多有关该项目的信息,请参考[文档中心](reference.md)。

+ 1 - 0
docs/zh_CN/reference.md

@@ -10,3 +10,4 @@
 - [规则](rules/overview.md)
 - [扩展Kuiper](extension/overview.md)
 - [插件](plugins/overview.md)
+

+ 1 - 1
docs/zh_CN/rules/overview.md

@@ -45,7 +45,7 @@
 
 ### 动作
 
-当前,支持两种操作: [log](sinks/logs.md) 和 [mqtt](sinks/mqtt.md).。 每个动作可以定义自己的属性。
+当前,支持两种操作: [log](sinks/logs.md) 、[mqtt](sinks/mqtt.md) 和 [rest](sinks/rest.md)。 每个动作可以定义自己的属性。
 
 可以自定义动作以支持不同种类的输出,有关更多详细信息,请参见 [extension](../extension/overview.md) 。
 

File diff suppressed because it is too large
+ 23 - 0
docs/zh_CN/rules/sinks/rest.md


etc/sources/mqtt.yaml → etc/mqtt_source.yaml


+ 5 - 2
xsql/plans/aggregate_test.go

@@ -1,9 +1,10 @@
 package plans
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -257,6 +258,8 @@ func TestAggregatePlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestFilterPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -265,7 +268,7 @@ func TestAggregatePlan_Apply(t *testing.T) {
 		}
 
 		pp := &AggregatePlan{Dimensions:stmt.Dimensions.GetGroups()}
-		result := pp.Apply(nil, tt.data)
+		result := pp.Apply(ctx, tt.data)
 		gr, ok := result.(xsql.GroupedTuplesSet)
 		if !ok {
 			t.Errorf("result is not GroupedTuplesSet")

+ 5 - 2
xsql/plans/filter_test.go

@@ -1,9 +1,10 @@
 package plans
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -218,6 +219,8 @@ func TestFilterPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestAggregatePlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -226,7 +229,7 @@ func TestFilterPlan_Apply(t *testing.T) {
 		}
 
 		pp := &FilterPlan{Condition:stmt.Condition}
-		result := pp.Apply(nil, tt.data)
+		result := pp.Apply(ctx, tt.data)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 6 - 2
xsql/plans/having_test.go

@@ -1,8 +1,10 @@
 package plans
 
 import (
-	"github.com/emqx/kuiper/xsql"
 	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -140,6 +142,8 @@ func TestHavingPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestHavingPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -148,7 +152,7 @@ func TestHavingPlan_Apply(t *testing.T) {
 		}
 
 		pp := &HavingPlan{Condition:stmt.Having}
-		result := pp.Apply(nil, tt.data)
+		result := pp.Apply(ctx, tt.data)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 6 - 2
xsql/plans/join_multi_test.go

@@ -1,8 +1,10 @@
 package plans
 
 import (
-	"github.com/emqx/kuiper/xsql"
 	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -382,6 +384,8 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestMultiJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -393,7 +397,7 @@ func TestMultiJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}

+ 17 - 6
xsql/plans/join_test.go

@@ -2,9 +2,10 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -597,6 +598,8 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestLeftJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -608,7 +611,7 @@ func TestLeftJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1023,6 +1026,8 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestInnerJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -1034,7 +1039,7 @@ func TestInnerJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1151,6 +1156,8 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestRightJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -1162,7 +1169,7 @@ func TestRightJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1405,6 +1412,8 @@ func TestFullJoinPlan_Apply(t *testing.T) {
 
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestFullJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -1416,7 +1425,7 @@ func TestFullJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}
@@ -1535,6 +1544,8 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestCrossJoinPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -1546,7 +1557,7 @@ func TestCrossJoinPlan_Apply(t *testing.T) {
 			t.Errorf("statement source is not a table")
 		}else{
 			pp := &JoinPlan{Joins: stmt.Joins, From: table}
-			result := pp.Apply(nil, tt.data)
+			result := pp.Apply(ctx, tt.data)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 			}

+ 7 - 2
xsql/plans/math_func_test.go

@@ -2,8 +2,10 @@ package plans
 
 import (
 	"encoding/json"
-	"github.com/emqx/kuiper/xsql"
 	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -453,6 +455,8 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestMathAndConversionFunc_Apply1")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		//fmt.Println("Running test " + strconv.Itoa(i))
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
@@ -463,7 +467,8 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 			continue
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 11 - 3
xsql/plans/misc_func_test.go

@@ -2,8 +2,10 @@ package plans
 
 import (
 	"encoding/json"
-	"github.com/emqx/kuiper/xsql"
 	"fmt"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -130,13 +132,16 @@ func TestHashFunc_Apply1(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestHashFunc_Apply1")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -179,13 +184,16 @@ func TestMqttFunc_Apply2(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestMqttFunc_Apply2")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 5 - 2
xsql/plans/order_test.go

@@ -1,9 +1,10 @@
 package plans
 
 import (
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -365,6 +366,8 @@ func TestOrderPlan_Apply(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestOrderPlan_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil {
@@ -373,7 +376,7 @@ func TestOrderPlan_Apply(t *testing.T) {
 		}
 
 		pp := &OrderPlan{SortFields:stmt.SortFields}
-		result := pp.Apply(nil, tt.data)
+		result := pp.Apply(ctx, tt.data)
 		if !reflect.DeepEqual(tt.result, result) {
 			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}

+ 1 - 1
xsql/plans/preprocessor.go

@@ -63,7 +63,7 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) {
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(tuple, &xsql.FunctionValuer{})}
 			if v := ve.Eval(f.Expr); v != nil {
-				result[f.AName] = v
+				result[strings.ToLower(f.AName)] = v
 			}
 		}
 	}

+ 12 - 5
xsql/plans/preprocessor_test.go

@@ -2,9 +2,10 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"log"
 	"reflect"
 	"testing"
@@ -157,6 +158,8 @@ func TestPreprocessor_Apply(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 
 		pp := &Preprocessor{streamStmt: tt.stmt}
@@ -167,7 +170,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message:dm}
-			result := pp.Apply(nil, tuple)
+			result := pp.Apply(ctx, tuple)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 			}
@@ -276,6 +279,8 @@ func TestPreprocessorTime_Apply(t *testing.T){
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 
 		pp := &Preprocessor{streamStmt: tt.stmt}
@@ -286,7 +291,7 @@ func TestPreprocessorTime_Apply(t *testing.T){
 			return
 		} else {
 			tuple := &xsql.Tuple{Message:dm}
-			result := pp.Apply(nil, tuple)
+			result := pp.Apply(ctx, tuple)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok{
 				if rtt, ok := rt.Message["abc"].(time.Time); ok{
@@ -424,9 +429,11 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 
-		pp, err := NewPreprocessor(tt.stmt, true)
+		pp, err := NewPreprocessor(tt.stmt, nil,true)
 		if err != nil{
 			t.Error(err)
 		}
@@ -437,7 +444,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message:dm}
-			result := pp.Apply(nil, tuple)
+			result := pp.Apply(ctx, tuple)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok{
 				if rtt, ok := rt.Message["abc"].(time.Time); ok{

+ 10 - 8
xsql/plans/project_operator.go

@@ -2,9 +2,9 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
-	"fmt"
 	"strconv"
 	"strings"
 )
@@ -12,6 +12,8 @@ import (
 type ProjectPlan struct {
 	Fields xsql.Fields
 	IsAggregate bool
+
+	isTest bool
 }
 
 /**
@@ -25,7 +27,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	switch input := data.(type) {
 	case *xsql.Tuple:
 		ve := pp.getVE(input, input)
-		results = append(results, project(pp.Fields, ve))
+		results = append(results, project(pp.Fields, ve, pp.isTest))
 	case xsql.WindowTuplesSet:
 		if len(input) != 1 {
 			log.Infof("WindowTuplesSet with multiple tuples cannot be evaluated")
@@ -34,7 +36,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		ms := input[0].Tuples
 		for _, v := range ms {
 			ve := pp.getVE(&v, input)
-			results = append(results, project(pp.Fields, ve))
+			results = append(results, project(pp.Fields, ve, pp.isTest))
 			if pp.IsAggregate{
 				break
 			}
@@ -43,7 +45,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 		ms := input
 		for _, v := range ms {
 			ve := pp.getVE(&v, input)
-			results = append(results, project(pp.Fields, ve))
+			results = append(results, project(pp.Fields, ve, pp.isTest))
 			if pp.IsAggregate{
 				break
 			}
@@ -51,7 +53,7 @@ func (pp *ProjectPlan) Apply(ctx api.StreamContext, data interface{}) interface{
 	case xsql.GroupedTuplesSet:
 		for _, v := range input{
 			ve := pp.getVE(v[0], v)
-			results = append(results, project(pp.Fields, ve))
+			results = append(results, project(pp.Fields, ve, pp.isTest))
 		}
 	default:
 		log.Errorf("Expect xsql.Valuer or its array type")
@@ -74,13 +76,13 @@ func (pp *ProjectPlan) getVE(tuple xsql.DataValuer, agg xsql.AggregateData) *xsq
 	}
 }
 
-func project(fs xsql.Fields, ve *xsql.ValuerEval) map[string]interface{} {
+func project(fs xsql.Fields, ve *xsql.ValuerEval, isTest bool) map[string]interface{} {
 	result := make(map[string]interface{})
 	for _, f := range fs {
 		//Avoid to re-evaluate for non-agg field has alias name, which was already evaluated in pre-processor operator.
-		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)){
+		if f.AName != "" && (!xsql.HasAggFuncs(f.Expr)) && !isTest{
 			fr := &xsql.FieldRef{StreamName:"", Name:f.AName}
-			v := ve.Eval(fr);
+			v := ve.Eval(fr)
 			result[f.AName] = v
 		} else {
 			v := ve.Eval(f.Expr)

+ 18 - 8
xsql/plans/project_test.go

@@ -2,9 +2,10 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -276,11 +277,14 @@ func TestProjectPlan_Apply1(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestProjectPlan_Apply1")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -706,12 +710,14 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestProjectPlan_MultiInput")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
 		pp := &ProjectPlan{Fields:stmt.Fields}
-
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -836,14 +842,16 @@ func TestProjectPlan_Funcs(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestProjectPlan_Funcs")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil{
 			t.Error(err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)
@@ -1055,14 +1063,16 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestProjectPlan_AggFuncs")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil{
 			t.Error(err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields, IsAggregate: true}
-
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 6 - 2
xsql/plans/str_func_test.go

@@ -2,9 +2,10 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"strings"
 	"testing"
@@ -420,13 +421,16 @@ func TestStrFunc_Apply1(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	contextLogger := common.Log.WithField("rule", "TestStrFunc_Apply1")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
 		}
 		pp := &ProjectPlan{Fields:stmt.Fields}
-		result := pp.Apply(nil, tt.data)
+		pp.isTest = true
+		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
 		if v, ok := result.([]byte); ok {
 			err := json.Unmarshal(v, &mapRes)

+ 2 - 0
xsql/processors/xsql_processor.go

@@ -477,6 +477,8 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 		s = sinks.NewLogSink()
 	case "mqtt":
 		s = &sinks.MQTTSink{}
+	case "rest":
+		s = &sinks.RestSink{}
 	default:
 		nf, err := plugin_manager.GetPlugin(name, "sinks")
 		if err != nil {

+ 13 - 0
xsql/processors/xsql_processor_test.go

@@ -404,6 +404,19 @@ func TestSingleSQL(t *testing.T) {
 					"ts":    float64(1541152488442),
 				}},
 			},
+		}, {
+			name: `rule3`,
+			sql:  `SELECT size as Int8, ts FROM demo where size > 3`,
+			r: [][]map[string]interface{}{
+				{{
+					"Int8":  float64(6),
+					"ts":    float64(1541152486822),
+				}},
+				{{
+					"Int8":  float64(4),
+					"ts":    float64(1541152488442),
+				}},
+			},
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 6 - 4
xstream/nodes/sink_node.go

@@ -37,10 +37,12 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 		for {
 			select {
 			case item := <-m.input:
-				if err := m.sink.Collect(ctx, item); err != nil{
-					//TODO deal with publish error
-					logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
-				}
+				go func() {
+					if err := m.sink.Collect(ctx, item); err != nil{
+						//TODO deal with publish error
+						logger.Errorf("sink node %s publish %v error: %v", ctx.GetOpId(), item, err)
+					}
+				}()
 			case <-ctx.Done():
 				logger.Infof("sink node %s done", m.name)
 				if err := m.sink.Close(ctx); err != nil{

+ 5 - 1
xstream/nodes/source_node.go

@@ -74,7 +74,11 @@ func getConf(t string, confkey string, ctx api.StreamContext) map[string]interfa
 	if t == ""{
 		t = "mqtt"
 	}
-	conf, err := common.LoadConf("sources/" + t + ".yaml")
+	confPath := "sources/" + t + ".yaml"
+	if t == "mqtt"{
+		confPath = "mqtt_source.yaml"
+	}
+	conf, err := common.LoadConf(confPath)
 	props := make(map[string]interface{})
 	if err == nil {
 		cfg := make(map[string]map[string]interface{})

+ 6 - 8
xstream/sinks/mqtt_sink.go

@@ -2,10 +2,10 @@ package sinks
 
 import (
 	"crypto/tls"
-	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/xstream/api"
 	"fmt"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/api"
 	"github.com/google/uuid"
 	"strings"
 )
@@ -41,7 +41,7 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 		}
 	}
 	var pVersion uint = 3
-	pVersionStr, ok := ps["protocolVersion"];
+	pVersionStr, ok := ps["protocolVersion"]
 	if ok {
 		v, _ := pVersionStr.(string)
 		if v == "3.1" {
@@ -54,7 +54,7 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 	}
 
 	uName := ""
-	un, ok := ps["username"];
+	un, ok := ps["username"]
 	if ok {
 		v, _ := un.(string)
 		if strings.Trim(v, " ") != "" {
@@ -63,7 +63,7 @@ func (ms *MQTTSink) Configure(ps map[string]interface{}) error {
 	}
 
 	password := ""
-	pwd, ok := ps["password"];
+	pwd, ok := ps["password"]
 	if ok {
 		v, _ := pwd.(string)
 		if strings.Trim(v, " ") != "" {
@@ -154,6 +154,4 @@ func (ms *MQTTSink) Close(ctx api.StreamContext) error {
 		ms.conn.Disconnect(5000)
 	}
 	return nil
-}
-
-
+}

+ 202 - 0
xstream/sinks/rest_sink.go

@@ -0,0 +1,202 @@
+package sinks
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"github.com/emqx/kuiper/xstream/api"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+)
+
+type RestSink struct {
+	method      string
+	url         string
+	headers     map[string]string
+	bodyType    string
+	timeout		int64
+	sendSingle  bool
+
+	client      *http.Client
+}
+
+var methodsMap = map[string]bool{"GET": true, "HEAD": true, "POST": true, "PUT": true, "DELETE": true, "PATCH": true}
+var bodyTypeMap = map[string]bool{"none":true, "raw": true, "form": true}
+
+func (ms *RestSink) Configure(ps map[string]interface{}) error {
+	temp, ok := ps["method"]
+	if ok {
+		ms.method, ok = temp.(string)
+		if !ok {
+			return fmt.Errorf("rest sink property method %v is not a string", temp)
+		}
+		ms.method = strings.ToUpper(strings.Trim(ms.method, ""))
+	}else{
+		ms.method = "GET"
+	}
+	if _, ok = methodsMap[ms.method]; !ok {
+		return fmt.Errorf("invalid property method: %s", ms.method)
+	}
+	switch ms.method{
+	case "GET", "HEAD":
+		ms.bodyType = "none"
+	default:
+		ms.bodyType = "raw"
+	}
+
+	temp, ok = ps["url"]
+	if !ok {
+		return fmt.Errorf("rest sink is missing property url")
+	}
+	ms.url, ok = temp.(string)
+	if !ok {
+		return fmt.Errorf("rest sink property url %v is not a string", temp)
+	}
+	ms.url = strings.ToLower(strings.Trim(ms.url, ""))
+
+	temp, ok = ps["headers"]
+	if ok{
+		ms.headers, ok = temp.(map[string]string)
+		if !ok {
+			return fmt.Errorf("rest sink property headers %v is not a map[string][]string", temp)
+		}
+	}
+
+	temp, ok = ps["bodyType"]
+	if ok{
+		ms.bodyType, ok = temp.(string)
+		if !ok {
+			return fmt.Errorf("rest sink property bodyType %v is not a string", temp)
+		}
+		ms.bodyType = strings.ToLower(strings.Trim(ms.bodyType, ""))
+	}
+	if _, ok = bodyTypeMap[ms.bodyType]; !ok {
+		return fmt.Errorf("invalid property bodyType: %s, should be \"none\" or \"form\"", ms.bodyType)
+	}
+
+	temp, ok = ps["timeout"]
+	if !ok {
+		ms.timeout = 5000
+	}else{
+		to, ok := temp.(float64)
+		if !ok {
+			return fmt.Errorf("rest sink property timeout %v is not a number", temp)
+		}
+		ms.timeout = int64(to)
+	}
+
+	temp, ok = ps["sendSingle"]
+	if !ok{
+		ms.sendSingle = false
+	}else{
+		ms.sendSingle, ok = temp.(bool)
+		if !ok {
+			return fmt.Errorf("rest sink property sendSingle %v is not a bool", temp)
+		}
+	}
+
+	return nil
+}
+
+func (ms *RestSink) Open(ctx api.StreamContext) error {
+	logger := ctx.GetLogger()
+	ms.client = &http.Client{Timeout: time.Duration(ms.timeout) * time.Millisecond}
+	logger.Debugf("open rest sink with configuration: {method: %s, url: %s, bodyType: %s, timeout: %d,header: %v, sendSingle: %v", ms.method, ms.url, ms.bodyType, ms.timeout, ms.headers, ms.sendSingle)
+	return nil
+}
+
+func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
+	logger := ctx.GetLogger()
+	v, ok := item.([]byte)
+	if !ok {
+		logger.Warnf("rest sink receive non []byte data: %v", item)
+	}
+	logger.Debugf("rest sink receive %s", item)
+	if !ms.sendSingle{
+		return ms.send(v, logger)
+	}else{
+		var j []map[string]interface{}
+		if err := json.Unmarshal(v, &j); err != nil {
+			return fmt.Errorf("fail to decode the input %s as json: %v", v, err)
+		}
+		logger.Debugf("receive %d records", len(j))
+		for _, r := range j{
+			ms.send(r, logger)
+		}
+	}
+	return nil
+}
+
+func (ms *RestSink) send(v interface{}, logger api.Logger) error {
+	var req *http.Request
+	var err error
+	switch ms.bodyType {
+	case "none":
+		req, err = http.NewRequest(ms.method, ms.url, nil)
+		if err != nil {
+			return fmt.Errorf("fail to create request: %v", err)
+		}
+	case "raw":
+		var content []byte
+		switch t := v.(type) {
+		case []byte:
+			content = t
+		case map[string]interface{}:
+			content, err = json.Marshal(t)
+			if err != nil{
+				return fmt.Errorf("fail to encode content: %v", err)
+			}
+		default:
+			return fmt.Errorf("invalid content: %v", v)
+		}
+		body := bytes.NewBuffer(content)
+		req, err = http.NewRequest(ms.method, ms.url, body)
+		if err != nil {
+			return fmt.Errorf("fail to create request: %v", err)
+		}
+		req.Header.Set("Content-Type", "application/json")
+	case "form":
+		form := url.Values{}
+		switch t := v.(type) {
+		case []byte:
+			form.Set("result", string(t))
+		case map[string]interface{}:
+			for key, value := range t {
+				form.Set(key, fmt.Sprintf("%v", value))
+			}
+		default:
+			return fmt.Errorf("invalid content: %v", v)
+		}
+		body := ioutil.NopCloser(strings.NewReader(form.Encode()))
+		req, err = http.NewRequest(ms.method, ms.url, body)
+		if err != nil {
+			return fmt.Errorf("fail to create request: %v", err)
+		}
+		req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value")
+	default:
+		return fmt.Errorf("unsupported body type %s", ms.bodyType)
+	}
+
+	if len(ms.headers) > 0 {
+		for k, v := range ms.headers {
+			req.Header.Set(k, v)
+		}
+	}
+	logger.Debugf("do request: %s %s with %s", ms.method, ms.url, req.Body)
+	resp, err := ms.client.Do(req)
+	if err != nil {
+		return fmt.Errorf("rest sink fails to send out the data")
+	} else {
+		logger.Debugf("rest sink got response %v", resp)
+	}
+	return nil
+}
+
+func (ms *RestSink) Close(ctx api.StreamContext) error {
+	logger := ctx.GetLogger()
+	logger.Infof("Closing rest sink")
+	return nil
+}