
feat: add sink common property: fields (#1834)

* add field property

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix trans

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix ut

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix ut

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix ut

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix selectmap

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix ut

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix ut

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix ut

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix TransFunc

Signed-off-by: Rui-Gan <1171530954@qq.com>

* fix selectmap

Signed-off-by: Rui-Gan <1171530954@qq.com>

---------

Signed-off-by: Rui-Gan <1171530954@qq.com>
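The feature this PR adds is simple to state: a sink-level `fields` list that keeps only the named keys of each result message. A minimal standalone sketch of that behavior — the `selectFields` helper below is hypothetical, not eKuiper's actual `transform.SelectMap` implementation:

```go
package main

import "fmt"

// selectFields keeps only the requested keys from a sink message.
// It mirrors the documented behavior of the new `fields` property:
// an empty list passes the message through unchanged.
func selectFields(msg map[string]interface{}, fields []string) map[string]interface{} {
	if len(fields) == 0 {
		return msg // no fields configured: pass through unchanged
	}
	out := make(map[string]interface{}, len(fields))
	for _, f := range fields {
		if v, ok := msg[f]; ok {
			out[f] = v
		}
	}
	return out
}

func main() {
	msg := map[string]interface{}{"temperature": 31.2, "humidity": 45}
	fmt.Println(selectFields(msg, []string{"humidity"}))
}
```

With `fields: ["humidity"]`, a result `{"temperature": 31.2, "humidity": 45}` is reduced to `{"humidity": 45}`, matching the documentation added in this PR.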
Regina 1 year ago
parent
commit
462acdb54e
38 changed files with 467 additions and 194 deletions
  1. docs/en_US/guide/sinks/overview.md (+18 -17)
  2. docs/en_US/guide/sinks/plugin/influx.md (+1 -2)
  3. docs/en_US/guide/sinks/plugin/influx2.md (+1 -2)
  4. docs/zh_CN/guide/sinks/overview.md (+2 -1)
  5. docs/zh_CN/guide/sinks/plugin/influx.md (+10 -11)
  6. docs/zh_CN/guide/sinks/plugin/influx2.md (+1 -2)
  7. docs/zh_CN/guide/sinks/plugin/tdengine.md (+1 -1)
  8. extensions/sinks/influx/influx.go (+17 -13)
  9. extensions/sinks/influx/influx.json (+0 -14)
  10. extensions/sinks/influx2/influx2.go (+17 -13)
  11. extensions/sinks/influx2/influx2.json (+0 -14)
  12. extensions/sinks/kafka/kafka.go (+3 -3)
  13. extensions/sinks/sql/sql.go (+1 -1)
  14. extensions/sinks/sql/sql.json (+0 -14)
  15. extensions/sinks/tdengine/tdengine.go (+2 -2)
  16. extensions/sinks/tdengine/tdengine.json (+0 -15)
  17. extensions/sinks/zmq/zmq.go (+2 -2)
  18. internal/io/edgex/edgex_sink.go (+9 -1)
  19. internal/io/edgex/edgex_sink_test.go (+1 -1)
  20. internal/io/file/file_sink.go (+18 -13)
  21. internal/io/file/file_sink_test.go (+5 -4)
  22. internal/io/file/file_stream_test.go (+1 -1)
  23. internal/io/http/rest_sink.go (+1 -1)
  24. internal/io/http/rest_sink_test.go (+4 -4)
  25. internal/io/memory/sink.go (+15 -5)
  26. internal/io/mqtt/mqtt_sink.go (+1 -1)
  27. internal/io/neuron/sink.go (+1 -1)
  28. internal/io/redis/sink.go (+9 -1)
  29. internal/io/sink/log_sink.go (+2 -2)
  30. internal/plugin/portable/runtime/sink.go (+1 -1)
  31. internal/topo/context/default_test.go (+3 -3)
  32. internal/topo/context/transform.go (+3 -3)
  33. internal/topo/node/sink_node.go (+12 -11)
  34. internal/topo/node/sink_node_test.go (+64 -0)
  35. internal/topo/topotest/mocknode/mock_sink.go (+1 -1)
  36. internal/topo/transform/template.go (+101 -11)
  37. internal/topo/transform/template_test.go (+134 -0)
  38. pkg/api/stream.go (+5 -2)

File diff suppressed because it is too large
+ 18 - 17
docs/en_US/guide/sinks/overview.md


+ 1 - 2
docs/en_US/guide/sinks/plugin/influx.md

@@ -31,7 +31,6 @@ Restart the eKuiper server to activate the plugin.
 | databasename  | true     | The database of the InfluxDB                      |
 | tagkey        | true     | The tag key of the InfluxDB                       |
 | tagvalue      | true     | The tag value of the InfluxDB                     |
-| fields        | true     | The column of the InfluxDB,split with ","         |
 
 Other common sink properties are supported. Please refer to the [sink common properties](../overview.md#common-properties) for more information.
 
@@ -55,7 +54,7 @@ Below is a sample for selecting temperature great than 50 degree, and some profi
        "databasename": "databasename",
        "tagkey": "tagkey",
        "tagvalue": "tagvalue",
-       "fields": "humidity,temperature,pressure"
+       "fields": ["humidity", "temperature", "pressure"]
       }
     }
   ]

+ 1 - 2
docs/en_US/guide/sinks/plugin/influx2.md

@@ -62,7 +62,6 @@ Restart the eKuiper server to activate the plugin.
 | token         | false    | The token of access InfluxDB                      |
 | tagKey        | true     | The tag key of the InfluxDB                       |
 | tagValue      | true     | The tag value of the InfluxDB                     |
-| fields        | true     | The column of the InfluxDB,split with ","         |
 
 Other common sink properties are supported. Please refer to the [sink common properties](../overview.md#common-properties) for more information.
 
@@ -86,7 +85,7 @@ Below is a sample for selecting temperature great than 50 degree, and some profi
        "bucket": "bucketName",
        "tagKey": "tagKey",
        "tagValue": "tagValue",
-        "fields": "humidity,temperature,pressure"
+        "fields": ["humidity", "temperature", "pressure"]
       }
     }
   ]

+ 2 - 1
docs/zh_CN/guide/sinks/overview.md

@@ -69,13 +69,14 @@
 |----------------------|------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | concurrency          | int: 1                             | 设置运行的线程数。该参数值大于1时,消息发出的顺序可能无法保证。                                                                                                                                                                                                                 |
 | bufferLength         | int: 1024                          | 设置可缓存消息数目。若缓存消息数超过此限制,sink将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。                                                                                                                                                                                       |
-| runAsync(deprecated)             | bool:false                         | 设置是否异步运行输出操作以提升性能。请注意,异步运行的情况下,输出结果顺序不能保证。                                                                                                                                                                                                       |
+| runAsync(deprecated) | bool:false                         | 设置是否异步运行输出操作以提升性能。请注意,异步运行的情况下,输出结果顺序不能保证。                                                                                                                                                                                                       |
 | omitIfEmpty          | bool: false                        | 如果配置项设置为 true,则当 SELECT 结果为空时,该结果将不提供给目标运算符。                                                                                                                                                                                                     |
 | sendSingle           | bool: false                        | 输出消息以数组形式接收,该属性意味着是否将结果一一发送。 如果为false,则输出消息将为`{"result":"${the string of received message}"}`。 例如,`{"result":"[{\"count\":30},"\"count\":20}]"}`。否则,结果消息将与实际字段名称一一对应发送。 对于与上述相同的示例,它将发送 `{"count":30}`,然后发送`{"count":20}`到 RESTful 端点。默认为 false。 |
 | dataTemplate         | string: ""                         | [golang 模板](https://golang.org/pkg/html/template)格式字符串,用于指定输出数据格式。 模板的输入是目标消息,该消息始终是映射数组。 如果未指定数据模板,则将数据作为原始输入。                                                                                                                                  |
 | format               | string: "json"                     | 编码格式,支持 "json" 和 "protobuf"。若使用 "protobuf", 需通过 "schemaId" 参数设置模式,并确保模式已注册。                                                                                                                                                                      |
 | schemaId             | string: ""                         | 编码使用的模式。                                                                                                                                                                                                                                         |
 | delimiter            | string: ","                        | 仅在使用 `delimited` 格式时生效,用于指定分隔符,默认为逗号。                                                                                                                                                                                                            |
+| fields               | []string: nil                      | 用于选择输出消息的字段。例如,sql查询的结果是`{"temperature": 31.2, humidity": 45}`, fields为`["humidity"]`,那么最终输出为`{"humidity": 45}`。建议不要同时配置`dataTemplate`和`fields`。如果同时配置,先根据`dataTemplate`得到输出数据,再通过`fields`得到最终结果。                                                |
 | enableCache          | bool: 默认值为`etc/kuiper.yaml` 中的全局配置 | 是否启用sink cache。缓存存储配置遵循 `etc/kuiper.yaml` 中定义的元数据存储的配置。                                                                                                                                                                                          |
 | memoryCacheThreshold | int: 默认值为全局配置                      | 要缓存在内存中的消息数量。出于性能方面的考虑,最早的缓存信息被存储在内存中,以便在故障恢复时立即重新发送。这里的数据会因为断电等故障而丢失。                                                                                                                                                                           |
 | maxDiskCache         | int: 默认值为全局配置                      | 缓存在磁盘中的信息的最大数量。磁盘缓存是先进先出的。如果磁盘缓存满了,最早的一页信息将被加载到内存缓存中,取代旧的内存缓存。                                                                                                                                                                                   |
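The `fields` semantics documented in the table above (pick only the listed keys from each result; when `dataTemplate` is also set, the template runs first and `fields` then filters its output) can be illustrated with a small sketch. `applyFields` is a hypothetical helper, not eKuiper's transform code; it handles both the single-map and array-of-maps payload shapes a sink can receive:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// applyFields filters a decoded sink payload down to the listed keys.
// Sinks receive either a single result map or an array of maps, so
// both shapes are handled here.
func applyFields(data interface{}, fields []string) (interface{}, error) {
	pick := func(m map[string]interface{}) map[string]interface{} {
		out := make(map[string]interface{}, len(fields))
		for _, f := range fields {
			if v, ok := m[f]; ok {
				out[f] = v
			}
		}
		return out
	}
	switch v := data.(type) {
	case map[string]interface{}:
		return pick(v), nil
	case []map[string]interface{}:
		res := make([]map[string]interface{}, 0, len(v))
		for _, m := range v {
			res = append(res, pick(m))
		}
		return res, nil
	default:
		return nil, fmt.Errorf("unsupported data type %T", data)
	}
}

func main() {
	var data map[string]interface{}
	_ = json.Unmarshal([]byte(`{"temperature": 31.2, "humidity": 45}`), &data)
	out, _ := applyFields(data, []string{"humidity"})
	fmt.Println(out) // map[humidity:45]
}
```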

+ 10 - 11
docs/zh_CN/guide/sinks/plugin/influx.md

@@ -21,16 +21,15 @@
 
 ## 属性
 
-| 属性名称         | 会否可选 | 说明                 |
-|--------------|------|--------------------|
-| addr         | 是    | InfluxDB的地址        |
-| measurement  | 是    | InfluxDb的测量(如表名)   |
-| username     | 否    | InfluxDB登陆用户名      |
-| password     | 否    | InfluxDB登陆密码       |
-| databasename | 是    | InfluxDB的数据库       |
-| tagkey       | 是    | InfluxDB的标签键       |
-| tagvalue     | 是    | InfluxDB的标签值       |
-| fields       | 是    | InfluxDB的列名,用","隔开 |
+| 属性名称         | 会否可选 | 说明               |
+|--------------|------|------------------|
+| addr         | 是    | InfluxDB的地址      |
+| measurement  | 是    | InfluxDb的测量(如表名) |
+| username     | 否    | InfluxDB登陆用户名    |
+| password     | 否    | InfluxDB登陆密码     |
+| databasename | 是    | InfluxDB的数据库     |
+| tagkey       | 是    | InfluxDB的标签键     |
+| tagvalue     | 是    | InfluxDB的标签值     |
 
 其他通用的 sink 属性也支持，请参阅[公共属性](../overview.md#公共属性)。
 
@@ -54,7 +53,7 @@
        "databasename": "databasename",
        "tagkey": "tagkey",
        "tagvalue": "tagvalue",
-       "fields": "humidity,temperature,pressure"
+       "fields": ["humidity", "temperature", "pressure"]
       }
     }
   ]

+ 1 - 2
docs/zh_CN/guide/sinks/plugin/influx2.md

@@ -60,7 +60,6 @@ $(PLUGINS_CUSTOM):
 | token       | 否    | InfluxDB访问Token    |
 | tagKey      | 是    | InfluxDB的标签键       |
 | tagValue    | 是    | InfluxDB的标签值       |
-| fields      | 是    | InfluxDB的列名,用","隔开 |
 
 其他通用的 sink 属性也支持，请参阅[公共属性](../overview.md#公共属性)。
 
@@ -84,7 +83,7 @@ $(PLUGINS_CUSTOM):
        "bucket": "bucketName",
        "tagKey": "tagKey",
        "tagValue": "tagValue",
-       "fields": "humidity,temperature,pressure"
+       "fields": ["humidity", "temperature", "pressure"]
       }
     }
   ]

+ 1 - 1
docs/zh_CN/guide/sinks/plugin/tdengine.md

@@ -21,7 +21,7 @@ go build -trimpath --buildmode=plugin -o plugins/sinks/Tdengine@v1.0.0.so extens
 | password       | string   | 否    | 密码,默认值为 `taosdata` 。                                                                                  |
 | database       | string   | 是    | 数据库名                                                                                                  |
 | table          | string   | 是    | 表名,可设置[动态属性](../overview.md#动态属性)。                                                                    |
-| fields         | []string | 否    | 将要插入的表字段集合。sink 收到的数据和数据库表中均有该字段。若设置,则所有结果字段写入数据库。                                                   |
+| fields         | []string | 否    | 将要插入的表字段集合。sink 收到的数据和数据库表中均有该字段。若设置,则所有结果字段写入数据库。                                                   |
 | provideTs      | Bool     | 否    | 用户是否提供时间戳字段,默认为否。                                                                                     |
 | tsFieldName    | String   | 是    | 时间戳字段名称                                                                                               |
 | sTable         | String   | 否    | 使用的超级表,可设置[动态属性](../overview.md#动态属性)。                                                                |

+ 17 - 13
extensions/sinks/influx/influx.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -34,8 +34,8 @@ import (
 	"fmt"
 	_ "github.com/influxdata/influxdb1-client/v2"
 	client "github.com/influxdata/influxdb1-client/v2"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"strings"
 	"time"
 )
 
@@ -47,7 +47,7 @@ type influxSink struct {
 	databaseName string
 	tagKey       string
 	tagValue     string
-	fields       string
+	fields       []string
 	cli          client.Client
 	fieldMap     map[string]interface{}
 	hasTransform bool
@@ -90,8 +90,12 @@ func (m *influxSink) Configure(props map[string]interface{}) error {
 		}
 	}
 	if i, ok := props["fields"]; ok {
-		if i, ok := i.(string); ok {
-			m.fields = i
+		if i, ok := i.([]interface{}); ok {
+			for _, v := range i {
+				if v, ok := v.(string); ok {
+					m.fields = append(m.fields, v)
+				}
+			}
 		}
 	}
 	if i, ok := props["dataTemplate"]; ok {
@@ -120,7 +124,7 @@ func (m *influxSink) Open(ctx api.StreamContext) (err error) {
 func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
 	if m.hasTransform {
-		jsonBytes, _, err := ctx.TransformOutput(data)
+		jsonBytes, _, err := ctx.TransformOutput(data, true)
 		if err != nil {
 			return err
 		}
@@ -130,6 +134,12 @@ func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 			return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		data = m
+	} else if len(m.fields) > 0 {
+		d, err := transform.SelectMap(data, m.fields)
+		if err != nil {
+			return fmt.Errorf("fail to select fields %v for data %v", m.fields, data)
+		}
+		data = d
 	}
 	var output map[string]interface{}
 	switch v := data.(type) {
@@ -153,13 +163,7 @@ func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 		return err
 	}
 	tags := map[string]string{m.tagKey: m.tagValue}
-	fields := strings.Split(m.fields, ",")
-	m.fieldMap = make(map[string]interface{}, 100)
-	for _, field := range fields {
-		if output[field] != nil {
-			m.fieldMap[field] = output[field]
-		}
-	}
+	m.fieldMap = output
 
 	pt, err := client.NewPoint(m.measurement, tags, m.fieldMap, time.Now())
 	if err != nil {
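The Configure change above swaps a comma-separated string for a JSON array, which arrives from the decoded properties map as `[]interface{}`. The conversion pattern can be sketched on its own; `stringSliceProp` is a hypothetical helper, not the plugin's code:

```go
package main

import "fmt"

// stringSliceProp reads a property that JSON decoding delivers as
// []interface{} and converts it to []string, skipping non-string
// items — the same pattern the sink's Configure method uses for
// the new `fields` property.
func stringSliceProp(props map[string]interface{}, key string) []string {
	var out []string
	if raw, ok := props[key]; ok {
		if arr, ok := raw.([]interface{}); ok {
			for _, v := range arr {
				if s, ok := v.(string); ok {
					out = append(out, s)
				}
			}
		}
	}
	return out
}

func main() {
	props := map[string]interface{}{
		"fields": []interface{}{"humidity", "temperature", "pressure"},
	}
	fmt.Println(stringSliceProp(props, "fields"))
}
```

A direct type assertion to `[]string` would fail here, because `encoding/json` always decodes arrays into `[]interface{}` when the target is `interface{}`; hence the element-by-element conversion.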

+ 0 - 14
extensions/sinks/influx/influx.json

@@ -117,20 +117,6 @@
 			"en_US": "Tag value",
 			"zh_CN": "标签值"
 		}
-	}, {
-		"name": "fields",
-		"default": "humidity,temperature,pressure",
-		"optional": true,
-		"control": "text",
-		"type": "string",
-		"hint": {
-			"en_US": "The column of the InfluxDB",
-			"zh_CN": "InfluxDB 的列名"
-		},
-		"label": {
-			"en_US": "Column",
-			"zh_CN": "列名"
-		}
 	}],
 	"node": {
 		"category": "sink",

+ 17 - 13
extensions/sinks/influx2/influx2.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -31,8 +31,8 @@ import (
 	"fmt"
 	_ "github.com/influxdata/influxdb-client-go/v2"
 	client "github.com/influxdata/influxdb-client-go/v2"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
-	"strings"
 	"time"
 )
 
@@ -44,7 +44,7 @@ type influxSink2 struct {
 	bucket       string
 	tagKey       string
 	tagValue     string
-	fields       string
+	fields       []string
 	cli          client.Client
 	fieldMap     map[string]interface{}
 	hasTransform bool
@@ -72,8 +72,12 @@ func (m *influxSink2) Configure(props map[string]interface{}) error {
 		}
 	}
 	if i, ok := props["fields"]; ok {
-		if i, ok := i.(string); ok {
-			m.fields = i
+		if i, ok := i.([]interface{}); ok {
+			for _, v := range i {
+				if v, ok := v.(string); ok {
+					m.fields = append(m.fields, v)
+				}
+			}
 		}
 	}
 	if i, ok := props["dataTemplate"]; ok {
@@ -112,7 +116,7 @@ func (m *influxSink2) Open(ctx api.StreamContext) (err error) {
 func (m *influxSink2) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
 	if m.hasTransform {
-		jsonBytes, _, err := ctx.TransformOutput(data)
+		jsonBytes, _, err := ctx.TransformOutput(data, true)
 		if err != nil {
 			return err
 		}
@@ -122,6 +126,12 @@ func (m *influxSink2) Collect(ctx api.StreamContext, data interface{}) error {
 			return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		data = m
+	} else if len(m.fields) > 0 {
+		d, err := transform.SelectMap(data, m.fields)
+		if err != nil {
+			return fmt.Errorf("fail to select fields %v for data %v", m.fields, data)
+		}
+		data = d
 	}
 	var output map[string]interface{}
 	switch v := data.(type) {
@@ -139,13 +149,7 @@ func (m *influxSink2) Collect(ctx api.StreamContext, data interface{}) error {
 	writeAPI := m.cli.WriteAPIBlocking(m.org, m.bucket)
 
 	tags := map[string]string{m.tagKey: m.tagValue}
-	fields := strings.Split(m.fields, ",")
-	m.fieldMap = make(map[string]interface{}, 100)
-	for _, field := range fields {
-		if output[field] != nil {
-			m.fieldMap[field] = output[field]
-		}
-	}
+	m.fieldMap = output
 
 	pt := client.NewPoint(m.measurement, tags, m.fieldMap, time.Now())
 

+ 0 - 14
extensions/sinks/influx2/influx2.json

@@ -117,20 +117,6 @@
 			"en_US": "Tag value",
 			"zh_CN": "标签值"
 		}
-	}, {
-		"name": "fields",
-		"default": "humidity,temperature,pressure",
-		"optional": true,
-		"control": "text",
-		"type": "string",
-		"hint": {
-			"en_US": "The column of the InfluxDB",
-			"zh_CN": "InfluxDB 的列名"
-		},
-		"label": {
-			"en_US": "Column",
-			"zh_CN": "列名"
-		}
 	}],
 	"node": {
 		"category": "sink",

+ 3 - 3
extensions/sinks/kafka/kafka.go

@@ -1,4 +1,4 @@
-// Copyright 2023 carlclone@gmail.com
+// Copyright 2023-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -117,14 +117,14 @@ func (m *kafkaSink) Collect(ctx api.StreamContext, item interface{}) error {
 	switch d := item.(type) {
 	case []map[string]interface{}:
 		for _, el := range d {
-			decodedBytes, _, err := ctx.TransformOutput(el)
+			decodedBytes, _, err := ctx.TransformOutput(el, true)
 			if err != nil {
 				return fmt.Errorf("kafka sink transform data error: %v", err)
 			}
 			messages = append(messages, kafkago.Message{Value: decodedBytes})
 		}
 	case map[string]interface{}:
-		decodedBytes, _, err := ctx.TransformOutput(d)
+		decodedBytes, _, err := ctx.TransformOutput(d, true)
 		if err != nil {
 			return fmt.Errorf("kafka sink transform data error: %v", err)
 		}

+ 1 - 1
extensions/sinks/sql/sql.go

@@ -141,7 +141,7 @@ func (m *sqlSink) writeToDB(ctx api.StreamContext, sqlStr *string) error {
 func (m *sqlSink) Collect(ctx api.StreamContext, item interface{}) error {
 	ctx.GetLogger().Debugf("sql sink receive %s", item)
 	if m.conf.DataTemplate != "" {
-		jsonBytes, _, err := ctx.TransformOutput(item)
+		jsonBytes, _, err := ctx.TransformOutput(item, false)
 		if err != nil {
 			return err
 		}

+ 0 - 14
extensions/sinks/sql/sql.json

@@ -60,20 +60,6 @@
 			"en_US": "Table Data from Stream Field",
 			"zh_CN": "表数据来源"
 		}
-	}, {
-		"name": "fields",
-		"default": [],
-		"optional": true,
-		"control": "list",
-		"type": "list_string",
-		"hint": {
-			"en_US": "Field of table",
-			"zh_CN": "表字段"
-		},
-		"label": {
-			"en_US": "Table field",
-			"zh_CN": "表字段"
-		}
 	},
 	{
 		"name": "rowkindField",

+ 2 - 2
extensions/sinks/tdengine/tdengine.go

@@ -1,4 +1,4 @@
-// Copyright 2021-2022 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -200,7 +200,7 @@ func (m *taosSink) Open(ctx api.StreamContext) (err error) {
 func (m *taosSink) Collect(ctx api.StreamContext, item interface{}) error {
 	ctx.GetLogger().Debugf("tdengine sink receive %s", item)
 	if m.conf.DataTemplate != "" {
-		jsonBytes, _, err := ctx.TransformOutput(item)
+		jsonBytes, _, err := ctx.TransformOutput(item, false)
 		if err != nil {
 			return err
 		}

+ 0 - 15
extensions/sinks/tdengine/tdengine.json

@@ -110,21 +110,6 @@
       }
     },
     {
-      "name": "fields",
-      "default": [],
-      "optional": true,
-      "control": "list",
-      "type": "list_string",
-      "hint": {
-        "en_US": "Field of table",
-        "zh_CN": "表字段"
-      },
-      "label": {
-        "en_US": "Table field",
-        "zh_CN": "表字段"
-      }
-    },
-    {
       "name": "provideTs",
       "default": false,
       "optional": false,

+ 2 - 2
extensions/sinks/zmq/zmq.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -69,7 +69,7 @@ func (m *zmqSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	var v []byte
 	var err error
-	v, _, err = ctx.TransformOutput(item)
+	v, _, err = ctx.TransformOutput(item, true)
 	if err != nil {
 		logger.Debug("zmq sink receive non byte data %v", item)
 		return err

+ 9 - 1
internal/io/edgex/edgex_sink.go

@@ -26,6 +26,7 @@ import (
 	"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests"
 	"github.com/lf-edge/ekuiper/internal/conf"
 	"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/cast"
 	"github.com/lf-edge/ekuiper/pkg/errorx"
@@ -42,6 +43,7 @@ type SinkConf struct {
 	SourceName   string      `json:"sourceName"`
 	Metadata     string      `json:"metadata"`
 	DataTemplate string      `json:"dataTemplate"`
+	Fields       []string    `json:"fields"`
 }

 type EdgexMsgBusSink struct {
@@ -113,7 +115,7 @@ func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {

 func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, item interface{}) (*dtos.Event, error) {
 	if ems.c.DataTemplate != "" {
-		jsonBytes, _, err := ctx.TransformOutput(item)
+		jsonBytes, _, err := ctx.TransformOutput(item, true)
 		if err != nil {
 			return nil, err
 		}
@@ -123,6 +125,12 @@ func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, item interface{
 			return nil, fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		item = tm
+	} else if len(ems.c.Fields) > 0 {
+		tm, err := transform.SelectMap(item, ems.c.Fields)
+		if err != nil {
+			return nil, fmt.Errorf("fail to select fields %v for data %v", ems.c.Fields, item)
+		}
+		item = tm
 	}
 	var m []map[string]interface{}
 	switch payload := item.(type) {
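The `else if len(ems.c.Fields) > 0` branch only runs when no `dataTemplate` is configured. A minimal sketch of what field selection does for a single message map (`selectFields` is a hypothetical stand-in; the real `transform.SelectMap` also handles slices and raw bytes):

```go
package main

import "fmt"

// selectFields keeps only the listed keys of a message map. Keys absent
// from the input are still emitted (with a nil value), which is why a
// mismatched fields list shows up as nulls in the sink output.
func selectFields(m map[string]interface{}, fields []string) map[string]interface{} {
	out := make(map[string]interface{}, len(fields))
	for _, f := range fields {
		out[f] = m[f] // nil when the key is missing
	}
	return out
}

func main() {
	msg := map[string]interface{}{"temperature": 21.5, "humidity": 50, "device": "d1"}
	fmt.Println(selectFields(msg, []string{"temperature", "humidity"}))
}
```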

+ 1 - 1
internal/io/edgex/edgex_sink_test.go

@@ -598,7 +598,7 @@ func TestEdgeXTemplate_Apply(t1 *testing.T) {
 		var payload []map[string]interface{}
 		json.Unmarshal([]byte(t.input), &payload)
 		dt := t.conf["dataTemplate"]
-		tf, _ := transform.GenTransform(cast.ToStringAlways(dt), "json", "", "")
+		tf, _ := transform.GenTransform(cast.ToStringAlways(dt), "json", "", "", []string{})
 		vCtx := context.WithValue(ctx, context.TransKey, tf)
 		result, err := ems.produceEvents(vCtx, payload[0])
 		if !reflect.DeepEqual(t.error, testx.Errstring(err)) {

+ 18 - 13
internal/io/file/file_sink.go

@@ -40,6 +40,7 @@ type sinkConf struct {
 	Delimiter          string   `json:"delimiter"`
 	Format             string   `json:"format"` // only use for validation; transformation is done in sink_node
 	Compression        string   `json:"compression"`
+	Fields             []string `json:"fields"` // only use for extracting header for csv; transformation is done in sink_node
 }

 type fileSink struct {
@@ -158,7 +159,7 @@ func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
 	if err != nil {
 		return err
 	}
-	if v, _, err := ctx.TransformOutput(item); err == nil {
+	if v, _, err := ctx.TransformOutput(item, true); err == nil {
 		ctx.GetLogger().Debugf("file sink transform data %s", v)
 		m.mux.Lock()
 		defer m.mux.Unlock()
@@ -216,22 +217,26 @@ func (m *fileSink) GetFws(ctx api.StreamContext, fn string, item interface{}) (*
 		var headers string
 		if m.c.FileType == CSV_TYPE && m.c.HasHeader {
 			var header []string
-			switch v := item.(type) {
-			case map[string]interface{}:
-				header = make([]string, len(v))
-				i := 0
-				for k := range item.(map[string]interface{}) {
-					header[i] = k
-					i++
-				}
-			case []map[string]interface{}:
-				if len(v) > 0 {
-					header = make([]string, len(v[0]))
+			if len(m.c.Fields) > 0 {
+				header = m.c.Fields
+			} else {
+				switch v := item.(type) {
+				case map[string]interface{}:
+					header = make([]string, len(v))
 					i := 0
-					for k := range v[0] {
+					for k := range item.(map[string]interface{}) {
 						header[i] = k
 						i++
 					}
+				case []map[string]interface{}:
+					if len(v) > 0 {
+						header = make([]string, len(v[0]))
+						i := 0
+						for k := range v[0] {
+							header[i] = k
+							i++
+						}
+					}
 				}
 			}
 			sort.Strings(header)
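The header logic in this hunk gives configured `fields` precedence over keys discovered from the data, and `sort.Strings` fixes the column order either way. A compact sketch of that precedence (helper name assumed, not the sink's API):

```go
package main

import (
	"fmt"
	"sort"
)

// csvHeader mirrors the precedence above: an explicit fields list wins;
// otherwise the keys of the first row are collected. Both paths are
// sorted so the CSV column order is deterministic.
func csvHeader(fields []string, rows []map[string]interface{}) []string {
	var header []string
	if len(fields) > 0 {
		header = append(header, fields...)
	} else if len(rows) > 0 {
		for k := range rows[0] {
			header = append(header, k)
		}
	}
	sort.Strings(header)
	return header
}

func main() {
	rows := []map[string]interface{}{{"temperature": 20, "humidity": 50}}
	fmt.Println(csvHeader(nil, rows))                // derived from row keys
	fmt.Println(csvHeader([]string{"b", "a"}, rows)) // explicit fields, sorted
}
```

Note the sketch copies `fields` before sorting so the caller's slice is untouched; the sink code sorts its own configuration slice in place.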

+ 5 - 4
internal/io/file/file_sink_test.go

@@ -283,7 +283,7 @@ func TestFileSink_Collect(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "test2")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)

-	tf, _ := transform.GenTransform("", "json", "", "")
+	tf, _ := transform.GenTransform("", "json", "", "", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)

 	for _, tt := range tests {
@@ -307,6 +307,7 @@ func TestFileSink_Collect(t *testing.T) {
 				"format":             f,
 				"rollingNamePattern": "none",
 				"compression":        tt.compress,
+				"fields":             []string{"key"},
 			})
 			if err != nil {
 				t.Fatal(err)
@@ -442,7 +443,7 @@ func TestFileSinkRolling_Collect(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "testRolling")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)

-	tf, _ := transform.GenTransform("", "json", "", "")
+	tf, _ := transform.GenTransform("", "json", "", "", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)

 	for _, tt := range tests {
@@ -572,7 +573,7 @@ func TestFileSinkRollingCount_Collect(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "testRollingCount")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)

-	tf, _ := transform.GenTransform("", "delimited", "", ",")
+	tf, _ := transform.GenTransform("", "delimited", "", ",", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)

 	for _, tt := range tests {
@@ -663,7 +664,7 @@ func TestFileSinkReopen(t *testing.T) {
 	// Create a stream context for testing
 	contextLogger := conf.Log.WithField("rule", "testRollingCount")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
-	tf, _ := transform.GenTransform("", "json", "", "")
+	tf, _ := transform.GenTransform("", "json", "", "", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)

 	sink := &fileSink{}

+ 1 - 1
internal/io/file/file_stream_test.go

@@ -87,7 +87,7 @@ func TestFileSinkCompress_Collect(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "test2")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)

-	tf, _ := transform.GenTransform("", "json", "", "")
+	tf, _ := transform.GenTransform("", "json", "", "", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)

 	for _, tt := range tests {

+ 1 - 1
internal/io/http/rest_sink.go

@@ -57,7 +57,7 @@ func (me MultiErrors) Error() string {
 func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	logger.Debugf("rest sink receive %s", item)
-	decodedData, _, err := ctx.TransformOutput(item)
+	decodedData, _, err := ctx.TransformOutput(item, true)
 	if err != nil {
 		logger.Warnf("rest sink decode data error: %v", err)
 		return fmt.Errorf("rest sink decode data error: %v", err)

+ 4 - 4
internal/io/http/rest_sink_test.go

@@ -188,7 +188,7 @@ func TestRestSink_Apply(t *testing.T) {
 		contextLogger.Debugf(string(body))
 		fmt.Fprint(w, string(body))
 	}))
-	tf, _ := transform.GenTransform("", "json", "", "")
+	tf, _ := transform.GenTransform("", "json", "", "", []string{})
 	defer ts.Close()
 	for i, tt := range tests {
 		requests = nil
@@ -355,7 +355,7 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 		tt.config["url"] = ts.URL
 		s.Configure(tt.config)
 		s.Open(ctx)
-		vCtx := context.WithValue(ctx, context.TransKey, transform.TransFunc(func(d interface{}) ([]byte, bool, error) {
+		vCtx := context.WithValue(ctx, context.TransKey, transform.TransFunc(func(d interface{}, s bool) ([]byte, bool, error) {
 			return d.([]byte), true, nil
 		}))
 		for _, d := range tt.data {
@@ -386,7 +386,7 @@ func TestRestSinkErrorLog(t *testing.T) {
 		s.Configure(config)
 		s.Open(context.Background())

-		tf, _ := transform.GenTransform("", "json", "", "")
+		tf, _ := transform.GenTransform("", "json", "", "", []string{})
 		vCtx := context.WithValue(context.Background(), context.TransKey, tf)
 		reqBody := []map[string]interface{}{
 			{"ab": "hello1"},
@@ -412,7 +412,7 @@ func TestRestSinkErrorLog(t *testing.T) {
 		}
 		s.Configure(config)
 		s.Open(context.Background())
-		tf, _ := transform.GenTransform("", "json", "", "")
+		tf, _ := transform.GenTransform("", "json", "", "", []string{})
 		vCtx := context.WithValue(context.Background(), context.TransKey, tf)
 		err := s.Collect(vCtx, []map[string]interface{}{
 			{"ab": "hello1"},

+ 15 - 5
internal/io/memory/sink.go

@@ -18,6 +18,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
@@ -25,10 +26,11 @@ import (
 )

 type config struct {
-	Topic        string `json:"topic"`
-	DataTemplate string `json:"dataTemplate"`
-	RowkindField string `json:"rowkindField"`
-	KeyField     string `json:"keyField"`
+	Topic        string   `json:"topic"`
+	DataTemplate string   `json:"dataTemplate"`
+	RowkindField string   `json:"rowkindField"`
+	KeyField     string   `json:"keyField"`
+	Fields       []string `json:"fields"`
 }

 type sink struct {
@@ -36,6 +38,7 @@ type sink struct {
 	hasTransform bool
 	keyField     string
 	rowkindField string
+	fields       []string
 }

 func (s *sink) Open(ctx api.StreamContext) error {
@@ -57,6 +60,7 @@ func (s *sink) Configure(props map[string]interface{}) error {
 	if cfg.DataTemplate != "" {
 		s.hasTransform = true
 	}
+	s.fields = cfg.Fields
 	s.rowkindField = cfg.RowkindField
 	s.keyField = cfg.KeyField
 	if s.rowkindField != "" && s.keyField == "" {
@@ -72,7 +76,7 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 		return err
 	}
 	if s.hasTransform {
-		jsonBytes, _, err := ctx.TransformOutput(data)
+		jsonBytes, _, err := ctx.TransformOutput(data, true)
 		if err != nil {
 			return err
 		}
@@ -82,6 +86,12 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 			return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		data = m
+	} else if len(s.fields) > 0 {
+		m, err := transform.SelectMap(data, s.fields)
+		if err != nil {
+			return fmt.Errorf("fail to select fields %v for data %v", s.fields, data)
+		}
+		data = m
 	}
 	switch d := data.(type) {
 	case []map[string]interface{}:

+ 1 - 1
internal/io/mqtt/mqtt_sink.go

@@ -84,7 +84,7 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {

 func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
-	jsonBytes, _, err := ctx.TransformOutput(item)
+	jsonBytes, _, err := ctx.TransformOutput(item, true)
 	if err != nil {
 		return err
 	}

+ 1 - 1
internal/io/neuron/sink.go

@@ -79,7 +79,7 @@ func (s *sink) Open(ctx api.StreamContext) error {
 func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 	ctx.GetLogger().Debugf("receive %+v", data)
 	if s.c.Raw {
-		r, _, err := ctx.TransformOutput(data)
+		r, _, err := ctx.TransformOutput(data, true)
 		if err != nil {
 			return err
 		}

+ 9 - 1
internal/io/redis/sink.go

@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/ast"
 	"github.com/lf-edge/ekuiper/pkg/cast"
@@ -44,6 +45,7 @@ type config struct {
 	Expiration   time.Duration `json:"expiration,omitempty"`
 	RowkindField string        `json:"rowkindField"`
 	DataTemplate string        `json:"dataTemplate"`
+	Fields       []string      `json:"fields"`
 }

 type RedisSink struct {
@@ -88,7 +90,7 @@ func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
 	var val string
 	if r.c.DataTemplate != "" { // The result is a string
-		v, _, err := ctx.TransformOutput(data)
+		v, _, err := ctx.TransformOutput(data, true)
 		if err != nil {
 			logger.Error(err)
 			return err
@@ -100,6 +102,12 @@ func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 		}
 		data = m
 		val = string(v)
+	} else if len(r.c.Fields) > 0 {
+		m, err := transform.SelectMap(data, r.c.Fields)
+		if err != nil {
+			return fmt.Errorf("fail to select fields %v for data %v", r.c.Fields, data)
+		}
+		data = m
 	}
 	switch d := data.(type) {
 	case []map[string]interface{}:

+ 2 - 2
internal/io/sink/log_sink.go

@@ -27,7 +27,7 @@ import (
 func NewLogSink() api.Sink {
 	return collector.Func(func(ctx api.StreamContext, data interface{}) error {
 		log := ctx.GetLogger()
-		if v, _, err := ctx.TransformOutput(data); err == nil {
+		if v, _, err := ctx.TransformOutput(data, true); err == nil {
 			log.Infof("sink result for rule %s: %s", ctx.GetRuleId(), v)
 			return nil
 		} else {
@@ -48,7 +48,7 @@ func NewLogSinkToMemory() api.Sink {
 	QR.Results = make([]string, 0, 10)
 	return collector.Func(func(ctx api.StreamContext, data interface{}) error {
 		var result string
-		if v, _, err := ctx.TransformOutput(data); err == nil {
+		if v, _, err := ctx.TransformOutput(data, true); err == nil {
 			result = string(v)
 		} else {
 			return fmt.Errorf("transform data error: %v", err)

+ 1 - 1
internal/plugin/portable/runtime/sink.go

@@ -91,7 +91,7 @@ func (ps *PortableSink) Open(ctx api.StreamContext) error {

 func (ps *PortableSink) Collect(ctx api.StreamContext, item interface{}) error {
 	ctx.GetLogger().Debugf("Receive %+v", item)
-	if val, _, err := ctx.TransformOutput(item); err == nil {
+	if val, _, err := ctx.TransformOutput(item, true); err == nil {
 		ctx.GetLogger().Debugf("Send %s", val)
 		e := ps.dataCh.Send(val)
 		if e != nil {

+ 3 - 3
internal/topo/context/default_test.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -226,7 +226,7 @@ func TestParseTemplate(t *testing.T) {
 }

 func TestTransition(t *testing.T) {
-	var mockFunc transform.TransFunc = func(d interface{}) ([]byte, bool, error) {
+	var mockFunc transform.TransFunc = func(d interface{}, s bool) ([]byte, bool, error) {
 		return []byte(fmt.Sprintf("%v", d)), true, nil
 	}
 	var tests = []struct {
@@ -249,7 +249,7 @@ func TestTransition(t *testing.T) {
 	ctx := Background().WithMeta("testTransRule", "op1", &state.MemoryStore{}).(*DefaultContext)
 	nc := WithValue(ctx, TransKey, mockFunc)
 	for i, tt := range tests {
-		r, _, _ := nc.TransformOutput(tt.data)
+		r, _, _ := nc.TransformOutput(tt.data, true)
 		if !reflect.DeepEqual(tt.r, r) {
 			t.Errorf("%d\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, string(tt.r), string(r))
 		}

+ 3 - 3
internal/topo/context/transform.go

@@ -1,4 +1,4 @@
-// Copyright 2021 EMQ Technologies Co., Ltd.
+// Copyright 2021-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -22,11 +22,11 @@ import (
 const TransKey = "$$trans"

 // TransformOutput Lazy transform output to bytes
-func (c *DefaultContext) TransformOutput(data interface{}) ([]byte, bool, error) {
+func (c *DefaultContext) TransformOutput(data interface{}, selected bool) ([]byte, bool, error) {
 	v := c.Value(TransKey)
 	f, ok := v.(transform.TransFunc)
 	if ok {
-		return f(data)
+		return f(data, selected)
 	}
 	return nil, false, fmt.Errorf("no transform configured")
 }

+ 12 - 11
internal/topo/node/sink_node.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -35,15 +35,16 @@ import (
 )

 type SinkConf struct {
-	Concurrency  int    `json:"concurrency"`
-	RunAsync     bool   `json:"runAsync"` // deprecated, will remove in the next release
-	Omitempty    bool   `json:"omitIfEmpty"`
-	SendSingle   bool   `json:"sendSingle"`
-	DataTemplate string `json:"dataTemplate"`
-	Format       string `json:"format"`
-	SchemaId     string `json:"schemaId"`
-	Delimiter    string `json:"delimiter"`
-	BufferLength int    `json:"bufferLength"`
+	Concurrency  int      `json:"concurrency"`
+	RunAsync     bool     `json:"runAsync"` // deprecated, will remove in the next release
+	Omitempty    bool     `json:"omitIfEmpty"`
+	SendSingle   bool     `json:"sendSingle"`
+	DataTemplate string   `json:"dataTemplate"`
+	Format       string   `json:"format"`
+	SchemaId     string   `json:"schemaId"`
+	Delimiter    string   `json:"delimiter"`
+	BufferLength int      `json:"bufferLength"`
+	Fields       []string `json:"fields"`
 	conf.SinkConf
 }

@@ -110,7 +111,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 				return err
 			}

-			tf, err := transform.GenTransform(sconf.DataTemplate, sconf.Format, sconf.SchemaId, sconf.Delimiter)
+			tf, err := transform.GenTransform(sconf.DataTemplate, sconf.Format, sconf.SchemaId, sconf.Delimiter, sconf.Fields)
 			if err != nil {
 				msg := fmt.Sprintf("property dataTemplate %v is invalid: %v", sconf.DataTemplate, err)
 				logger.Warnf(msg)

+ 64 - 0
internal/topo/node/sink_node_test.go

@@ -490,3 +490,67 @@ func Test_itemToMap(t *testing.T) {
 		})
 	}
 }
+
+func TestSinkFields_Apply(t *testing.T) {
+	conf.InitConf()
+	transform.RegisterAdditionalFuncs()
+	var tests = []struct {
+		dt        string
+		format    string
+		schemaId  string
+		delimiter string
+		fields    []string
+		data      interface{}
+		result    [][]byte
+	}{
+		{
+			format: "json",
+			fields: []string{"a", "b"},
+			data:   map[string]interface{}{"a": "1", "b": "2", "c": "3"},
+			result: [][]byte{[]byte(`{"a":"1","b":"2"}`)},
+		},
+		{
+			format: "json",
+			fields: []string{"a", "b"},
+			data:   []map[string]interface{}{{"a": "1", "b": "2", "c": "3"}},
+			result: [][]byte{[]byte(`[{"a":"1","b":"2"}]`)},
+		},
+		{
+			format:    "delimited",
+			delimiter: ",",
+			fields:    []string{"a", "b"},
+			data:      map[string]interface{}{"a": "1", "b": "2", "c": "3"},
+			result:    [][]byte{[]byte(`1,2`)},
+		},
+		{
+			format:   "json",
+			schemaId: "",
+			fields:   []string{"ax", "bx"},
+			dt:       `{"ax": {{.a}}, "bx": {{.b}}}`,
+			data:     map[string]interface{}{"a": "1", "b": "2", "c": "3"},
+			result:   [][]byte{[]byte(`{"ax":1,"bx":2}`)},
+		},
+		{
+			format:   "json",
+			schemaId: "",
+			fields:   []string{"a", "b"},
+			dt:       `{"ax": {{.a}}, "bx": {{.b}}}`,
+			data:     map[string]interface{}{"a": "1", "b": "2", "c": "3"},
+			result:   [][]byte{[]byte(`{"a":null,"b":null}`)},
+		},
+	}
+	contextLogger := conf.Log.WithField("rule", "TestSinkFields_Apply")
+	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
+
+	for i, tt := range tests {
+		tf, _ := transform.GenTransform(tt.dt, tt.format, tt.schemaId, tt.delimiter, tt.fields)
+		vCtx := context.WithValue(ctx, context.TransKey, tf)
+		mockSink := mocknode.NewMockSink()
+		mockSink.Collect(vCtx, tt.data)
+		time.Sleep(1 * time.Second)
+		results := mockSink.GetResults()
+		if !reflect.DeepEqual(tt.result, results) {
+			t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results)
+		}
+	}
+}
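The first test case in the table can be reproduced in a few lines: select the listed fields (absent keys become JSON null, as in the last case) and marshal. `encodeSelected` is an illustrative helper, not the library API:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeSelected reproduces the json-format path exercised by the test:
// keep only the listed fields, with absent keys carried through as nil,
// then marshal the reduced map.
func encodeSelected(data map[string]interface{}, fields []string) ([]byte, error) {
	out := make(map[string]interface{}, len(fields))
	for _, f := range fields {
		out[f] = data[f]
	}
	return json.Marshal(out)
}

func main() {
	b, _ := encodeSelected(map[string]interface{}{"a": "1", "b": "2", "c": "3"}, []string{"a", "b"})
	fmt.Println(string(b)) // {"a":"1","b":"2"}
}
```

The output order is deterministic because `json.Marshal` sorts map keys, which is also why the expected byte slices in the test table are stable.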

+ 1 - 1
internal/topo/topotest/mocknode/mock_sink.go

@@ -38,7 +38,7 @@ func (m *MockSink) Open(ctx api.StreamContext) error {
 func (m *MockSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	fmt.Println("mock sink receive ", item)
-	if v, _, err := ctx.TransformOutput(item); err == nil {
+	if v, _, err := ctx.TransformOutput(item, true); err == nil {
 		logger.Debugf("mock sink receive %s", item)
 		m.results = append(m.results, v)
 	} else {

+ 101 - 11
internal/topo/transform/template.go

@@ -1,4 +1,4 @@
-// Copyright 2022 EMQ Technologies Co., Ltd.
+// Copyright 2022-2023 EMQ Technologies Co., Ltd.
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -25,9 +25,15 @@ import (
 	"text/template"
 	"text/template"
 )
 )
 
 
-type TransFunc func(interface{}) ([]byte, bool, error)
+// TransFunc is the function to transform data
 
 
-func GenTransform(dt string, format string, schemaId string, delimiter string) (TransFunc, error) {
+// The second parameter indicates whether to select fields based on the fields property.
+// If it is false, then after the dataTemplate, output the result directly.
+// If it is true, then after the dataTemplate, select the fields based on the fields property.
+
+type TransFunc func(interface{}, bool) ([]byte, bool, error)
+
+func GenTransform(dt string, format string, schemaId string, delimiter string, fields []string) (TransFunc, error) {
 	var (
 	var (
 		tp  *template.Template = nil
 		tp  *template.Template = nil
 		c   message.Converter
 		c   message.Converter
@@ -44,6 +50,11 @@ func GenTransform(dt string, format string, schemaId string, delimiter string) (
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
+	case message.FormatJson:
+		c, err = converter.GetOrCreateConverter(&ast.Options{FORMAT: format})
+		if err != nil {
+			return nil, err
+		}
 	}
 	}
 
 
 	if dt != "" {
 	if dt != "" {
@@ -53,10 +64,11 @@ func GenTransform(dt string, format string, schemaId string, delimiter string) (
 		}
 		}
 		tp = temp
 		tp = temp
 	}
 	}
-	return func(d interface{}) ([]byte, bool, error) {
+	return func(d interface{}, s bool) ([]byte, bool, error) {
 		var (
 		var (
 			bs          []byte
 			bs          []byte
 			transformed bool
 			transformed bool
+			selected    bool
 		)
 		)
 		if tp != nil {
 		if tp != nil {
 			var output bytes.Buffer
 			var output bytes.Buffer
@@ -67,15 +79,39 @@ func GenTransform(dt string, format string, schemaId string, delimiter string) (
 			bs = output.Bytes()
 			bs = output.Bytes()
 			transformed = true
 			transformed = true
 		}
 		}
+		// just for sinks like tdengine and sql.
+		if !s {
+			if transformed {
+				return bs, true, nil
+			}
+			outBytes, err := json.Marshal(d)
+			return outBytes, false, err
+		} else {
+			// Consider that if only the dataTemplate is needed, and the data after trans cannot be converted into map[string]interface
+			var m interface{}
+			var err error
+			if transformed {
+				m, err = SelectMap(bs, fields)
+			} else {
+				m, err = SelectMap(d, fields)
+			}
+			if err != nil && err.Error() != "fields cannot be empty" {
+				return nil, false, fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(bs), err)
+			} else if err == nil {
+				d = m
+				selected = true
+			}
+		}
+
 		switch format {
 		switch format {
 		case message.FormatJson:
 		case message.FormatJson:
-			if transformed {
-				return bs, transformed, nil
+			if transformed && !selected {
+				return bs, true, nil
 			}
 			}
-			j, err := json.Marshal(d)
-			return j, false, err
+			outBytes, err := c.Encode(d)
+			return outBytes, transformed || selected, err
 		case message.FormatProtobuf, message.FormatCustom, message.FormatDelimited:
 		case message.FormatProtobuf, message.FormatCustom, message.FormatDelimited:
-			if transformed {
+			if transformed && !selected {
 				m := make(map[string]interface{})
 				m := make(map[string]interface{})
 				err := json.Unmarshal(bs, &m)
 				err := json.Unmarshal(bs, &m)
 				if err != nil {
 				if err != nil {
@@ -83,8 +119,9 @@ func GenTransform(dt string, format string, schemaId string, delimiter string) (
 				}
 				d = m
 			}
-			b, err := c.Encode(d)
-			return b, transformed, err
+			// TODO: if headers are defined by user, find a way to keep the order
+			outBytes, err := c.Encode(d)
+			return outBytes, transformed || selected, err
 		default: // should not happen
 			return nil, false, fmt.Errorf("unsupported format %v", format)
 		}
@@ -94,3 +131,56 @@ func GenTransform(dt string, format string, schemaId string, delimiter string) (
 func GenTp(dt string) (*template.Template, error) {
 	return template.New("sink").Funcs(conf.FuncMap).Parse(dt)
 }
+
+// SelectMap select fields from input map or array of map.
+// If you do not need to convert data to []byte, you can use this function directly. Otherwise, use TransFunc.
+func SelectMap(input interface{}, fields []string) (interface{}, error) {
+	if len(fields) == 0 {
+		return input, fmt.Errorf("fields cannot be empty")
+	}
+
+	if _, ok := input.([]byte); ok {
+		var m map[string]interface{}
+		err := json.Unmarshal(input.([]byte), &m)
+		if err != nil {
+			return input, fmt.Errorf("fail to decode data %s for error %v", string(input.([]byte)), err)
+		}
+		input = m
+	}
+
+	outputs := make([]map[string]interface{}, 0)
+	switch input.(type) {
+	case map[string]interface{}:
+		output := make(map[string]interface{})
+		for _, field := range fields {
+			output[field] = input.(map[string]interface{})[field]
+		}
+		return output, nil
+	case []interface{}:
+		for _, v := range input.([]interface{}) {
+			output := make(map[string]interface{})
+			if out, ok := v.(map[string]interface{}); !ok {
+				return input, fmt.Errorf("unsupported type %v", v)
+			} else {
+				for _, field := range fields {
+					output[field] = out[field]
+				}
+				outputs = append(outputs, output)
+			}
+		}
+		return outputs, nil
+	case []map[string]interface{}:
+		for _, v := range input.([]map[string]interface{}) {
+			output := make(map[string]interface{})
+			for _, field := range fields {
+				output[field] = v[field]
+			}
+			outputs = append(outputs, output)
+		}
+		return outputs, nil
+	default:
+		return input, fmt.Errorf("unsupported type %v", input)
+	}
+}

+ 134 - 0
internal/topo/transform/template_test.go

@@ -0,0 +1,134 @@
+// Copyright 2023 EMQ Technologies Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package transform
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+)
+
+func Test_SelectMap(t *testing.T) {
+	type args struct {
+		input  interface{}
+		fields []string
+	}
+	tests := []struct {
+		name string
+		args args
+		want interface{}
+	}{
+		{
+			name: "test1",
+			args: args{
+				input: map[string]interface{}{
+					"a": 1,
+					"b": 2,
+					"c": 3,
+				},
+				fields: []string{"a", "b"},
+			},
+			want: map[string]interface{}{
+				"a": 1,
+				"b": 2,
+			},
+		},
+		{
+			name: "test2",
+			args: args{
+				input: []map[string]interface{}{
+					{
+						"a": 1,
+						"b": 2,
+						"c": 3,
+					},
+				},
+				fields: []string{"a", "b"},
+			},
+			want: []map[string]interface{}{
+				{
+					"a": 1,
+					"b": 2,
+				},
+			},
+		},
+		{
+			name: "test3",
+			args: args{
+				input: []interface{}{
+					map[string]interface{}{
+						"a": 1,
+						"b": 2,
+						"c": 3,
+					},
+				},
+				fields: []string{"a", "b"},
+			},
+			want: []map[string]interface{}{
+				{
+					"a": 1,
+					"b": 2,
+				},
+			},
+		},
+		{
+			name: "test4",
+			args: args{
+				input: []map[string]interface{}{
+					{
+						"a": 1,
+						"b": 2,
+						"c": 3,
+					},
+				},
+				fields: nil,
+			},
+			want: []map[string]interface{}{
+				{
+					"a": 1,
+					"b": 2,
+					"c": 3,
+				},
+			},
+		},
+		{
+			name: "test5",
+			args: args{
+				input:  []byte(`{"a": 1, "b": 2, "c": 3}`),
+				fields: nil,
+			},
+			want: []byte(`{"a": 1, "b": 2, "c": 3}`),
+		},
+		{
+			name: "test6",
+			args: args{
+				input:  []byte(`{"a": 1, "b": 2, "c": 3}`),
+				fields: []string{"d"},
+			},
+			want: map[string]interface{}{
+				"d": nil,
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got, _ := SelectMap(tt.args.input, tt.args.fields); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("SelectMap() = %v, want %v", got, tt.want)
+				fmt.Println(reflect.TypeOf(got), reflect.TypeOf(tt.want))
+			}
+		})
+	}
+}

+ 5 - 2
pkg/api/stream.go

@@ -201,9 +201,12 @@ type StreamContext interface {
 	ParseTemplate(template string, data interface{}) (string, error)
 	// ParseJsonPath parse the jsonPath string with the given data
 	ParseJsonPath(jsonPath string, data interface{}) (interface{}, error)
-	// TransformOutput Transform output according to the properties including dataTemplate, sendSingle
+	// TransformOutput transforms the output according to properties including dataTemplate, sendSingle and fields.
+	// It first transforms the data through the dataTemplate property, and then selects data based on the fields property.
+	// It is recommended not to configure both the dataTemplate property and the fields property.
+	// selected: whether to select data based on the fields property
 	// The second parameter is whether the data is transformed or just return as its json format.
-	TransformOutput(data interface{}) ([]byte, bool, error)
+	TransformOutput(data interface{}, selected bool) ([]byte, bool, error)
 	// Decode is set in the source according to the format.
 	// It decodes byte array into map or map slice.
 	Decode(data []byte) (map[string]interface{}, error)