feat(topo): add context function to get the transformed data

Also support retryInterval and retryCount when cache is not set #1031

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 3 years ago
parent
commit
931fdebb9f
36 changed files with 661 additions and 706 deletions
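For orientation, the retry options mentioned in the commit message map onto the new `SinkConf` fields in `internal/topo/node/sink_node.go` below; note that, per `doCollectData` in that file, a resend is only attempted while the sink's error message starts with "io error". Below is a self-contained sketch of sink properties that exercise the new options; the keys follow the SinkConf json tags introduced by this commit, while the values are purely illustrative:

```go
package main

import "fmt"

func main() {
	// Hypothetical sink action properties (values made up for illustration).
	props := map[string]interface{}{
		"runAsync":      false,
		"retryInterval": 500, // milliseconds to wait before a resend
		"retryCount":    3,   // how many resends to attempt before giving up
		"sendSingle":    true,
		"dataTemplate":  `{"wrapper":"w1","content":{{toJson .}}}`,
	}
	fmt.Println(props)
}
```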
  1. 0 12
      docs/en_US/extension/native/overview.md
  2. 19 1
      docs/en_US/extension/native/sink.md
  3. 2 2
      docs/en_US/extension/portable/overview.md
  4. 0 10
      docs/zh_CN/extension/native/overview.md
  5. 19 1
      docs/zh_CN/extension/native/sink.md
  6. 3 3
      docs/zh_CN/extension/portable/overview.md
  7. 2 2
      extensions/sinks/file/file.go
  8. 23 15
      extensions/sinks/image/image.go
  9. 53 38
      extensions/sinks/influx/influx.go
  10. 37 50
      extensions/sinks/redis/redis.go
  11. 3 10
      extensions/sinks/tdengine/tdengine.go
  12. 1 1
      extensions/sinks/zmq/zmq.go
  13. 20 1
      internal/pkg/httpx/http.go
  14. 6 7
      internal/plugin/portable/runtime/sink.go
  15. 42 2
      internal/topo/context/default_test.go
  16. 36 0
      internal/topo/context/transform.go
  17. 1 6
      internal/topo/memory/manager_test.go
  18. 12 20
      internal/topo/memory/sink.go
  19. 120 201
      internal/topo/node/sink_node.go
  20. 14 13
      internal/topo/node/sink_node_test.go
  21. 1 0
      internal/topo/node/source_pool.go
  22. 1 1
      internal/topo/operator/filter_test.go
  23. 14 27
      internal/topo/operator/math_func_test.go
  24. 17 57
      internal/topo/operator/misc_func_test.go
  25. 1 6
      internal/topo/operator/project_operator.go
  26. 112 164
      internal/topo/operator/project_test.go
  27. 9 22
      internal/topo/operator/str_func_test.go
  28. 2 2
      internal/topo/sink/edgex_sink.go
  29. 7 3
      internal/topo/sink/mqtt_sink.go
  30. 10 5
      internal/topo/sink/rest_sink.go
  31. 19 14
      internal/topo/sink/rest_sink_test.go
  32. 1 1
      internal/topo/topotest/mock_topo.go
  33. 2 2
      internal/topo/topotest/mocknode/mock_sink.go
  34. 49 0
      internal/topo/transform/template.go
  35. 2 1
      pkg/api/stream.go
  36. 1 6
      tools/plugin_server/plugin_test_server.go

+ 0 - 12
docs/en_US/extension/native/overview.md

@@ -52,16 +52,4 @@ In the plugin source code, developers can access the dependencies of file system

```go
ctx.GetRootPath()
-```
-
-## Parse dynamic properties
-
-For customized sink plugins, users may still want to support [dynamic properties](../../rules/overview.md#dynamic-properties) like the built-in ones.
-
-In the context object, a function `ParseDynamicProp` is provided to support the parsing of the dynamic property syntax. In the customized sink, developers can specify some properties to be dynamic according to the business logic. And in the plugin code, use this function to parse the user input in the collect function or elsewhere.
-
-```go
-// Parse the prop of jsonpath syntax against the current data.
-value, err := ctx.ParseDynamicProp(s.prop, data)
-// Use the parsed value for the following business logic.
```

+ 19 - 1
docs/en_US/extension/native/sink.md

@@ -22,7 +22,13 @@ The next task is to implement _open_ method. The implementation should be synchr
Open(ctx StreamContext) error
```

-The main task for a Sink is to implement _collect_ method. The function will be invoked when eKuiper feed any data into the sink. As an infinite stream, this function will be invoked continuously. The task of this function is to publish data to the external system. The first parameter is the context, and the second parameter is the data received from eKuiper.
+The main task for a Sink is to implement the _collect_ method. The function is invoked whenever eKuiper feeds data into the sink. As an infinite stream, it will be invoked continuously. Its task is to publish the data to the external system. The first parameter is the context, and the second parameter is the data received from eKuiper, which can be one of two types:
+1. Map slice `[]map[string]interface{}`: this is the default data type.
+2. Map `map[string]interface{}`: this is a possible data type when the `sendSingle` property is set.
+
+Most of the time, the map content will be the selected fields. But if the `sendError` property is enabled and an error happens in the rule, the map content will be like `{"error":"error message here"}`.
+
+The developer can fetch the transformed result from the context method `ctx.TransformOutput()`. The first return value is the transformed data as a `[]byte`. Currently, it is the JSON-encoded byte array by default, or the result of the configured [`dataTemplate` property](../../rules/overview.md#data-template). If the value was transformed by the dataTemplate, the second return value will be true; see the usage sketch below.

```go
//Called when each row of data has transferred to this sink
@@ -45,6 +51,18 @@ func MySink() api.Sink {

The [Memory Sink](https://github.com/lf-edge/ekuiper/blob/master/extensions/sinks/memory/memory.go) is a good example.

+#### Parse dynamic properties
+
+For customized sink plugins, users may still want to support [dynamic properties](../../rules/overview.md#dynamic-properties) like the built-in ones.
+
+In the context object, a function `ParseDynamicProp` is provided to support the parsing of the dynamic property syntax. In the customized sink, developers can specify some properties to be dynamic according to the business logic. And in the plugin code, use this function to parse the user input in the collect function or elsewhere.
+
+```go
+// Parse the prop of jsonpath syntax against the current data.
+value, err := ctx.ParseDynamicProp(s.prop, data)
+// Use the parsed value for the following business logic.
+```
+
### Package the sink
Build the implemented sink as a go plugin and make sure the output so file resides in the plugins/sinks folder.
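To make the new Collect/TransformOutput flow above concrete, here is a minimal sketch of a custom sink, mirroring the pattern the built-in file and zmq sinks adopt in this commit. `mySink` and the final publish step are hypothetical, and the usual imports (`fmt`, `github.com/lf-edge/ekuiper/pkg/api`) are assumed:

```go
// mySink is a hypothetical custom sink; only Collect is sketched.
type mySink struct {
	// connection fields omitted
}

func (s *mySink) Collect(ctx api.StreamContext, item interface{}) error {
	// TransformOutput returns the output already encoded as []byte: the JSON
	// encoding of item by default, or the dataTemplate result when configured.
	bs, templated, err := ctx.TransformOutput()
	if err != nil {
		return fmt.Errorf("my sink transform data error: %v", err)
	}
	ctx.GetLogger().Debugf("my sink sends %s (templated=%v)", bs, templated)
	// publish bs to the external system here
	return nil
}
```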
 
 

+ 2 - 2
docs/en_US/extension/portable/overview.md

@@ -84,6 +84,6 @@ To manage the portable plugins in runtime, we can use the [REST](../../restapi/p

Currently, there are two limitations compared to native plugins:

-1. [State](../native/overview.md#state-storage) and Connection API are not supported. Whereas, state is planned to be supported in the future.
-2. In the [Function interface], the arguments cannot be transferred with the AST which means the user cannot validate the argument types. The only validation supported may be the argument count.
+1. Fewer context methods are supported. For example, [State](../native/overview.md#state-storage) and the Connection API are not supported, and dynamic properties have to be parsed by the developers themselves. State is planned to be supported in the future.
+2. In the function interface, the arguments cannot be transferred with the AST, which means the user cannot validate the argument types. The only validation supported may be the argument count. In the sink interface, the collect function parameter data will always be a JSON-encoded `[]byte`, which developers need to decode themselves (see the sketch below).
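As a rough illustration of the second point, a portable Go sink decodes the bytes itself. The snippet below is a sketch, assuming a collect-style callback that receives `data []byte` and the usual `encoding/json`/`fmt` imports:

```go
// data arrives JSON-encoded; its shape depends on the rule settings
// (e.g. sendSingle or dataTemplate), a map slice being the common default.
var rows []map[string]interface{}
if err := json.Unmarshal(data, &rows); err != nil {
	return fmt.Errorf("invalid sink data %s: %v", data, err)
}
for _, row := range rows {
	// use row ...
	_ = row
}
```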
 
 

+ 0 - 10
docs/zh_CN/extension/native/overview.md

@@ -65,14 +65,4 @@ func (f *accumulateWordCountFunc) Exec(args []interface{}, ctx api.FunctionConte

```go
ctx.GetRootPath()
-```
-
-## 解析动态属性
-
-在自定义的 sink 插件中,用户可能仍然想要像内置的 sink 一样支持[动态属性](../../rules/overview.md#动态属性)。 我们在 context 对象中提供了 `ParseDynamicProp` 方法使得开发者可以方便地解析动态属性并应用于插件中。开发组应当根据业务逻辑,设计那些属性支持动态值。然后在代码编写时,使用此方法解析用户传入的属性值。
-
-```go
-// Parse the prop of jsonpath syntax against the current data.
-value, err := ctx.ParseDynamicProp(s.prop, data)
-// Use the parsed value for the following business logic.
```

+ 19 - 1
docs/zh_CN/extension/native/sink.md

@@ -23,7 +23,15 @@ Configure(props map[string]interface{}) error
Open(ctx StreamContext) error
```

-Sink (目标)的主要任务是实现 _collect_ 方法。 当 eKuiper 将任何数据输入 Sink (目标)时,将调用该函数。 作为无限流,此函数将被连续调用。 此功能的任务是将数据发布到外部系统。 第一个参数是上下文,第二个参数是从 eKuiper 接收的数据。
+Sink (目标)的主要任务是实现 _collect_ 方法。 当 eKuiper 将任何数据输入 Sink (目标)时,将调用该函数。 作为无限流,此函数将被连续调用。 此函数的任务是将数据发布到外部系统。 第一个参数是上下文,第二个参数是从 eKuiper 接收的数据。接收到的数据有两种可能的类型:
+1. Map 数组 `[]map[string]interface{}`: 默认类型。
+2. Map `map[string]interface{}`: 当 [`sendSingle` 属性](../../rules/overview.md#目标/动作)设置为 true 时,可能收到此类型。
+
+大多数时候,收到的 map 的内容为规则选择的列的值。但是,如果 `sendError` 属性设置为 true 且规则有错误,则错误信息会放到 map 里,形似 `{"error":"error message here"}` 。
+
+开发者可通过 context 方法 `ctx.TransformOutput()` 获取转换后的字节数组。默认情况下,该方法将返回 json 编码的字节数组;若 [`dataTemplate` 属性](../../rules/overview.md#数据模板) 有设置,则返回格式化后的字节数组,且第二个返回值设为 true,表示结果已经过变换。
+
+需要注意的是,只有当 [`dataTemplate` 属性](../../rules/overview.md#数据模板) 设置时,返回的才是经过数据模板格式化的数据;若数据模板未设置,则该方法返回默认的 json 编码字节数组。

```go
//Called when each row of data has transferred to this sink
@@ -46,6 +54,16 @@ func MySink() api.Sink {

[Memory Sink](https://github.com/lf-edge/ekuiper/blob/master/extensions/sinks/memory/memory.go) 是一个很好的示例。

+#### 解析动态属性
+
+在自定义的 sink 插件中,用户可能仍然想要像内置的 sink 一样支持[动态属性](../../rules/overview.md#动态属性)。 我们在 context 对象中提供了 `ParseDynamicProp` 方法使得开发者可以方便地解析动态属性并应用于插件中。开发者应当根据业务逻辑,设计哪些属性支持动态值。然后在代码编写时,使用此方法解析用户传入的属性值。
+
+```go
+// Parse the prop of jsonpath syntax against the current data.
+value, err := ctx.ParseDynamicProp(s.prop, data)
+// Use the parsed value for the following business logic.
+```
+
### 将 Sink (目标)打包
将实现的 Sink (目标)构建为 go 插件,并确保输出的 so 文件位于 plugins/sinks 文件夹中。
 
 

+ 3 - 3
docs/zh_CN/extension/portable/overview.md

@@ -75,7 +75,7 @@
要在运行时管理可移植插件,我们可以使用 [REST](../../restapi/plugins.md) 或 [CLI](../../cli/plugins.md) 命令。
## 限制

-目前,与原生插件相比,有两个限制
+目前,与原生插件相比,有两个方面的区别

-1. 不支持 [State](../native/overview.md#state-storage) 和 Connection API。而 state 计划在未来得到支持。
-2. 在函数接口中,参数不能通过AST传递,即用户无法验证参数类型。唯一支持的验证可能是参数计数。
+1. 支持的 Context 方法较少,例如 [State](../native/overview.md#state-storage) 和 Connection API 暂不支持;动态参数需要开发者自行解析。而 state 计划在未来得到支持。
+2. 在函数接口中,参数不能通过 AST 传递,即用户无法验证参数类型。唯一支持的验证可能是参数计数。在 Sink 接口中,collect 函数的数据类型为 json 编码的 `[]byte`,需要开发者自行解码。

+ 2 - 2
extensions/sinks/file/file.go

@@ -107,13 +107,13 @@ func (m *fileSink) save(logger api.Logger) {

func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
	logger := ctx.GetLogger()
-	if v, ok := item.([]byte); ok {
+	if v, _, err := ctx.TransformOutput(); err == nil {
		logger.Debugf("file sink receive %s", item)
		m.mux.Lock()
		m.results = append(m.results, v)
		m.mux.Unlock()
	} else {
-		logger.Debug("file sink receive non byte data")
+		return fmt.Errorf("file sink transform data error: %v", err)
	}
	return nil
}

+ 23 - 15
extensions/sinks/image/image.go

@@ -17,7 +17,6 @@ package main
import (
	"bytes"
	"context"
-	"encoding/json"
	"fmt"
	"github.com/lf-edge/ekuiper/pkg/api"
	"image/jpeg"
@@ -163,28 +162,37 @@ func (m *imageSink) saveFile(b []byte, fpath string) error {
	return nil
}

-func (m *imageSink) saveFiles(msg []map[string][]byte) error {
-	for _, images := range msg {
-		for k, v := range images {
-			suffix := m.getSuffix()
-			fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.format)
-			fpath := filepath.Join(m.path, fname)
-			m.saveFile(v, fpath)
+func (m *imageSink) saveFiles(images map[string]interface{}) error {
+	for k, v := range images {
+		image, ok := v.([]byte)
+		if !ok {
+			return fmt.Errorf("found none bytes data %v for path %s", image, k)
		}
+		suffix := m.getSuffix()
+		fname := fmt.Sprintf(`%s%s.%s`, k, suffix, m.format)
+		fpath := filepath.Join(m.path, fname)
+		m.saveFile(image, fpath)
	}
	return nil
}

func (m *imageSink) Collect(ctx api.StreamContext, item interface{}) error {
	logger := ctx.GetLogger()
-	if v, ok := item.([]byte); ok {
-		var msg []map[string][]byte
-		if err := json.Unmarshal(v, &msg); nil != err {
-			return fmt.Errorf("The sink only accepts bytea field, other types are not supported.")
+	switch v := item.(type) {
+	case []map[string]interface{}:
+		var outer error
+		for _, vm := range v {
+			err := m.saveFiles(vm)
+			if err != nil {
+				outer = err
+				logger.Error(err)
+			}
		}
-		return m.saveFiles(msg)
-	} else {
-		logger.Debug("image sink receive non byte data")
+		return outer
+	case map[string]interface{}:
+		return m.saveFiles(v)
+	default:
+		return fmt.Errorf("image sink receive invalid data %v", item)
	}
	return nil
}

+ 53 - 38
extensions/sinks/influx/influx.go

@@ -1,3 +1,17 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 // You may obtain a copy of the License at
@@ -15,7 +29,6 @@
 package main
 package main
 
 
 import (
 import (
-	"encoding/json"
 	_ "github.com/influxdata/influxdb1-client/v2"
 	_ "github.com/influxdata/influxdb1-client/v2"
 	client "github.com/influxdata/influxdb1-client/v2"
 	client "github.com/influxdata/influxdb1-client/v2"
 	api "github.com/lf-edge/ekuiper/pkg/api"
 	api "github.com/lf-edge/ekuiper/pkg/api"
@@ -36,8 +49,6 @@ type influxSink struct {
 	fieldmap     map[string]interface{}
 	fieldmap     map[string]interface{}
 }
 }
 
 
-type ListMap []map[string]interface{}
-
 func (m *influxSink) Configure(props map[string]interface{}) error {
 func (m *influxSink) Configure(props map[string]interface{}) error {
 	if i, ok := props["addr"]; ok {
 	if i, ok := props["addr"]; ok {
 		if i, ok := i.(string); ok {
 		if i, ok := i.(string); ok {
@@ -99,45 +110,49 @@ func (m *influxSink) Open(ctx api.StreamContext) (err error) {
 
 
 func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
-
-	if v, ok := data.([]byte); ok {
-		var out ListMap
-		if err := json.Unmarshal([]byte(v), &out); err != nil {
-			logger.Debug("Failed to unmarshal data with error %s.\n", err)
-			return err
-		}
-		bp, err := client.NewBatchPoints(client.BatchPointsConfig{
-			Database:  m.databasename,
-			Precision: "ns",
-		})
-		if err != nil {
-			logger.Debug(err)
-			return err
-		}
-		tags := map[string]string{m.tagkey: m.tagvalue}
-		fields := strings.Split(m.fields, ",")
-		m.fieldmap = make(map[string]interface{}, 100)
-		for _, field := range fields {
-			if out[0][field] != nil {
-				m.fieldmap[field] = out[0][field]
-			}
+	var output map[string]interface{}
+	switch v := data.(type) {
+	case map[string]interface{}:
+		output = v
+	case []map[string]interface{}:
+		if len(v) > 0 {
+			output = v[0]
+		} else {
+			ctx.GetLogger().Warnf("Get empty data %v, just return", data)
+			return nil
 		}
 		}
+	}
 
 
-		pt, err := client.NewPoint(m.measurement, tags, m.fieldmap, time.Now())
-		if err != nil {
-			logger.Debug(err)
-			return err
-		}
-		bp.AddPoint(pt)
-		err = m.cli.Write(bp)
-		if err != nil {
-			logger.Debug(err)
-			return err
+	bp, err := client.NewBatchPoints(client.BatchPointsConfig{
+		Database:  m.databasename,
+		Precision: "ns",
+	})
+	if err != nil {
+		logger.Debug(err)
+		return err
+	}
+	tags := map[string]string{m.tagkey: m.tagvalue}
+	fields := strings.Split(m.fields, ",")
+	m.fieldmap = make(map[string]interface{}, 100)
+	for _, field := range fields {
+		if output[field] != nil {
+			m.fieldmap[field] = output[field]
 		}
 		}
-		logger.Debug("insert success")
-	} else {
-		logger.Debug("insert failed")
 	}
 	}
+
+	pt, err := client.NewPoint(m.measurement, tags, m.fieldmap, time.Now())
+	if err != nil {
+		logger.Debug(err)
+		return err
+	}
+	bp.AddPoint(pt)
+	err = m.cli.Write(bp)
+	if err != nil {
+		logger.Debug(err)
+		return err
+	}
+	logger.Debug("insert success")
+
 	return nil
 	return nil
 }
 }
 
 

+ 37 - 50
extensions/sinks/redis/redis.go

@@ -15,7 +15,6 @@
 package main
 package main
 
 
 import (
 import (
-	"encoding/json"
 	"errors"
 	"errors"
 	"time"
 	"time"
 
 
@@ -125,47 +124,17 @@ func (r *RedisSink) Open(ctx api.StreamContext) (err error) {
 
 
 func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
-
-	if v, ok := data.([]byte); ok {
-		if r.field != "" {
-			if !r.sendSingle {
-				var out []map[string]interface{}
-				if err := json.Unmarshal(v, &out); err != nil {
-					logger.Debug("Failed to unmarshal data with error: ", err, " data:", string(v))
-					return err
-				}
-
-				for _, m := range out {
-					key := r.field
-					field, ok := m[key].(string)
-					if ok {
-						key = field
-					}
-
-					if r.dataType == "list" {
-						err := r.cli.LPush(key, v).Err()
-						if err != nil {
-							logger.Error(err)
-							return err
-						}
-						logger.Debugf("send redis list success, key:%s data: %s", key, string(v))
-					} else {
-						err := r.cli.Set(key, v, r.expiration*time.Second).Err()
-						if err != nil {
-							logger.Error(err)
-							return err
-						}
-						logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
-					}
-				}
-			} else {
-				var out map[string]interface{}
-				if err := json.Unmarshal(v, &out); err != nil {
-					logger.Debug("Failed to unmarshal data with error: ", err, " data:", string(v))
-					return err
-				}
+	v, _, err := ctx.TransformOutput()
+	if err != nil {
+		logger.Error(err)
+		return err
+	}
+	if r.field != "" {
+		switch out := data.(type) {
+		case []map[string]interface{}:
+			for _, m := range out {
 				key := r.field
 				key := r.field
-				field, ok := out[key].(string)
+				field, ok := m[key].(string)
 				if ok {
 				if ok {
 					key = field
 					key = field
 				}
 				}
@@ -186,29 +155,47 @@ func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 					logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
 					logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
 				}
 				}
 			}
 			}
+		case map[string]interface{}:
+			key := r.field
+			field, ok := out[key].(string)
+			if ok {
+				key = field
+			}
 
 
-		} else if r.key != "" {
 			if r.dataType == "list" {
 			if r.dataType == "list" {
-				err := r.cli.LPush(r.key, v).Err()
+				err := r.cli.LPush(key, v).Err()
 				if err != nil {
 				if err != nil {
 					logger.Error(err)
 					logger.Error(err)
 					return err
 					return err
 				}
 				}
-				logger.Debugf("send redis list success, key:%s data: %s", r.key, string(v))
+				logger.Debugf("send redis list success, key:%s data: %s", key, string(v))
 			} else {
 			} else {
-				err := r.cli.Set(r.key, v, r.expiration*time.Second).Err()
+				err := r.cli.Set(key, v, r.expiration*time.Second).Err()
 				if err != nil {
 				if err != nil {
 					logger.Error(err)
 					logger.Error(err)
 					return err
 					return err
 				}
 				}
-				logger.Debugf("send redis string success, key:%s data: %s", r.key, string(v))
+				logger.Debugf("send redis string success, key:%s data: %s", key, string(v))
 			}
 			}
 		}
 		}
-
-		logger.Debug("insert success", string(v))
-	} else {
-		logger.Debug("insert failed data is not []byte data:", data)
+	} else if r.key != "" {
+		if r.dataType == "list" {
+			err := r.cli.LPush(r.key, v).Err()
+			if err != nil {
+				logger.Error(err)
+				return err
+			}
+			logger.Debugf("send redis list success, key:%s data: %s", r.key, string(v))
+		} else {
+			err := r.cli.Set(r.key, v, r.expiration*time.Second).Err()
+			if err != nil {
+				logger.Error(err)
+				return err
+			}
+			logger.Debugf("send redis string success, key:%s data: %s", r.key, string(v))
+		}
 	}
 	}
+	logger.Debugf("insert success %v", data)
 	return nil
 	return nil
 }
 }
 
 

+ 3 - 10
extensions/sinks/tdengine/tdengine.go

@@ -18,7 +18,6 @@ package main

import (
	"database/sql"
-	"encoding/json"
	"fmt"
	"github.com/lf-edge/ekuiper/internal/conf"
	"github.com/lf-edge/ekuiper/pkg/api"
@@ -156,17 +155,11 @@ func (m *taosSink) Open(ctx api.StreamContext) (err error) {

func (m *taosSink) Collect(ctx api.StreamContext, item interface{}) error {
	logger := ctx.GetLogger()
-	data, ok := item.([]byte)
-	if !ok {
-		logger.Debug("tdengine sink receive non string data")
-		return nil
-	}
	logger.Debugf("tdengine sink receive %s", item)

-	var sliData []map[string]interface{}
-	err := json.Unmarshal(data, &sliData)
-	if nil != err {
-		return err
+	sliData, ok := item.([]map[string]interface{})
+	if !ok {
+		return fmt.Errorf("tdengine sink receive non map slice data: %#v", item)
	}
	for _, mapData := range sliData {
		sql, err := m.conf.buildSql(ctx, mapData)

+ 1 - 1
extensions/sinks/zmq/zmq.go

@@ -66,7 +66,7 @@ func (m *zmqSink) Open(ctx api.StreamContext) (err error) {

func (m *zmqSink) Collect(ctx api.StreamContext, item interface{}) (err error) {
	logger := ctx.GetLogger()
-	if v, ok := item.([]byte); ok {
+	if v, _, err := ctx.TransformOutput(); err == nil {
		logger.Debugf("zmq sink receive %s", item)
		if m.topic == "" {
			_, err = m.publisher.Send(string(v), 0)

+ 20 - 1
internal/pkg/httpx/http.go

@@ -32,6 +32,7 @@ import (

var BodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}

+// Send v must be a []byte or map
func Send(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, sendSingle bool, v interface{}) (*http.Response, error) {
	var req *http.Request
	var err error
@@ -47,7 +48,11 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string
		case []byte:
			body = bytes.NewBuffer(t)
		default:
-			return nil, fmt.Errorf("invalid content: %v", v)
+			vj, err := json.Marshal(v)
+			if err != nil {
+				return nil, fmt.Errorf("invalid content: %v", v)
+			}
+			body = bytes.NewBuffer(vj)
		}
		req, err = http.NewRequest(method, u, body)
		if err != nil {
@@ -105,6 +110,20 @@ func convertToMap(v interface{}, sendSingle bool) (map[string]interface{}, error
			}
		}
		return r, nil
+	case map[string]interface{}:
+		return t, nil
+	case []map[string]interface{}:
+		r := make(map[string]interface{})
+		if sendSingle {
+			return nil, fmt.Errorf("invalid content: %v", t)
+		} else {
+			j, err := json.Marshal(t)
+			if err != nil {
+				return nil, err
+			}
+			r["result"] = string(j)
+		}
+		return r, nil
	default:
		return nil, fmt.Errorf("invalid content: %v", v)
	}
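A hedged usage sketch of the `Send` helper above: with the new default branch, plain maps are marshalled to JSON instead of being rejected. The URL, headers and payload are placeholders, and `logger` is assumed to be an `api.Logger`:

```go
// Post a single map as JSON; convertToMap/Send handle the encoding.
resp, err := httpx.Send(logger, http.DefaultClient, "json", "POST",
	"http://example.com/api", map[string]string{"x-token": "demo"}, false,
	map[string]interface{}{"temperature": 33})
if err != nil {
	return err
}
defer resp.Body.Close()
```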

+ 6 - 7
internal/plugin/portable/runtime/sink.go

@@ -15,7 +15,6 @@
package runtime

import (
-	"fmt"
	"github.com/lf-edge/ekuiper/pkg/api"
)

@@ -81,12 +80,12 @@ func (ps *PortableSink) Open(ctx api.StreamContext) error {

func (ps *PortableSink) Collect(ctx api.StreamContext, item interface{}) error {
	ctx.GetLogger().Debugf("Receive %+v", item)
-	// TODO item type
-	switch input := item.(type) {
-	case []byte:
-		return ps.dataCh.Send(input)
-	default:
-		return ps.dataCh.Send([]byte(fmt.Sprintf("%v", input)))
+	if val, _, err := ctx.TransformOutput(); err == nil {
+		ctx.GetLogger().Debugf("Send %s", val)
+		return ps.dataCh.Send(val)
+	} else {
+		ctx.GetLogger().Errorf("Found error %s", err.Error())
+		return err
	}
}


+ 42 - 2
internal/topo/context/default_test.go

@@ -44,12 +44,12 @@ func TestState(t *testing.T) {
		}
	)
	//initialization
-	store, err := state.CreateStore(ruleId, api.AtLeastOnce)
+	cStore, err := state.CreateStore(ruleId, api.AtLeastOnce)
	if err != nil {
		t.Errorf("Get store for rule %s error: %s", ruleId, err)
		return
	}
-	ctx := Background().WithMeta("testStateRule", "op1", store).(*DefaultContext)
+	ctx := Background().WithMeta("testStateRule", "op1", cStore).(*DefaultContext)
	defer cleanStateData()
	// Do state function
	_ = ctx.IncrCounter("key1", 20)
@@ -198,3 +198,43 @@ func TestDynamicProp(t *testing.T) {
		}
	}
}
+
+func TestTransition(t *testing.T) {
+	mockFunc := func(d interface{}) ([]byte, bool, error) {
+		return []byte(fmt.Sprintf("%v", d)), true, nil
+	}
+	var tests = []struct {
+		trans *TransConfig
+		r     []byte
+	}{
+		{
+			trans: &TransConfig{
+				Data:  "hello",
+				TFunc: mockFunc,
+			},
+			r: []byte(`hello`),
+		}, {
+			trans: &TransConfig{
+				Data:  "world",
+				TFunc: mockFunc,
+			},
+			r: []byte(`world`),
+		}, {
+			trans: &TransConfig{
+				Data:  map[string]interface{}{"a": "hello"},
+				TFunc: mockFunc,
+			},
+			r: []byte(`map[a:hello]`),
+		},
+	}
+
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	ctx := Background().WithMeta("testTransRule", "op1", &state.MemoryStore{}).(*DefaultContext)
+	for i, tt := range tests {
+		nc := WithValue(ctx, TransKey, tt.trans)
+		r, _, _ := nc.TransformOutput()
+		if !reflect.DeepEqual(tt.r, r) {
+			t.Errorf("%d\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, string(tt.r), string(r))
+		}
+	}
+}

+ 36 - 0
internal/topo/context/transform.go

@@ -0,0 +1,36 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package context
+
+import (
+	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
+)
+
+const TransKey = "$$trans"
+
+type TransConfig struct {
+	Data  interface{}
+	TFunc transform.TransFunc
+}
+
+// TransformOutput Lazy transform output to bytes
+func (c *DefaultContext) TransformOutput() ([]byte, bool, error) {
+	cc, ok := c.Value(TransKey).(*TransConfig)
+	if ok {
+		return cc.TFunc(cc.Data)
+	}
+	return nil, false, fmt.Errorf("no transform configured")
+}
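For reference, this config is attached to the context per output message by the sink node and read back inside the sink's Collect. The snippet below is condensed from the `doCollectData` change further down in this diff:

```go
// In the sink node: wrap the child context with the data and transform func.
vCtx := context.WithValue(ctx.(*context.DefaultContext), context.TransKey, &context.TransConfig{
	Data:  outData,
	TFunc: tf,
})
// In the sink plugin's Collect, the bytes are then obtained lazily:
bs, templated, err := vCtx.TransformOutput()
```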

+ 1 - 6
internal/topo/memory/manager_test.go

@@ -75,12 +75,7 @@ func TestSharedInmemoryNode(t *testing.T) {
	list := make([]map[string]interface{}, 0)
	list = append(list, data)
	go func() {
-		var buf []byte
-		buf, err = asJsonBytes(list)
-		if err != nil {
-			t.Error(err)
-		}
-		err = snk.Collect(ctx, buf)
+		err = snk.Collect(ctx, list)
		if err != nil {
			t.Error(err)
		}

+ 12 - 20
internal/topo/memory/sink.go

@@ -15,7 +15,6 @@
package memory

import (
-	"encoding/json"
	"fmt"
	"github.com/lf-edge/ekuiper/pkg/api"
	"strings"
@@ -48,29 +47,22 @@ func (s *sink) Configure(props map[string]interface{}) error {

func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
	ctx.GetLogger().Debugf("receive %+v", data)
-	if b, casted := data.([]byte); casted {
-		d, err := toMap(b)
-		if err != nil {
-			return err
-		}
-		for _, el := range d {
-			produce(ctx, s.topic, el)
-		}
-		return nil
+	var outs []map[string]interface{}
+	switch d := data.(type) {
+	case []map[string]interface{}:
+		outs = d
+	case map[string]interface{}:
+		outs = append(outs, d)
+	default:
+		return fmt.Errorf("unrecognized format of %s", data)
	}
-	return fmt.Errorf("unrecognized format of %s", data)
+	for _, el := range outs {
+		produce(ctx, s.topic, el)
+	}
+	return nil
}

func (s *sink) Close(ctx api.StreamContext) error {
	ctx.GetLogger().Debugf("closing memory sink")
	return closeSink(s.topic)
}
-
-func toMap(data []byte) ([]map[string]interface{}, error) {
-	res := make([]map[string]interface{}, 0)
-	err := json.Unmarshal(data, &res)
-	if err != nil {
-		return nil, err
-	}
-	return res, nil
-}

+ 120 - 201
internal/topo/node/sink_node.go

@@ -15,19 +15,30 @@
 package node
 package node
 
 
 import (
 import (
-	"bytes"
-	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/binder/io"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
-	ct "github.com/lf-edge/ekuiper/internal/template"
+	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
+	"strings"
 	"sync"
 	"sync"
-	"text/template"
 	"time"
 	"time"
 )
 )
 
 
+type SinkConf struct {
+	Concurrency       int    `json:"concurrency"`
+	RunAsync          bool   `json:"runAsync"`
+	RetryInterval     int    `json:"retryInterval"`
+	RetryCount        int    `json:"retryCount"`
+	CacheLength       int    `json:"cacheLength"`
+	CacheSaveInterval int    `json:"cacheSaveInterval"`
+	Omitempty         bool   `json:"omitIfEmpty"`
+	SendSingle        bool   `json:"sendSingle"`
+	DataTemplate      string `json:"dataTemplate"`
+}
+
 type SinkNode struct {
 type SinkNode struct {
 	*defaultSinkNode
 	*defaultSinkNode
 	//static
 	//static
@@ -64,7 +75,7 @@ func NewSinkNode(name string, sinkType string, props map[string]interface{}) *Si
 	}
 	}
 }
 }
 
 
-//Only for mock source, do not use it in production
+// NewSinkNodeWithSink Only for mock source, do not use it in production
 func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode {
 func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{}) *SinkNode {
 	return &SinkNode{
 	return &SinkNode{
 		defaultSinkNode: &defaultSinkNode{
 		defaultSinkNode: &defaultSinkNode{
@@ -89,85 +100,50 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 		m.tch = make(chan struct{})
 		m.tch = make(chan struct{})
 	}
 	}
 	go func() {
 	go func() {
-		if c, ok := m.options["concurrency"]; ok {
-			if t, err := cast.ToInt(c, cast.STRICT); err != nil || t <= 0 {
-				logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", c)
-			} else {
-				m.concurrency = t
-			}
+		sconf := &SinkConf{
+			Concurrency:       1,
+			RunAsync:          false,
+			RetryInterval:     1000,
+			RetryCount:        0,
+			CacheLength:       1024,
+			CacheSaveInterval: 1000,
+			Omitempty:         false,
+			SendSingle:        false,
+			DataTemplate:      "",
 		}
 		}
-		runAsync := false
-		if c, ok := m.options["runAsync"]; ok {
-			if t, ok := c.(bool); !ok {
-				logger.Warnf("invalid type for runAsync property, should be bool but found %t", c)
-			} else {
-				runAsync = t
-			}
+		err := cast.MapToStruct(m.options, sconf)
+		if err != nil {
+			result <- fmt.Errorf("read properties %v fail with error: %v", m.options, err)
+			return
 		}
 		}
-		retryInterval := 1000
-		if c, ok := m.options["retryInterval"]; ok {
-			if t, err := cast.ToInt(c, cast.STRICT); err != nil || t < 0 {
-				logger.Warnf("invalid type for retryInterval property, should be positive integer but found %t", c)
-			} else {
-				retryInterval = t
-			}
+		if sconf.Concurrency <= 0 {
+			logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", sconf.Concurrency)
+			sconf.Concurrency = 1
 		}
 		}
-		retryCount := 3
-		if c, ok := m.options["retryCount"]; ok {
-			if t, err := cast.ToInt(c, cast.STRICT); err != nil || t < 0 {
-				logger.Warnf("invalid type for retryCount property, should be positive integer but found %t", c)
-			} else {
-				retryCount = t
-			}
+		m.concurrency = sconf.Concurrency
+		if sconf.RetryInterval <= 0 {
+			logger.Warnf("invalid type for retryInterval property, should be positive integer but found %t", sconf.RetryInterval)
+			sconf.RetryInterval = 1000
 		}
 		}
-		cacheLength := 1024
-		if c, ok := m.options["cacheLength"]; ok {
-			if t, err := cast.ToInt(c, cast.STRICT); err != nil || t < 0 {
-				logger.Warnf("invalid type for cacheLength property, should be positive integer but found %t", c)
-			} else {
-				cacheLength = t
-			}
+		if sconf.RetryCount < 0 {
+			logger.Warnf("invalid type for retryCount property, should be positive integer but found %t", sconf.RetryCount)
+			sconf.RetryCount = 3
 		}
 		}
-		cacheSaveInterval := 1000
-		if c, ok := m.options["cacheSaveInterval"]; ok {
-			if t, err := cast.ToInt(c, cast.STRICT); err != nil || t < 0 {
-				logger.Warnf("invalid type for cacheSaveInterval property, should be positive integer but found %t", c)
-			} else {
-				cacheSaveInterval = t
-			}
+		if sconf.CacheLength < 0 {
+			logger.Warnf("invalid type for cacheLength property, should be positive integer but found %t", sconf.CacheLength)
+			sconf.CacheLength = 1024
 		}
 		}
-		omitIfEmpty := false
-		if c, ok := m.options["omitIfEmpty"]; ok {
-			if t, ok := c.(bool); !ok {
-				logger.Warnf("invalid type for omitIfEmpty property, should be a bool value 'true/false'.", c)
-			} else {
-				omitIfEmpty = t
-			}
-		}
-		sendSingle := false
-		if c, ok := m.options["sendSingle"]; ok {
-			if t, ok := c.(bool); !ok {
-				logger.Warnf("invalid type for sendSingle property, should be a bool value 'true/false'.", c)
-			} else {
-				sendSingle = t
-			}
+		if sconf.CacheSaveInterval < 0 {
+			logger.Warnf("invalid type for cacheSaveInterval property, should be positive integer but found %t", sconf.CacheSaveInterval)
+			sconf.CacheSaveInterval = 1000
 		}
 		}
-		var tp *template.Template = nil
-		if c, ok := m.options["dataTemplate"]; ok {
-			if t, ok := c.(string); !ok {
-				logger.Warnf("invalid type for dateTemplate property, should be a string value.", c)
-			} else {
 
 
-				temp, err := template.New("sink").Funcs(ct.FuncMap).Parse(t)
-				if err != nil {
-					msg := fmt.Sprintf("property dataTemplate %v is invalid: %v", t, err)
-					logger.Warnf(msg)
-					result <- fmt.Errorf(msg)
-					return
-				} else {
-					tp = temp
-				}
-			}
+		tf, err := transform.GenTransform(sconf.DataTemplate)
+		if err != nil {
+			msg := fmt.Sprintf("property dataTemplate %v is invalid: %v", sconf.DataTemplate, err)
+			logger.Warnf(msg)
+			result <- fmt.Errorf(msg)
+			return
 		}
 		}
 
 
 		m.reset()
 		m.reset()
@@ -210,16 +186,16 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 					for {
 					for {
 						select {
 						select {
 						case data := <-m.input:
 						case data := <-m.input:
-							if newdata, processed := m.preprocess(data); processed {
+							if temp, processed := m.preprocess(data); processed {
 								break
 								break
 							} else {
 							} else {
-								data = newdata
+								data = temp
 							}
 							}
 							stats.SetBufferLength(int64(len(m.input)))
 							stats.SetBufferLength(int64(len(m.input)))
-							if runAsync {
-								go doCollect(sink, data, stats, omitIfEmpty, sendSingle, tp, ctx)
+							if sconf.RunAsync {
+								go doCollect(ctx, sink, data, stats, sconf, tf, nil)
 							} else {
 							} else {
-								doCollect(sink, data, stats, omitIfEmpty, sendSingle, tp, ctx)
+								doCollect(ctx, sink, data, stats, sconf, tf, nil)
 							}
 							}
 						case <-ctx.Done():
 						case <-ctx.Done():
 							logger.Infof("sink node %s instance %d done", m.name, instance)
 							logger.Infof("sink node %s instance %d done", m.name, instance)
@@ -235,23 +211,23 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 					logger.Infof("Creating sink cache")
 					logger.Infof("Creating sink cache")
 					var cache *Cache
 					var cache *Cache
 					if m.qos >= api.AtLeastOnce {
 					if m.qos >= api.AtLeastOnce {
-						cache = NewCheckpointbasedCache(m.input, cacheLength, m.tch, result, ctx)
+						cache = NewCheckpointbasedCache(m.input, sconf.CacheLength, m.tch, result, ctx)
 					} else {
 					} else {
-						cache = NewTimebasedCache(m.input, cacheLength, cacheSaveInterval, result, ctx)
+						cache = NewTimebasedCache(m.input, sconf.CacheLength, sconf.CacheSaveInterval, result, ctx)
 					}
 					}
 					for {
 					for {
 						select {
 						select {
 						case data := <-cache.Out:
 						case data := <-cache.Out:
-							if newdata, processed := m.preprocess(data.data); processed {
+							if temp, processed := m.preprocess(data.data); processed {
 								break
 								break
 							} else {
 							} else {
-								data.data = newdata
+								data.data = temp
 							}
 							}
 							stats.SetBufferLength(int64(len(m.input)))
 							stats.SetBufferLength(int64(len(m.input)))
-							if runAsync {
-								go doCollectCacheTuple(sink, data, stats, retryInterval, retryCount, omitIfEmpty, sendSingle, tp, cache.Complete, ctx)
+							if sconf.RunAsync {
+								go doCollect(ctx, sink, data, stats, sconf, tf, cache.Complete)
 							} else {
 							} else {
-								doCollectCacheTuple(sink, data, stats, retryInterval, retryCount, omitIfEmpty, sendSingle, tp, cache.Complete, ctx)
+								doCollect(ctx, sink, data, stats, sconf, tf, cache.Complete)
 							}
 							}
 						case <-ctx.Done():
 						case <-ctx.Done():
 							logger.Infof("sink node %s instance %d done", m.name, instance)
 							logger.Infof("sink node %s instance %d done", m.name, instance)
@@ -274,131 +250,74 @@ func (m *SinkNode) reset() {
 	m.statManagers = nil
 	m.statManagers = nil
 }
 }
 
 
-func extractInput(v []byte) ([]map[string]interface{}, error) {
-	var j []map[string]interface{}
-	if err := json.Unmarshal(v, &j); err != nil {
-		return nil, fmt.Errorf("fail to decode the input %s as json: %v", v, err)
-	}
-	return j, nil
-}
-
-func doCollect(sink api.Sink, item interface{}, stats StatManager, omitIfEmpty bool, sendSingle bool, tp *template.Template, ctx api.StreamContext) {
+func doCollect(ctx api.StreamContext, sink api.Sink, item interface{}, stats StatManager, sconf *SinkConf, tp transform.TransFunc, signalCh chan<- int) {
 	stats.IncTotalRecordsIn()
 	stats.IncTotalRecordsIn()
 	stats.ProcessTimeStart()
 	stats.ProcessTimeStart()
 	defer stats.ProcessTimeEnd()
 	defer stats.ProcessTimeEnd()
-	logger := ctx.GetLogger()
-	outdatas := getOutData(stats, ctx, item, omitIfEmpty, sendSingle, tp)
-
-	for _, outdata := range outdatas {
-		if err := sink.Collect(ctx, outdata); err != nil {
-			stats.IncTotalExceptions()
-			logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outdata, err)
-		} else {
-			stats.IncTotalRecordsOut()
-		}
-	}
-}
-
-func getOutData(stats StatManager, ctx api.StreamContext, item interface{}, omitIfEmpty bool, sendSingle bool, tp *template.Template) [][]byte {
-	logger := ctx.GetLogger()
-	var outdatas [][]byte
+	var outs []map[string]interface{}
 	switch val := item.(type) {
 	switch val := item.(type) {
-	case []byte:
-		if omitIfEmpty && string(val) == "[{}]" {
-			return nil
+	case error:
+		outs = []map[string]interface{}{
+			{"error": val.Error()},
 		}
 		}
-		var (
-			err error
-			j   []map[string]interface{}
-		)
-		if sendSingle || tp != nil {
-			j, err = extractInput(val)
-			if err != nil {
-				logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), val, err)
-				stats.IncTotalExceptions()
-				return nil
-			}
-			logger.Debugf("receive %d records", len(j))
+	case []map[string]interface{}:
+		outs = val
+	default:
+		outs = []map[string]interface{}{
+			{"error": fmt.Sprintf("result is not a map slice but found %#v", val)},
 		}
 		}
-		if !sendSingle {
-			if tp != nil {
-				var output bytes.Buffer
-				err := tp.Execute(&output, j)
-				if err != nil {
-					logger.Warnf("sink node %s instance %d publish %s decode template error: %v", ctx.GetOpId(), ctx.GetInstanceId(), val, err)
-					stats.IncTotalExceptions()
-					return nil
-				}
-				outdatas = append(outdatas, output.Bytes())
-			} else {
-				outdatas = [][]byte{val}
-			}
-		} else {
-			for _, r := range j {
-				if tp != nil {
-					var output bytes.Buffer
-					err := tp.Execute(&output, r)
-					if err != nil {
-						logger.Warnf("sink node %s instance %d publish %s decode template error: %v", ctx.GetOpId(), ctx.GetInstanceId(), val, err)
-						stats.IncTotalExceptions()
-						return nil
-					}
-					outdatas = append(outdatas, output.Bytes())
-				} else {
-					if ot, e := json.Marshal(r); e != nil {
-						logger.Warnf("sink node %s instance %d publish %s marshal error: %v", ctx.GetOpId(), ctx.GetInstanceId(), r, e)
-						stats.IncTotalExceptions()
-						return nil
-					} else {
-						outdatas = append(outdatas, ot)
-					}
-				}
-			}
+	}
+	if sconf.Omitempty && (item == nil || len(outs) == 0) {
+		ctx.GetLogger().Debugf("receive empty in sink")
+		return
+	}
+	if !sconf.SendSingle {
+		doCollectData(ctx, sink, outs, stats, sconf, tp, signalCh)
+	} else {
+		for _, d := range outs {
+			doCollectData(ctx, sink, d, stats, sconf, tp, signalCh)
 		}
 		}
-
-	case error:
-		outdatas = [][]byte{[]byte(fmt.Sprintf(`[{"error":"%s"}]`, val.Error()))}
-	default:
-		outdatas = [][]byte{[]byte(fmt.Sprintf(`[{"error":"result is not a string but found %#v"}]`, val))}
 	}
 	}
-	return outdatas
 }
 }
 
 
-func doCollectCacheTuple(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval, retryCount int, omitIfEmpty bool, sendSingle bool, tp *template.Template, signalCh chan<- int, ctx api.StreamContext) {
-	stats.IncTotalRecordsIn()
-	stats.ProcessTimeStart()
-	defer stats.ProcessTimeEnd()
-	logger := ctx.GetLogger()
-	outdatas := getOutData(stats, ctx, item.data, omitIfEmpty, sendSingle, tp)
-	for _, outdata := range outdatas {
-	outerloop:
-		for {
-			select {
-			case <-ctx.Done():
-				logger.Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
-				return
-			default:
-				if err := sink.Collect(ctx, outdata); err != nil {
-					stats.IncTotalExceptions()
-					logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outdata, err)
-					if retryInterval > 0 && retryCount > 0 {
-						retryCount--
-						time.Sleep(time.Duration(retryInterval) * time.Millisecond)
-						logger.Debugf("try again")
-					} else {
-						break outerloop
-					}
+// doCollectData outData must be map or []map
+func doCollectData(ctx api.StreamContext, sink api.Sink, outData interface{}, stats StatManager, sconf *SinkConf, tf transform.TransFunc, signalCh chan<- int) {
+	vCtx := context.WithValue(ctx.(*context.DefaultContext), context.TransKey, &context.TransConfig{
+		Data:  outData,
+		TFunc: tf,
+	})
+	retries := sconf.RetryCount
+	for {
+		select {
+		case <-ctx.Done():
+			ctx.GetLogger().Infof("sink node %s instance %d stops data resending", ctx.GetOpId(), ctx.GetInstanceId())
+			return
+		default:
+			if err := sink.Collect(vCtx, outData); err != nil {
+				stats.IncTotalExceptions()
+				ctx.GetLogger().Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outData, err)
+				if sconf.RetryInterval > 0 && retries > 0 && strings.HasPrefix(err.Error(), "io error") {
+					retries--
+					time.Sleep(time.Duration(sconf.RetryInterval) * time.Millisecond)
+					ctx.GetLogger().Debugf("try again")
 				} else {
 				} else {
-					logger.Debugf("success")
-					stats.IncTotalRecordsOut()
+					return
+				}
+			} else {
+				ctx.GetLogger().Debugf("success")
+				stats.IncTotalRecordsOut()
+				if signalCh != nil {
+					cacheTuple, ok := outData.(*CacheTuple)
+					if !ok {
+						ctx.GetLogger().Warnf("got none cache tuple %v, should not happen", outData)
+					}
 					select {
 					select {
-					case signalCh <- item.index:
+					case signalCh <- cacheTuple.index:
 					default:
 					default:
-						logger.Warnf("sink cache missing response for %d", item.index)
+						ctx.GetLogger().Warnf("sink cache missing response for %d", cacheTuple.index)
 					}
 					}
-
-					break outerloop
 				}
 				}
+				return
 			}
 			}
 		}
 		}
 	}
 	}
@@ -425,12 +344,12 @@ func getSink(name string, action map[string]interface{}) (api.Sink, error) {
 	}
 	}
 }
 }
 
 
-//Override defaultNode
+// AddOutput Override defaultNode
 func (m *SinkNode) AddOutput(_ chan<- interface{}, name string) error {
 func (m *SinkNode) AddOutput(_ chan<- interface{}, name string) error {
 	return fmt.Errorf("fail to add output %s, sink %s cannot add output", name, m.name)
 	return fmt.Errorf("fail to add output %s, sink %s cannot add output", name, m.name)
 }
 }
 
 
-//Override defaultNode
+// Broadcast Override defaultNode
 func (m *SinkNode) Broadcast(_ interface{}) error {
 func (m *SinkNode) Broadcast(_ interface{}) error {
 	return fmt.Errorf("sink %s cannot add broadcast", m.name)
 	return fmt.Errorf("sink %s cannot add broadcast", m.name)
 }
 }
@@ -458,7 +377,7 @@ func (m *SinkNode) close(ctx api.StreamContext, logger api.Logger) {
 	}
 	}
 }
 }
 
 
-// Only called when checkpoint enabled
+// SaveCache Only called when checkpoint enabled
 func (m *SinkNode) SaveCache() {
 func (m *SinkNode) SaveCache() {
 	m.tch <- struct{}{}
 	m.tch <- struct{}{}
 }
 }

+ 14 - 13
internal/topo/node/sink_node_test.go

@@ -19,6 +19,7 @@ import (
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
 	"github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
+	"github.com/lf-edge/ekuiper/internal/xsql"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
 	"time"
 	"time"
@@ -28,7 +29,7 @@ func TestSinkTemplate_Apply(t *testing.T) {
 	conf.InitConf()
 	conf.InitConf()
 	var tests = []struct {
 	var tests = []struct {
 		config map[string]interface{}
 		config map[string]interface{}
-		data   []byte
+		data   []map[string]interface{}
 		result [][]byte
 		result [][]byte
 	}{
 	}{
 		{
 		{
@@ -36,71 +37,71 @@ func TestSinkTemplate_Apply(t *testing.T) {
 				"sendSingle":   true,
 				"sendSingle":   true,
 				"dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
 				"dataTemplate": `{"wrapper":"w1","content":{{toJson .}},"ab":"{{.ab}}"}`,
 			},
 			},
-			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
 			result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
 			result: [][]byte{[]byte(`{"wrapper":"w1","content":{"ab":"hello1"},"ab":"hello1"}`), []byte(`{"wrapper":"w1","content":{"ab":"hello2"},"ab":"hello2"}`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
 				"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
 				"dataTemplate": `{"wrapper":"arr","content":{{json .}},"content0":{{json (index . 0)}},ab0":"{{index . 0 "ab"}}"}`,
 			},
 			},
-			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
 			result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
 			result: [][]byte{[]byte(`{"wrapper":"arr","content":[{"ab":"hello1"},{"ab":"hello2"}],"content0":{"ab":"hello1"},ab0":"hello1"}`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
 				"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
 				"dataTemplate": `<div>results</div><ul>{{range .}}<li>{{.ab}}</li>{{end}}</ul>`,
 			},
 			},
-			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
 			result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
 			result: [][]byte{[]byte(`<div>results</div><ul><li>hello1</li><li>hello2</li></ul>`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
 				"dataTemplate": `{"content":{{toJson .}}}`,
 				"dataTemplate": `{"content":{{toJson .}}}`,
 			},
 			},
-			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
 			result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
 			result: [][]byte{[]byte(`{"content":[{"ab":"hello1"},{"ab":"hello2"}]}`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
 				"sendSingle":   true,
 				"sendSingle":   true,
 				"dataTemplate": `{"newab":"{{.ab}}"}`,
 				"dataTemplate": `{"newab":"{{.ab}}"}`,
 			},
 			},
-			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
 			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
 			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
 				"sendSingle":   true,
 				"sendSingle":   true,
 				"dataTemplate": `{"newab":"{{.ab}}"}`,
 				"dataTemplate": `{"newab":"{{.ab}}"}`,
 			},
 			},
-			data:   []byte(`[{"ab":"hello1"},{"ab":"hello2"}]`),
+			data:   []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}},
 			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
 			result: [][]byte{[]byte(`{"newab":"hello1"}`), []byte(`{"newab":"hello2"}`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
 				"sendSingle":   true,
 				"sendSingle":   true,
 				"dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
 				"dataTemplate": `{"__meta":{{toJson .__meta}},"temp":{{.temperature}}}`,
 			},
 			},
-			data:   []byte(`[{"temperature":33,"humidity":70,"__meta": {"messageid":45,"other": "mock"}}]`),
+			data:   []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
 			result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
 			result: [][]byte{[]byte(`{"__meta":{"messageid":45,"other":"mock"},"temp":33}`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
 				"dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
 				"dataTemplate": `[{"__meta":{{toJson (index . 0 "__meta")}},"temp":{{index . 0 "temperature"}}}]`,
 			},
 			},
-			data:   []byte(`[{"temperature":33,"humidity":70,"__meta": {"messageid":45,"other": "mock"}}]`),
+			data:   []map[string]interface{}{{"temperature": 33, "humidity": 70, "__meta": xsql.Metadata{"messageid": 45, "other": "mock"}}},
 			result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
 			result: [][]byte{[]byte(`[{"__meta":{"messageid":45,"other":"mock"},"temp":33}]`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
 				"dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
 				"dataTemplate": `[{{range $index, $ele := .}}{{if $index}},{{end}}{"result":{{add $ele.temperature $ele.humidity}}}{{end}}]`,
 			},
 			},
-			data:   []byte(`[{"temperature":33,"humidity":70},{"temperature":22.0,"humidity":50},{"temperature":11,"humidity":90}]`),
+			data:   []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
 			result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
 			result: [][]byte{[]byte(`[{"result":103},{"result":72},{"result":101}]`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
-				"dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90.0 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
+				"dataTemplate": `{{$counter := 0}}{{range $index, $ele := .}}{{if ne 90 $ele.humidity}}{{$counter = add $counter 1}}{{end}}{{end}}{"result":{{$counter}}}`,
 			},
 			},
-			data:   []byte(`[{"temperature":33,"humidity":70},{"temperature":22,"humidity":50},{"temperature":11,"humidity":90}]`),
+			data:   []map[string]interface{}{{"temperature": 33, "humidity": 70}, {"temperature": 22.0, "humidity": 50}, {"temperature": 11, "humidity": 90}},
 			result: [][]byte{[]byte(`{"result":2}`)},
 			result: [][]byte{[]byte(`{"result":2}`)},
 		}, {
 		}, {
 			config: map[string]interface{}{
 			config: map[string]interface{}{
 				"dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
 				"dataTemplate": `{"a":"{{base64 .a}}","b":"{{base64 .b}}","c":"{{b64enc .c}}","d":"{{b64enc .d}}","e":"{{base64 .e}}"}`,
 				"sendSingle":   true,
 				"sendSingle":   true,
 			},
 			},
-			data:   []byte(`[{"a":1,"b":3.1415,"c":"hello","d":"{\"hello\" : 3}","e":{"humidity":20,"temperature":30}}]`),
+			data:   []map[string]interface{}{{"a": 1, "b": 3.1415, "c": "hello", "d": "{\"hello\" : 3}", "e": map[string]interface{}{"humidity": 20, "temperature": 30}}},
 			result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
 			result: [][]byte{[]byte(`{"a":"MQ==","b":"My4xNDE1","c":"aGVsbG8=","d":"eyJoZWxsbyIgOiAzfQ==","e":"eyJodW1pZGl0eSI6MjAsInRlbXBlcmF0dXJlIjozMH0="}`)},
 		},
 		},
 	}
 	}
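These dataTemplate cases now feed decoded Go maps into the template instead of pre-marshalled JSON, so functions such as `add` operate on native numbers. Below is a minimal sketch, assuming the sink's dataTemplate handling is backed by the `transform.GenTransform` helper introduced later in this commit; the simplified single-record template and the expected output mirror the `add` case above, and the file is illustrative only.

```go
// Minimal sketch, not part of the commit: render a simplified variant of the
// dataTemplate used in the cases above through the new transform package.
package main

import (
	"fmt"

	"github.com/lf-edge/ekuiper/internal/topo/transform"
)

func main() {
	tf, err := transform.GenTransform(`{"result":{{add .temperature .humidity}}}`)
	if err != nil {
		panic(err)
	}
	// The sink node now passes decoded maps rather than marshalled JSON bytes.
	out, templated, err := tf(map[string]interface{}{"temperature": 33, "humidity": 70})
	fmt.Println(string(out), templated, err) // {"result":103} true <nil>
}
```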

+ 1 - 0
internal/topo/node/source_pool.go

@@ -262,6 +262,7 @@ func (ss *sourceSingleton) detach(instanceKey string) bool {
 	} else {
 	} else {
 		// should not happen
 		// should not happen
 		ss.ctx.GetLogger().Warnf("detach source instance %s, not found", instanceKey)
 		ss.ctx.GetLogger().Warnf("detach source instance %s, not found", instanceKey)
+		return false
 	}
 	}
 	delete(ss.outputs, instanceKey)
 	delete(ss.outputs, instanceKey)
 	if len(ss.outputs) == 0 {
 	if len(ss.outputs) == 0 {

+ 1 - 1
internal/topo/operator/filter_test.go

@@ -402,7 +402,7 @@ func TestFilterPlan_Apply(t *testing.T) {
 			t.Errorf("statement %d parse error %s", i, err)
 			t.Errorf("statement %d parse error %s", i, err)
 			break
 			break
 		}
 		}
-		fv, afv := xsql.NewFunctionValuersForOp(nil)
+		fv, afv := xsql.NewFunctionValuersForOp(ctx)
 		pp := &FilterOp{Condition: stmt.Condition}
 		pp := &FilterOp{Condition: stmt.Condition}
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		if !reflect.DeepEqual(tt.result, result) {
 		if !reflect.DeepEqual(tt.result, result) {

+ 14 - 27
internal/topo/operator/math_func_test.go

@@ -15,7 +15,6 @@
 package operator
 package operator
 
 
 import (
 import (
-	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
@@ -40,7 +39,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(1), //Actually it should be 1, it's caused by json Unmarshal method, which convert int to float64
+				"a": 1,
 			}},
 			}},
 		},
 		},
 
 
@@ -121,7 +120,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				Message: nil,
 				Message: nil,
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(1),
+				"a": 1,
 			}},
 			}},
 		},
 		},
 
 
@@ -141,7 +140,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				Message: nil,
 				Message: nil,
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(1),
+				"a": 1,
 			}},
 			}},
 		},
 		},
 
 
@@ -152,7 +151,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				Message: nil,
 				Message: nil,
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(0),
+				"a": 0,
 			}},
 			}},
 		},
 		},
 
 
@@ -163,7 +162,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				Message: nil,
 				Message: nil,
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(-2),
+				"a": -2,
 			}},
 			}},
 		},
 		},
 
 
@@ -273,9 +272,9 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				Message: nil,
 				Message: nil,
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(1),
-				"b": float64(-1),
-				"c": float64(0),
+				"a": 1,
+				"b": -1,
+				"c": 0,
 			}},
 			}},
 		},
 		},
 
 
@@ -341,7 +340,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				Message: nil,
 				Message: nil,
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(1),
+				"a": 1,
 			}},
 			}},
 		},
 		},
 
 
@@ -352,7 +351,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				Message: nil,
 				Message: nil,
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(5),
+				"a": 5,
 			}},
 			}},
 		},
 		},
 
 
@@ -418,7 +417,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				Message: nil,
 				Message: nil,
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(0),
+				"a": int32(0),
 			}},
 			}},
 		},
 		},
 
 
@@ -429,7 +428,7 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 				Message: nil,
 				Message: nil,
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(97),
+				"a": int32(97),
 			}},
 			}},
 		},
 		},
 
 
@@ -482,20 +481,8 @@ func TestMathAndConversionFunc_Apply1(t *testing.T) {
 		pp := &ProjectOp{Fields: stmt.Fields}
 		pp := &ProjectOp{Fields: stmt.Fields}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
-		var mapRes []map[string]interface{}
-		if v, ok := result.([]byte); ok {
-			err := json.Unmarshal(v, &mapRes)
-			if err != nil {
-				t.Errorf("Failed to parse the input into map.\n")
-				continue
-			}
-			//fmt.Printf("%t\n", mapRes["kuiper_field_0"])
-
-			if !reflect.DeepEqual(tt.result, mapRes) {
-				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
-			}
-		} else {
-			t.Errorf("The returned result is not type of []byte\n")
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
 	}
 	}
 }
 }

+ 17 - 57
internal/topo/operator/misc_func_test.go

@@ -15,12 +15,12 @@
 package operator
 package operator
 
 
 import (
 import (
-	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/testx"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/internal/xsql"
+	"github.com/lf-edge/ekuiper/pkg/cast"
 	"reflect"
 	"reflect"
 	"strings"
 	"strings"
 	"testing"
 	"testing"
@@ -170,7 +170,7 @@ func TestMiscFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"r": float64(0),
+				"r": 0,
 			}},
 			}},
 		},
 		},
 
 
@@ -184,7 +184,7 @@ func TestMiscFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"r": float64(5),
+				"r": 5,
 			}},
 			}},
 		},
 		},
 
 
@@ -239,7 +239,7 @@ func TestMiscFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": "2021-05-03T00:45:30Z",
+				"a": cast.TimeFromUnixMilli(1.62000273e+12),
 			}},
 			}},
 		},
 		},
 	}
 	}
@@ -255,18 +255,8 @@ func TestMiscFunc_Apply1(t *testing.T) {
 		pp := &ProjectOp{Fields: stmt.Fields}
 		pp := &ProjectOp{Fields: stmt.Fields}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
-		var mapRes []map[string]interface{}
-		if v, ok := result.([]byte); ok {
-			err := json.Unmarshal(v, &mapRes)
-			if err != nil {
-				t.Errorf("Failed to parse the input into map.\n")
-				continue
-			}
-			if !reflect.DeepEqual(tt.result, mapRes) {
-				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
-			}
-		} else {
-			t.Errorf("The returned result is not type of []byte\n")
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
 	}
 	}
 }
 }
@@ -307,20 +297,8 @@ func TestMqttFunc_Apply2(t *testing.T) {
 		pp := &ProjectOp{Fields: stmt.Fields}
 		pp := &ProjectOp{Fields: stmt.Fields}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
-		var mapRes []map[string]interface{}
-		if v, ok := result.([]byte); ok {
-			err := json.Unmarshal(v, &mapRes)
-			if err != nil {
-				t.Errorf("Failed to parse the input into map.\n")
-				continue
-			}
-			//fmt.Printf("%t\n", mapRes["kuiper_field_0"])
-
-			if !reflect.DeepEqual(tt.result, mapRes) {
-				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
-			}
-		} else {
-			t.Errorf("The returned result is not type of []byte\n")
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
 	}
 	}
 }
 }
@@ -423,20 +401,8 @@ func TestMetaFunc_Apply1(t *testing.T) {
 		pp := &ProjectOp{Fields: stmt.Fields}
 		pp := &ProjectOp{Fields: stmt.Fields}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
-		var mapRes []map[string]interface{}
-		if v, ok := result.([]byte); ok {
-			err := json.Unmarshal(v, &mapRes)
-			if err != nil {
-				t.Errorf("Failed to parse the input into map.\n")
-				continue
-			}
-			//fmt.Printf("%t\n", mapRes["kuiper_field_0"])
-
-			if !reflect.DeepEqual(tt.result, mapRes) {
-				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
-			}
-		} else {
-			t.Errorf("The returned result is not type of []byte\n")
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
 	}
 	}
 }
 }
@@ -797,7 +763,7 @@ func TestJsonPathFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"powerOnTs": float64(1000),
+				"powerOnTs": 1000,
 			}},
 			}},
 		}, {
 		}, {
 			sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
 			sql: `SELECT json_path_query(equipment, "$.arm_right") AS a FROM test`,
@@ -820,7 +786,7 @@ func TestJsonPathFunc_Apply1(t *testing.T) {
 					},
 					},
 				},
 				},
 			},
 			},
-			err: "run Select error: call func json_path_query error: json_path_query function error: the first argument must be a map but got nil",
+			err: "run Select error: call func json_path_query error: json_path_query function error: invalid data nil for jsonpath",
 		},
 		},
 	}
 	}
 
 
@@ -836,25 +802,19 @@ func TestJsonPathFunc_Apply1(t *testing.T) {
 		fv, afv := xsql.NewFunctionValuersForOp(ctx)
 		fv, afv := xsql.NewFunctionValuersForOp(ctx)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		switch rt := result.(type) {
 		switch rt := result.(type) {
-		case []byte:
+		case []map[string]interface{}:
 			if tt.err == "" {
 			if tt.err == "" {
-				var mapRes []map[string]interface{}
-				err := json.Unmarshal(rt, &mapRes)
-				if err != nil {
-					t.Errorf("Failed to parse the input into map.\n")
-					continue
-				}
-				if !reflect.DeepEqual(tt.result, mapRes) {
-					t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
+				if !reflect.DeepEqual(tt.result, result) {
+					t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 				}
 				}
 			} else {
 			} else {
 				t.Errorf("%d: invalid result:\n  exp error %s\n  got=%s\n\n", i, tt.err, result)
 				t.Errorf("%d: invalid result:\n  exp error %s\n  got=%s\n\n", i, tt.err, result)
 			}
 			}
 		case error:
 		case error:
 			if tt.err == "" {
 			if tt.err == "" {
-				t.Errorf("%d: got error:\n  exp=%s\n  got=%s\n\n", i, tt.result, err)
+				t.Errorf("%d: got error:\n  exp=%s\n  got=%s\n\n", i, tt.result, rt)
 			} else if !reflect.DeepEqual(tt.err, testx.Errstring(rt)) {
 			} else if !reflect.DeepEqual(tt.err, testx.Errstring(rt)) {
-				t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, err)
+				t.Errorf("%d: error mismatch:\n  exp=%s\n  got=%s\n\n", i, tt.err, rt)
 			}
 			}
 		default:
 		default:
 			t.Errorf("%d: Invalid returned result found %v", i, result)
 			t.Errorf("%d: Invalid returned result found %v", i, result)

+ 1 - 6
internal/topo/operator/project_operator.go

@@ -15,7 +15,6 @@
 package operator
 package operator
 
 
 import (
 import (
-	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/internal/xsql"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/api"
@@ -93,11 +92,7 @@ func (pp *ProjectOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Fun
 		return fmt.Errorf("run Select error: invalid input %[1]T(%[1]v)", input)
 		return fmt.Errorf("run Select error: invalid input %[1]T(%[1]v)", input)
 	}
 	}
 
 
-	if ret, err := json.Marshal(results); err == nil {
-		return ret
-	} else {
-		return fmt.Errorf("run Select error: %v", err)
-	}
+	return results
 }
 }
 
 
 func (pp *ProjectOp) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) *xsql.ValuerEval {
 func (pp *ProjectOp) getVE(tuple xsql.DataValuer, agg xsql.AggregateData, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) *xsql.ValuerEval {

+ 112 - 164
internal/topo/operator/project_test.go

@@ -15,7 +15,6 @@
 package operator
 package operator
 
 
 import (
 import (
-	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
@@ -47,8 +46,8 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
 				"a": "val_a",
 				"a": "val_a",
-				"__meta": map[string]interface{}{
-					"id":    float64(45),
+				"__meta": xsql.Metadata{
+					"id":    45,
 					"other": "mock",
 					"other": "mock",
 				},
 				},
 			}},
 			}},
@@ -73,7 +72,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"ts": "2019-09-19T00:56:13.431Z",
+				"ts": cast.TimeFromUnixMilli(1568854573431),
 			}},
 			}},
 		},
 		},
 		//Schemaless may return a message without selecting column
 		//Schemaless may return a message without selecting column
@@ -130,7 +129,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				Message: xsql.Message{},
 				Message: xsql.Message{},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"": 5.0,
+				"": 5,
 			}},
 			}},
 		},
 		},
 		//8
 		//8
@@ -160,7 +159,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				"a":    "val_a",
 				"a":    "val_a",
 				"b":    "value",
 				"b":    "value",
 				"Pi":   3.14,
 				"Pi":   3.14,
-				"Zero": 0.0,
+				"Zero": 0,
 			}},
 			}},
 		},
 		},
 		//10
 		//10
@@ -257,9 +256,9 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"ab": []interface{}{
-					map[string]interface{}{"b": "hello3"},
-					map[string]interface{}{"b": "hello4"},
+				"ab": []map[string]interface{}{
+					{"b": "hello3"},
+					{"b": "hello4"},
 				},
 				},
 			}},
 			}},
 		},
 		},
@@ -279,10 +278,10 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"ab": []interface{}{
-					map[string]interface{}{"b": "hello3"},
-					map[string]interface{}{"b": "hello4"},
-					map[string]interface{}{"b": "hello5"},
+				"ab": []map[string]interface{}{
+					{"b": "hello3"},
+					{"b": "hello4"},
+					{"b": "hello5"},
 				},
 				},
 			}},
 			}},
 		},
 		},
@@ -349,7 +348,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"ab": []interface{}{
+				"ab": []float64{
 					3.14, 3.141, 3.1415, 3.14159,
 					3.14, 3.141, 3.1415, 3.14159,
 				},
 				},
 			}},
 			}},
@@ -366,7 +365,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"ab": []interface{}{
+				"ab": []float64{
 					3.14,
 					3.14,
 				},
 				},
 			}},
 			}},
@@ -505,7 +504,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				Message: xsql.Message{},
 				Message: xsql.Message{},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"f1": float64(12),
+				"f1": int64(12),
 			}},
 			}},
 		},
 		},
 		//32
 		//32
@@ -559,20 +558,8 @@ func TestProjectPlan_Apply1(t *testing.T) {
 		pp := &ProjectOp{Fields: stmt.Fields, SendMeta: true}
 		pp := &ProjectOp{Fields: stmt.Fields, SendMeta: true}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
-		var mapRes []map[string]interface{}
-		if v, ok := result.([]byte); ok {
-			err := json.Unmarshal(v, &mapRes)
-			if err != nil {
-				t.Errorf("Failed to parse the input into map.\n")
-				continue
-			}
-			//fmt.Printf("%t\n", mapRes["kuiper_field_0"])
-
-			if !reflect.DeepEqual(tt.result, mapRes) {
-				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
-			}
-		} else {
-			t.Errorf("%d. The returned result %#v is not type of []byte\n", result, i)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
 	}
 	}
 }
 }
@@ -592,7 +579,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"abc": float64(6), //json marshall problem
+				"abc": int64(6),
 			}},
 			}},
 		},
 		},
 		//1
 		//1
@@ -606,7 +593,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"abc": float64(34),
+				"abc": int64(34),
 			}},
 			}},
 		},
 		},
 		//2
 		//2
@@ -632,11 +619,11 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 			}, {
 			}, {
-				"id1": float64(2),
+				"id1": 2,
 			}, {
 			}, {
-				"id1": float64(3),
+				"id1": 3,
 			}},
 			}},
 		},
 		},
 		//3
 		//3
@@ -662,9 +649,9 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 			}, {}, {
 			}, {}, {
-				"id1": float64(3),
+				"id1": 3,
 			}},
 			}},
 		},
 		},
 		//4
 		//4
@@ -690,13 +677,13 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 				"f1":  "v1",
 				"f1":  "v1",
 			}, {
 			}, {
-				"id1": float64(2),
+				"id1": 2,
 				"f1":  "v2",
 				"f1":  "v2",
 			}, {
 			}, {
-				"id1": float64(3),
+				"id1": 3,
 				"f1":  "v1",
 				"f1":  "v1",
 			}},
 			}},
 		},
 		},
@@ -723,13 +710,13 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 				"f1":  "v1",
 				"f1":  "v1",
 			}, {
 			}, {
-				"id2": float64(2),
+				"id2": 2,
 				"f2":  "v2",
 				"f2":  "v2",
 			}, {
 			}, {
-				"id1": float64(3),
+				"id1": 3,
 				"f1":  "v1",
 				"f1":  "v1",
 			}},
 			}},
 		},
 		},
@@ -756,13 +743,13 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 				"f1":  "v1",
 				"f1":  "v1",
 			}, {
 			}, {
-				"id1": float64(2),
+				"id1": 2,
 				"f1":  "v2",
 				"f1":  "v2",
 			}, {
 			}, {
-				"id1": float64(3),
+				"id1": 3,
 				"f1":  "v1",
 				"f1":  "v1",
 			}},
 			}},
 		},
 		},
@@ -791,11 +778,11 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 			}, {
 			}, {
-				"id1": float64(2),
+				"id1": 2,
 			}, {
 			}, {
-				"id1": float64(3),
+				"id1": 3,
 			}},
 			}},
 		},
 		},
 		//8
 		//8
@@ -823,9 +810,9 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 			}, {
 			}, {
-				"id1": float64(2),
+				"id1": 2,
 			}, {}},
 			}, {}},
 		},
 		},
 		//9
 		//9
@@ -845,7 +832,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"abc": float64(6),
+				"abc": int64(6),
 			}},
 			}},
 		},
 		},
 		//10
 		//10
@@ -891,9 +878,9 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 			}, {
 			}, {
-				"id1": float64(2),
+				"id1": 2,
 			}},
 			}},
 		},
 		},
 		//12
 		//12
@@ -922,7 +909,7 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 			}, {}},
 			}, {}},
 		},
 		},
 		//13
 		//13
@@ -960,9 +947,9 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id2": float64(2),
+				"id2": 2,
 			}, {
 			}, {
-				"id2": float64(4),
+				"id2": 4,
 			}, {}},
 			}, {}},
 		},
 		},
 		//14
 		//14
@@ -990,15 +977,15 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 				"f1":  "v1",
 				"f1":  "v1",
 				"f2":  "w2",
 				"f2":  "w2",
 			}, {
 			}, {
-				"id1": float64(2),
+				"id1": 2,
 				"f1":  "v2",
 				"f1":  "v2",
 				"f2":  "w3",
 				"f2":  "w3",
 			}, {
 			}, {
-				"id1": float64(3),
+				"id1": 3,
 				"f1":  "v1",
 				"f1":  "v1",
 			}},
 			}},
 		},
 		},
@@ -1027,15 +1014,15 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id": float64(1),
+				"id": 1,
 				"f1": "v1",
 				"f1": "v1",
 				"f2": "w2",
 				"f2": "w2",
 			}, {
 			}, {
-				"id": float64(2),
+				"id": 2,
 				"f1": "v2",
 				"f1": "v2",
 				"f2": "w3",
 				"f2": "w3",
 			}, {
 			}, {
-				"id": float64(3),
+				"id": 3,
 				"f1": "v1",
 				"f1": "v1",
 			}},
 			}},
 		},
 		},
@@ -1065,10 +1052,10 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id1": float64(1),
+				"id1": 1,
 				"f1":  "v1",
 				"f1":  "v1",
 			}, {
 			}, {
-				"id1": float64(2),
+				"id1": 2,
 				"f1":  "v2",
 				"f1":  "v2",
 			}},
 			}},
 		},
 		},
@@ -1107,15 +1094,15 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id2": float64(2),
-				"id1": float64(1),
+				"id2": 2,
+				"id1": 1,
 				"f1":  "v1",
 				"f1":  "v1",
 			}, {
 			}, {
-				"id2": float64(4),
-				"id1": float64(2),
+				"id2": 4,
+				"id1": 2,
 				"f1":  "v2",
 				"f1":  "v2",
 			}, {
 			}, {
-				"id1": float64(3),
+				"id1": 3,
 				"f1":  "v1",
 				"f1":  "v1",
 			}},
 			}},
 		},
 		},
@@ -1154,15 +1141,15 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"id2": float64(2),
-				"id1": float64(1),
+				"id2": 2,
+				"id1": 1,
 				"f1":  "v1",
 				"f1":  "v1",
 			}, {
 			}, {
-				"id2": float64(4),
-				"id1": float64(2),
+				"id2": 4,
+				"id1": 2,
 				"f1":  "v2",
 				"f1":  "v2",
 			}, {
 			}, {
-				"id1": float64(3),
+				"id1": 3,
 				"f1":  "v1",
 				"f1":  "v1",
 			}},
 			}},
 		},
 		},
@@ -1177,21 +1164,8 @@ func TestProjectPlan_MultiInput(t *testing.T) {
 		pp := &ProjectOp{Fields: stmt.Fields}
 		pp := &ProjectOp{Fields: stmt.Fields}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
-		var mapRes []map[string]interface{}
-		if v, ok := result.([]byte); ok {
-			err := json.Unmarshal(v, &mapRes)
-			if err != nil {
-				t.Errorf("Failed to parse the input into map.\n")
-				continue
-			}
-
-			//fmt.Printf("%t\n", mapRes["kuiper_field_0"])
-
-			if !reflect.DeepEqual(tt.result, mapRes) {
-				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
-			}
-		} else {
-			t.Errorf("The returned result is not type of []byte\n")
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
 	}
 	}
 }
 }
@@ -1351,7 +1325,7 @@ func TestProjectPlan_Funcs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"r": float64(1),
+				"r": 1,
 			}},
 			}},
 		},
 		},
 		//6
 		//6
@@ -1400,21 +1374,8 @@ func TestProjectPlan_Funcs(t *testing.T) {
 		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
-		var mapRes []map[string]interface{}
-		if v, ok := result.([]byte); ok {
-			err := json.Unmarshal(v, &mapRes)
-			if err != nil {
-				t.Errorf("Failed to parse the input into map.\n")
-				continue
-			}
-
-			//fmt.Printf("%t\n", mapRes["kuiper_field_0"])
-
-			if !reflect.DeepEqual(tt.result, mapRes) {
-				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
-			}
-		} else {
-			t.Errorf("%d. The returned result is not type of []byte\n", i)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
 	}
 	}
 }
 }
@@ -1471,15 +1432,15 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"c":  float64(2),
+				"c":  2,
 				"r":  float64(122),
 				"r":  float64(122),
-				"ws": float64(1541152486013),
-				"we": float64(1541152487013),
+				"ws": int64(1541152486013),
+				"we": int64(1541152487013),
 			}, {
 			}, {
-				"c":  float64(2),
+				"c":  2,
 				"r":  float64(89),
 				"r":  float64(89),
-				"ws": float64(1541152486013),
-				"we": float64(1541152487013),
+				"ws": int64(1541152486013),
+				"we": int64(1541152487013),
 			}},
 			}},
 		},
 		},
 		//1
 		//1
@@ -1520,13 +1481,13 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"c":   float64(1),
+				"c":   1,
 				"a":   122.33,
 				"a":   122.33,
 				"s":   122.33,
 				"s":   122.33,
 				"min": 122.33,
 				"min": 122.33,
 				"max": 122.33,
 				"max": 122.33,
 			}, {
 			}, {
-				"c":   float64(2),
+				"c":   2,
 				"s":   103.63,
 				"s":   103.63,
 				"a":   51.815,
 				"a":   51.815,
 				"min": 14.6,
 				"min": 14.6,
@@ -1669,8 +1630,8 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 
 
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
 				"min":          68.55,
 				"min":          68.55,
-				"window_start": float64(1541152486013),
-				"window_end":   float64(1541152487013),
+				"window_start": int64(1541152486013),
+				"window_end":   int64(1541152487013),
 			}},
 			}},
 		},
 		},
 		//5
 		//5
@@ -1700,8 +1661,8 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			},
 			},
 
 
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"all": float64(3),
-				"c":   float64(2),
+				"all": 3,
+				"c":   2,
 				"a":   123.03,
 				"a":   123.03,
 				"s":   246.06,
 				"s":   246.06,
 				"min": 68.55,
 				"min": 68.55,
@@ -1734,9 +1695,9 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"sum":        float64(123203),
-				"ws":         float64(1541152486013),
-				"window_end": float64(1541152487013),
+				"sum":        123203,
+				"ws":         int64(1541152486013),
+				"window_end": int64(1541152487013),
 			}},
 			}},
 		},
 		},
 		//7
 		//7
@@ -1761,7 +1722,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"s": float64(123203),
+				"s": 123203,
 			}},
 			}},
 		},
 		},
 		//8
 		//8
@@ -1786,7 +1747,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"sum": float64(123203),
+				"sum": 123203,
 			}},
 			}},
 		},
 		},
 		//9
 		//9
@@ -1811,12 +1772,12 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"all": float64(3),
-				"c":   float64(2),
-				"a":   float64(40),
-				"s":   float64(80),
-				"min": float64(27),
-				"max": float64(53),
+				"all": 3,
+				"c":   2,
+				"a":   40,
+				"s":   80,
+				"min": 27,
+				"max": 53,
 			}},
 			}},
 		},
 		},
 		//10
 		//10
@@ -1857,10 +1818,10 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"count": float64(2),
+				"count": 2,
 				"meta":  "devicea",
 				"meta":  "devicea",
 			}, {
 			}, {
-				"count": float64(2),
+				"count": 2,
 				"meta":  "devicec",
 				"meta":  "devicec",
 			}},
 			}},
 		},
 		},
@@ -1902,10 +1863,10 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"c": float64(2),
+				"c": 2,
 				"d": "devicea",
 				"d": "devicea",
 			}, {
 			}, {
-				"c": float64(2),
+				"c": 2,
 				"d": "devicec",
 				"d": "devicec",
 			}},
 			}},
 		},
 		},
@@ -1956,16 +1917,16 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
 				"a":     122.33,
 				"a":     122.33,
-				"c":     float64(2),
+				"c":     2,
 				"color": "w2",
 				"color": "w2",
-				"id":    float64(1),
-				"r":     float64(122),
+				"id":    1,
+				"r":     122,
 			}, {
 			}, {
 				"a":     89.03,
 				"a":     89.03,
-				"c":     float64(2),
+				"c":     2,
 				"color": "w1",
 				"color": "w1",
-				"id":    float64(2),
-				"r":     float64(89),
+				"id":    2,
+				"r":     89,
 			}},
 			}},
 		},
 		},
 		//13
 		//13
@@ -2036,8 +1997,8 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"c1": map[string]interface{}{
-					"a": float64(27),
+				"c1": xsql.Message{
+					"a": 27,
 				},
 				},
 			}},
 			}},
 		},
 		},
@@ -2063,7 +2024,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"c1": float64(27),
+				"c1": 27,
 			}},
 			}},
 		},
 		},
 		//16
 		//16
@@ -2131,12 +2092,12 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			result: []map[string]interface{}{
 			result: []map[string]interface{}{
 				{
 				{
 					"r1": []interface{}{
 					"r1": []interface{}{
-						map[string]interface{}{"a": 122.33, "c": float64(2), "color": "w2", "id": float64(1), "r": float64(122)},
-						map[string]interface{}{"a": 177.51, "color": "w2", "id": float64(5)}},
+						xsql.Message{"a": 122.33, "c": 2, "color": "w2", "id": 1, "r": 122},
+						xsql.Message{"a": 177.51, "color": "w2", "id": 5}},
 				}, {
 				}, {
 					"r1": []interface{}{
 					"r1": []interface{}{
-						map[string]interface{}{"a": 89.03, "c": float64(2), "color": "w1", "id": float64(2), "r": float64(89)},
-						map[string]interface{}{"a": 14.6, "color": "w1", "id": float64(4)}},
+						xsql.Message{"a": 89.03, "c": 2, "color": "w1", "id": 2, "r": 89},
+						xsql.Message{"a": 14.6, "color": "w1", "id": 4}},
 				},
 				},
 			},
 			},
 		},
 		},
@@ -2162,7 +2123,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"c1": float64(123123),
+				"c1": 123123,
 			}},
 			}},
 		},
 		},
 		//19
 		//19
@@ -2232,8 +2193,8 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
 				"var2": "moduleB topic",
 				"var2": "moduleB topic",
-				"max2": float64(1),
-				"max3": float64(100),
+				"max2": 1,
+				"max3": 100,
 			}},
 			}},
 		},
 		},
 	}
 	}
@@ -2249,21 +2210,8 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: true}
 		pp := &ProjectOp{Fields: stmt.Fields, IsAggregate: true}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
-		var mapRes []map[string]interface{}
-		if v, ok := result.([]byte); ok {
-			err := json.Unmarshal(v, &mapRes)
-			if err != nil {
-				t.Errorf("Failed to parse the input into map.\n")
-				continue
-			}
-
-			//fmt.Printf("%t\n", mapRes["kuiper_field_0"])
-
-			if !reflect.DeepEqual(tt.result, mapRes) {
-				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
-			}
-		} else {
-			t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
 	}
 	}
 }
 }

+ 9 - 22
internal/topo/operator/str_func_test.go

@@ -15,7 +15,6 @@
 package operator
 package operator
 
 
 import (
 import (
-	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
@@ -157,7 +156,7 @@ func TestStrFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(2),
+				"a": 2,
 			}},
 			}},
 		},
 		},
 		{
 		{
@@ -171,7 +170,7 @@ func TestStrFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(-1),
+				"a": -1,
 			}},
 			}},
 		},
 		},
 		{
 		{
@@ -185,7 +184,7 @@ func TestStrFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(2),
+				"a": 2,
 			}},
 			}},
 		},
 		},
 		{
 		{
@@ -199,7 +198,7 @@ func TestStrFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(3),
+				"a": 3,
 			}},
 			}},
 		},
 		},
 		{
 		{
@@ -213,7 +212,7 @@ func TestStrFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(0),
+				"a": 0,
 			}},
 			}},
 		},
 		},
 		{
 		{
@@ -281,7 +280,7 @@ func TestStrFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(6),
+				"a": 6,
 			}},
 			}},
 		},
 		},
 		{
 		{
@@ -295,7 +294,7 @@ func TestStrFunc_Apply1(t *testing.T) {
 				},
 				},
 			},
 			},
 			result: []map[string]interface{}{{
 			result: []map[string]interface{}{{
-				"a": float64(2),
+				"a": 2,
 			}},
 			}},
 		},
 		},
 		{
 		{
@@ -662,20 +661,8 @@ func TestStrFunc_Apply1(t *testing.T) {
 		pp := &ProjectOp{Fields: stmt.Fields}
 		pp := &ProjectOp{Fields: stmt.Fields}
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		fv, afv := xsql.NewFunctionValuersForOp(nil)
 		result := pp.Apply(ctx, tt.data, fv, afv)
 		result := pp.Apply(ctx, tt.data, fv, afv)
-		var mapRes []map[string]interface{}
-		if v, ok := result.([]byte); ok {
-			err := json.Unmarshal(v, &mapRes)
-			if err != nil {
-				t.Errorf("Failed to parse the input into map.\n")
-				continue
-			}
-			//fmt.Printf("%t\n", mapRes["kuiper_field_0"])
-
-			if !reflect.DeepEqual(tt.result, mapRes) {
-				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
-			}
-		} else {
-			t.Errorf("%d. The returned result is not type of []byte\n", i)
+		if !reflect.DeepEqual(tt.result, result) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, result)
 		}
 		}
 	}
 	}
 }
 }

+ 2 - 2
internal/topo/sink/edgex_sink.go

@@ -438,7 +438,7 @@ func (ems *EdgexMsgBusSink) getMeta(result []map[string]interface{}) *meta {
 
 
 func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
 func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
-	if payload, ok := item.([]byte); ok {
+	if payload, _, err := ctx.TransformOutput(); err == nil {
 		logger.Debugf("EdgeX message bus sink: %s\n", payload)
 		logger.Debugf("EdgeX message bus sink: %s\n", payload)
 		evt, err := ems.produceEvents(ctx, payload)
 		evt, err := ems.produceEvents(ctx, payload)
 		if err != nil {
 		if err != nil {
@@ -475,7 +475,7 @@ func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) err
 		}
 		}
 		logger.Debugf("Published %+v to EdgeX message bus topic %s", evt, topic)
 		logger.Debugf("Published %+v to EdgeX message bus topic %s", evt, topic)
 	} else {
 	} else {
-		return fmt.Errorf("Unkown type %t, the message cannot be published.\n", item)
+		return fmt.Errorf("Unkown type of data %v, the message cannot be published.\n", err)
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 7 - 3
internal/topo/sink/mqtt_sink.go

@@ -241,11 +241,15 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 	return nil
 	return nil
 }
 }
 
 
-func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
+func (ms *MQTTSink) Collect(ctx api.StreamContext, _ interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
+	jsonBytes, _, err := ctx.TransformOutput()
+	if err != nil {
+		return err
+	}
 	c := ms.conn
 	c := ms.conn
-	logger.Debugf("%s publish %s", ctx.GetOpId(), item)
-	if token := c.Publish(ms.tpc, ms.qos, ms.retained, item); token.Wait() && token.Error() != nil {
+	logger.Debugf("%s publish %s", ctx.GetOpId(), jsonBytes)
+	if token := c.Publish(ms.tpc, ms.qos, ms.retained, jsonBytes); token.Wait() && token.Error() != nil {
 		return fmt.Errorf("publish error: %s", token.Error())
 		return fmt.Errorf("publish error: %s", token.Error())
 	}
 	}
 	return nil
 	return nil

+ 10 - 5
internal/topo/sink/rest_sink.go

@@ -176,12 +176,17 @@ func (me MultiErrors) Error() string {
 
 
 func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
-	v, ok := item.([]byte)
-	if !ok {
-		logger.Warnf("rest sink receive non []byte data: %v", item)
-	}
 	logger.Debugf("rest sink receive %s", item)
 	logger.Debugf("rest sink receive %s", item)
-	resp, err := ms.Send(v, logger)
+	output, transed, err := ctx.TransformOutput()
+	if err != nil {
+		logger.Warnf("rest sink decode data error: %v", err)
+		return nil
+	}
+	var d = item
+	if transed {
+		d = output
+	}
+	resp, err := ms.Send(d, logger)
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("rest sink fails to send out the data: %s", err)
 		return fmt.Errorf("rest sink fails to send out the data: %s", err)
 	} else {
 	} else {

+ 19 - 14
internal/topo/sink/rest_sink_test.go

@@ -15,10 +15,10 @@
 package sink
 package sink
 
 
 import (
 import (
-	"encoding/json"
 	"fmt"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
 	"github.com/lf-edge/ekuiper/internal/topo/context"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
 	"net/http/httptest"
 	"net/http/httptest"
@@ -185,6 +185,7 @@ func TestRestSink_Apply(t *testing.T) {
 		contextLogger.Debugf(string(body))
 		contextLogger.Debugf(string(body))
 		fmt.Fprintf(w, string(body))
 		fmt.Fprintf(w, string(body))
 	}))
 	}))
+	tf, _ := transform.GenTransform("")
 	defer ts.Close()
 	defer ts.Close()
 	for i, tt := range tests {
 	for i, tt := range tests {
 		requests = nil
 		requests = nil
@@ -198,20 +199,18 @@ func TestRestSink_Apply(t *testing.T) {
 		s.Open(ctx)
 		s.Open(ctx)
 		if ss.(bool) {
 		if ss.(bool) {
 			for _, d := range tt.data {
 			for _, d := range tt.data {
-				input, err := json.Marshal(d)
-				if err != nil {
-					t.Errorf("Failed to parse the input into []byte]")
-					continue
-				}
-				s.Collect(ctx, input)
+				vCtx := context.WithValue(ctx, context.TransKey, &context.TransConfig{
+					Data:  d,
+					TFunc: tf,
+				})
+				s.Collect(vCtx, d)
 			}
 			}
 		} else {
 		} else {
-			input, err := json.Marshal(tt.data)
-			if err != nil {
-				t.Errorf("Failed to parse the input into []byte]")
-				continue
-			}
-			s.Collect(ctx, input)
+			vCtx := context.WithValue(ctx, context.TransKey, &context.TransConfig{
+				Data:  tt.data,
+				TFunc: tf,
+			})
+			s.Collect(vCtx, tt.data)
 		}
 		}
 
 
 		s.Close(ctx)
 		s.Close(ctx)
@@ -361,7 +360,13 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 		s.Configure(tt.config)
 		s.Configure(tt.config)
 		s.Open(ctx)
 		s.Open(ctx)
 		for _, d := range tt.data {
 		for _, d := range tt.data {
-			s.Collect(ctx, d)
+			vCtx := context.WithValue(ctx, context.TransKey, &context.TransConfig{
+				Data: d,
+				TFunc: func(_ interface{}) ([]byte, bool, error) {
+					return d, true, nil
+				},
+			})
+			s.Collect(vCtx, d)
 		}
 		}
 		s.Close(ctx)
 		s.Close(ctx)
 		if !reflect.DeepEqual(tt.result, requests) {
 		if !reflect.DeepEqual(tt.result, requests) {

+ 1 - 1
internal/topo/topotest/mock_topo.go

@@ -97,7 +97,7 @@ func CommonResultFunc(result [][]byte) interface{} {
 		var mapRes []map[string]interface{}
 		var mapRes []map[string]interface{}
 		err := json.Unmarshal(v, &mapRes)
 		err := json.Unmarshal(v, &mapRes)
 		if err != nil {
 		if err != nil {
-			panic("Failed to parse the input into map")
+			panic(fmt.Sprintf("Failed to parse the input %v into map", string(v)))
 		}
 		}
 		maps = append(maps, mapRes)
 		maps = append(maps, mapRes)
 	}
 	}

+ 2 - 2
internal/topo/topotest/mocknode/mock_sink.go

@@ -36,11 +36,11 @@ func (m *MockSink) Open(ctx api.StreamContext) error {
 
 
 func (m *MockSink) Collect(ctx api.StreamContext, item interface{}) error {
 func (m *MockSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	logger := ctx.GetLogger()
-	if v, ok := item.([]byte); ok {
+	if v, _, err := ctx.TransformOutput(); err == nil {
 		logger.Debugf("mock sink receive %s", item)
 		logger.Debugf("mock sink receive %s", item)
 		m.results = append(m.results, v)
 		m.results = append(m.results, v)
 	} else {
 	} else {
-		logger.Info("mock sink receive non byte data")
+		logger.Info("mock sink tranform data error: %v", err)
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 49 - 0
internal/topo/transform/template.go

@@ -0,0 +1,49 @@
+// Copyright 2021 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package transform
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	ct "github.com/lf-edge/ekuiper/internal/template"
+	"text/template"
+)
+
+type TransFunc func(interface{}) ([]byte, bool, error)
+
+func GenTransform(dt string) (TransFunc, error) {
+	var tp *template.Template = nil
+	if dt != "" {
+		temp, err := template.New("sink").Funcs(ct.FuncMap).Parse(dt)
+		if err != nil {
+			return nil, err
+		}
+		tp = temp
+	}
+	return func(d interface{}) ([]byte, bool, error) {
+		if tp != nil {
+			var output bytes.Buffer
+			err := tp.Execute(&output, d)
+			if err != nil {
+				return nil, false, fmt.Errorf("fail to encode data %v with dataTemplate", d)
+			}
+			return output.Bytes(), true, nil
+		} else {
+			j, err := json.Marshal(d)
+			return j, false, err
+		}
+	}, nil
+}
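A minimal sketch of how the returned `TransFunc` behaves, derived from the code above: with an empty `dataTemplate` it simply JSON-marshals the data and reports `false`, while a non-empty template renders the bytes and reports `true`. The sample data and template are illustrative only.

```go
// Minimal sketch, not part of the commit: exercise GenTransform directly.
package main

import (
	"fmt"

	"github.com/lf-edge/ekuiper/internal/topo/transform"
)

func main() {
	data := map[string]interface{}{"temperature": 33, "humidity": 70}

	plain, _ := transform.GenTransform("")
	b, templated, _ := plain(data)
	fmt.Println(string(b), templated) // {"humidity":70,"temperature":33} false

	tpl, _ := transform.GenTransform(`{"temp":{{.temperature}}}`)
	b, templated, _ = tpl(data)
	fmt.Println(string(b), templated) // {"temp":33} true
}
```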

+ 2 - 1
pkg/api/stream.go

@@ -154,7 +154,8 @@ type StreamContext interface {
 	ReleaseConnection(connectSelector string)
 	ReleaseConnection(connectSelector string)
 	// Properties processing, prop is a json path
 	// Properties processing, prop is a json path
 	ParseDynamicProp(prop string, data interface{}) (interface{}, error)
 	ParseDynamicProp(prop string, data interface{}) (interface{}, error)
-	//TransformOutput(data []map[string]interface{}) interface{}
+	// Transform the output according to the sink transform properties such as dataTemplate
+	TransformOutput() ([]byte, bool, error)
 }
 }
 
 
 type Operator interface {
 type Operator interface {
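For a customized sink, the new `TransformOutput` context method replaces the old pattern of receiving pre-marshalled `[]byte` in `Collect`. A minimal sketch, assuming an illustrative `MySink` type that is not part of this commit:

```go
// Minimal sketch of a custom sink consuming the new context method.
// MySink and its publish field are illustrative assumptions.
package example

import (
	"fmt"

	"github.com/lf-edge/ekuiper/pkg/api"
)

type MySink struct {
	publish func([]byte) error
}

func (m *MySink) Configure(_ map[string]interface{}) error { return nil }
func (m *MySink) Open(_ api.StreamContext) error           { return nil }
func (m *MySink) Close(_ api.StreamContext) error          { return nil }

func (m *MySink) Collect(ctx api.StreamContext, item interface{}) error {
	// Returns the data rendered by dataTemplate, or plain JSON when no template
	// is configured; the bool tells whether a template was applied.
	jsonBytes, templated, err := ctx.TransformOutput()
	if err != nil {
		return fmt.Errorf("fail to transform data %v: %v", item, err)
	}
	ctx.GetLogger().Debugf("sink publishing %s (templated=%v)", jsonBytes, templated)
	return m.publish(jsonBytes)
}
```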

+ 1 - 6
tools/plugin_server/plugin_test_server.go

@@ -182,12 +182,7 @@ func startSymbolHandler(w http.ResponseWriter, r *http.Request) {
 			}()
 			}()
 			for {
 			for {
 				for _, m := range mockSinkData {
 				for _, m := range mockSinkData {
-					b, err := json.Marshal(m)
-					if err != nil {
-						fmt.Printf("cannot marshall data: %v\n", err)
-						continue
-					}
-					err = sink.Collect(newctx, b)
+					err = sink.Collect(newctx, m)
 					if err != nil {
 					if err != nil {
 						fmt.Printf("cannot collect data: %v\n", err)
 						fmt.Printf("cannot collect data: %v\n", err)
 						continue
 						continue