Browse Source

feat: add sink common property: dataField (#1896)

* add dataField property

Signed-off-by: Rui-Gan <1171530954@qq.com>

* remove unnecessary comments

Signed-off-by: Rui-Gan <1171530954@qq.com>

* add dataField doc

Signed-off-by: Rui-Gan <1171530954@qq.com>

---------

Signed-off-by: Rui-Gan <1171530954@qq.com>
Regina 1 year ago
parent
commit
b716d04957

File diff suppressed because it is too large
+ 21 - 20
docs/en_US/guide/sinks/overview.md


File diff suppressed because it is too large
+ 21 - 20
docs/zh_CN/guide/sinks/overview.md


+ 9 - 3
extensions/sinks/influx/influx.go

@@ -49,6 +49,7 @@ type influxSink struct {
 	databaseName string
 	tagKey       string
 	tagValue     string
+	dataField    string
 	fields       []string
 	cli          client.Client
 	fieldMap     map[string]interface{}
@@ -91,6 +92,11 @@ func (m *influxSink) Configure(props map[string]interface{}) error {
 			m.tagValue = i
 		}
 	}
+	if i, ok := props["dataField"]; ok {
+		if i, ok := i.(string); ok {
+			m.dataField = i
+		}
+	}
 	if i, ok := props["fields"]; ok {
 		if i, ok := i.([]interface{}); ok {
 			for _, v := range i {
@@ -126,7 +132,7 @@ func (m *influxSink) Open(ctx api.StreamContext) (err error) {
 func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
 	if m.hasTransform {
-		jsonBytes, _, err := ctx.TransformOutput(data, true)
+		jsonBytes, _, err := ctx.TransformOutput(data)
 		if err != nil {
 			return err
 		}
@@ -136,8 +142,8 @@ func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 			return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		data = m
-	} else if len(m.fields) > 0 {
-		d, err := transform.SelectMap(data, m.fields)
+	} else {
+		d, _, err := transform.TransItem(data, m.dataField, m.fields)
 		if err != nil {
 			return fmt.Errorf("fail to select fields %v for data %v", m.fields, data)
 		}

+ 9 - 3
extensions/sinks/influx2/influx2.go

@@ -46,6 +46,7 @@ type influxSink2 struct {
 	bucket       string
 	tagKey       string
 	tagValue     string
+	dataField    string
 	fields       []string
 	cli          client.Client
 	fieldMap     map[string]interface{}
@@ -73,6 +74,11 @@ func (m *influxSink2) Configure(props map[string]interface{}) error {
 			m.tagValue = i
 		}
 	}
+	if i, ok := props["dataField"]; ok {
+		if i, ok := i.(string); ok {
+			m.dataField = i
+		}
+	}
 	if i, ok := props["fields"]; ok {
 		if i, ok := i.([]interface{}); ok {
 			for _, v := range i {
@@ -118,7 +124,7 @@ func (m *influxSink2) Open(ctx api.StreamContext) (err error) {
 func (m *influxSink2) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
 	if m.hasTransform {
-		jsonBytes, _, err := ctx.TransformOutput(data, true)
+		jsonBytes, _, err := ctx.TransformOutput(data)
 		if err != nil {
 			return err
 		}
@@ -128,8 +134,8 @@ func (m *influxSink2) Collect(ctx api.StreamContext, data interface{}) error {
 			return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		data = m
-	} else if len(m.fields) > 0 {
-		d, err := transform.SelectMap(data, m.fields)
+	} else {
+		d, _, err := transform.TransItem(data, m.dataField, m.fields)
 		if err != nil {
 			return fmt.Errorf("fail to select fields %v for data %v", m.fields, data)
 		}

+ 2 - 2
extensions/sinks/kafka/kafka.go

@@ -119,14 +119,14 @@ func (m *kafkaSink) Collect(ctx api.StreamContext, item interface{}) error {
 	switch d := item.(type) {
 	case []map[string]interface{}:
 		for _, el := range d {
-			decodedBytes, _, err := ctx.TransformOutput(el, true)
+			decodedBytes, _, err := ctx.TransformOutput(el)
 			if err != nil {
 				return fmt.Errorf("kafka sink transform data error: %v", err)
 			}
 			messages = append(messages, kafkago.Message{Value: decodedBytes})
 		}
 	case map[string]interface{}:
-		decodedBytes, _, err := ctx.TransformOutput(d, true)
+		decodedBytes, _, err := ctx.TransformOutput(d)
 		if err != nil {
 			return fmt.Errorf("kafka sink transform data error: %v", err)
 		}

+ 12 - 4
extensions/sinks/sql/sql.go

@@ -18,6 +18,7 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"reflect"
 	"strings"
 
@@ -35,6 +36,7 @@ type sqlConfig struct {
 	Fields         []string `json:"fields"`
 	DataTemplate   string   `json:"dataTemplate"`
 	TableDataField string   `json:"tableDataField"`
+	DataField      string   `json:"dataField"`
 	RowkindField   string   `json:"rowkindField"`
 	KeyField       string   `json:"keyField"`
 }
@@ -99,6 +101,9 @@ func (m *sqlSink) Configure(props map[string]interface{}) error {
 	if err != nil {
 		return fmt.Errorf("read properties %v fail with error: %v", props, err)
 	}
+	if cfg.DataField == "" {
+		cfg.DataField = cfg.TableDataField
+	}
 	if cfg.Url == "" {
 		return fmt.Errorf("property Url is required")
 	}
@@ -141,7 +146,7 @@ func (m *sqlSink) writeToDB(ctx api.StreamContext, sqlStr *string) error {
 func (m *sqlSink) Collect(ctx api.StreamContext, item interface{}) error {
 	ctx.GetLogger().Debugf("sql sink receive %s", item)
 	if m.conf.DataTemplate != "" {
-		jsonBytes, _, err := ctx.TransformOutput(item, false)
+		jsonBytes, _, err := ctx.TransformOutput(item)
 		if err != nil {
 			return err
 		}
@@ -151,6 +156,12 @@ func (m *sqlSink) Collect(ctx api.StreamContext, item interface{}) error {
 			return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		item = tm
+	} else {
+		tm, _, err := transform.TransItem(item, m.conf.DataField, m.conf.Fields)
+		if err != nil {
+			return fmt.Errorf("fail to 1 transform data %v for error %v!!!!!", item, err)
+		}
+		item = tm
 	}
 
 	var (
@@ -164,9 +175,6 @@ func (m *sqlSink) Collect(ctx api.StreamContext, item interface{}) error {
 			ctx.GetLogger().Errorf("parse template for table %s error: %v", m.conf.Table, err)
 			return err
 		}
-		if m.conf.TableDataField != "" {
-			item = v[m.conf.TableDataField]
-		}
 	case []map[string]interface{}:
 		if len(v) == 0 {
 			ctx.GetLogger().Warnf("empty data array")

+ 11 - 7
extensions/sinks/tdengine/tdengine.go

@@ -18,6 +18,7 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
+	"github.com/lf-edge/ekuiper/internal/topo/transform"
 	"reflect"
 	"strings"
 
@@ -45,6 +46,7 @@ type (
 		TagFields      []string `json:"tagFields"`
 		DataTemplate   string   `json:"dataTemplate"`
 		TableDataField string   `json:"tableDataField"`
+		DataField      string   `json:"dataField"`
 	}
 	taosSink struct {
 		conf *taosConfig
@@ -189,6 +191,9 @@ func (m *taosSink) Configure(props map[string]interface{}) error {
 		return fmt.Errorf("property tagFields is required when sTable is set")
 	}
 	m.url = fmt.Sprintf(`%s:%s@tcp(%s:%d)/%s`, cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database)
+	if cfg.DataField == "" {
+		cfg.DataField = cfg.TableDataField
+	}
 	m.conf = cfg
 	return nil
 }
@@ -202,7 +207,7 @@ func (m *taosSink) Open(ctx api.StreamContext) (err error) {
 func (m *taosSink) Collect(ctx api.StreamContext, item interface{}) error {
 	ctx.GetLogger().Debugf("tdengine sink receive %s", item)
 	if m.conf.DataTemplate != "" {
-		jsonBytes, _, err := ctx.TransformOutput(item, false)
+		jsonBytes, _, err := ctx.TransformOutput(item)
 		if err != nil {
 			return err
 		}
@@ -212,13 +217,12 @@ func (m *taosSink) Collect(ctx api.StreamContext, item interface{}) error {
 			return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		item = tm
-	}
-
-	if m.conf.TableDataField != "" {
-		mapData, ok := item.(map[string]interface{})
-		if ok {
-			item = mapData[m.conf.TableDataField]
+	} else {
+		tm, _, err := transform.TransItem(item, m.conf.DataField, m.conf.Fields)
+		if err != nil {
+			return fmt.Errorf("fail to transform data %v for error %v", item, err)
 		}
+		item = tm
 	}
 
 	switch v := item.(type) {

+ 1 - 1
extensions/sinks/zmq/zmq.go

@@ -71,7 +71,7 @@ func (m *zmqSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	var v []byte
 	var err error
-	v, _, err = ctx.TransformOutput(item, true)
+	v, _, err = ctx.TransformOutput(item)
 	if err != nil {
 		logger.Debug("zmq sink receive non byte data %v", item)
 		return err

+ 4 - 3
internal/io/edgex/edgex_sink.go

@@ -46,6 +46,7 @@ type SinkConf struct {
 	Metadata     string      `json:"metadata"`
 	DataTemplate string      `json:"dataTemplate"`
 	Fields       []string    `json:"fields"`
+	DataField    string      `json:"dataField"`
 }
 
 type EdgexMsgBusSink struct {
@@ -116,7 +117,7 @@ func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
 
 func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, item interface{}) (*dtos.Event, error) {
 	if ems.c.DataTemplate != "" {
-		jsonBytes, _, err := ctx.TransformOutput(item, true)
+		jsonBytes, _, err := ctx.TransformOutput(item)
 		if err != nil {
 			return nil, err
 		}
@@ -126,8 +127,8 @@ func (ems *EdgexMsgBusSink) produceEvents(ctx api.StreamContext, item interface{
 			return nil, fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		item = tm
-	} else if len(ems.c.Fields) > 0 {
-		tm, err := transform.SelectMap(item, ems.c.Fields)
+	} else {
+		tm, _, err := transform.TransItem(item, ems.c.DataField, ems.c.Fields)
 		if err != nil {
 			return nil, fmt.Errorf("fail to select fields %v for data %v", ems.c.Fields, item)
 		}

+ 1 - 1
internal/io/edgex/edgex_sink_test.go

@@ -602,7 +602,7 @@ func TestEdgeXTemplate_Apply(t1 *testing.T) {
 		var payload []map[string]interface{}
 		json.Unmarshal([]byte(t.input), &payload)
 		dt := t.conf["dataTemplate"]
-		tf, _ := transform.GenTransform(cast.ToStringAlways(dt), "json", "", "", []string{})
+		tf, _ := transform.GenTransform(cast.ToStringAlways(dt), "json", "", "", "", []string{})
 		vCtx := context.WithValue(ctx, context.TransKey, tf)
 		result, err := ems.produceEvents(vCtx, payload[0])
 		if !reflect.DeepEqual(t.error, testx.Errstring(err)) {

+ 1 - 1
internal/io/file/file_sink.go

@@ -160,7 +160,7 @@ func (m *fileSink) Collect(ctx api.StreamContext, item interface{}) error {
 	if err != nil {
 		return err
 	}
-	if v, _, err := ctx.TransformOutput(item, true); err == nil {
+	if v, _, err := ctx.TransformOutput(item); err == nil {
 		ctx.GetLogger().Debugf("file sink transform data %s", v)
 		m.mux.Lock()
 		defer m.mux.Unlock()

+ 5 - 5
internal/io/file/file_sink_test.go

@@ -308,7 +308,7 @@ func TestFileSink_Collect(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "test2")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 
-	tf, _ := transform.GenTransform("", "json", "", "", []string{})
+	tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)
 
 	for _, tt := range tests {
@@ -415,7 +415,7 @@ func TestFileSinkFields_Collect(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			tf, _ := transform.GenTransform("", tt.format, "", tt.delimiter, tt.fields)
+			tf, _ := transform.GenTransform("", tt.format, "", tt.delimiter, "", tt.fields)
 			vCtx := context.WithValue(ctx, context.TransKey, tf)
 			// Create a temporary file for testing
 			tmpfile, err := os.CreateTemp("", tt.fname)
@@ -550,7 +550,7 @@ func TestFileSinkRolling_Collect(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "testRolling")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 
-	tf, _ := transform.GenTransform("", "json", "", "", []string{})
+	tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)
 
 	for _, tt := range tests {
@@ -680,7 +680,7 @@ func TestFileSinkRollingCount_Collect(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "testRollingCount")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 
-	tf, _ := transform.GenTransform("", "delimited", "", ",", []string{})
+	tf, _ := transform.GenTransform("", "delimited", "", ",", "", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)
 
 	for _, tt := range tests {
@@ -771,7 +771,7 @@ func TestFileSinkReopen(t *testing.T) {
 	// Create a stream context for testing
 	contextLogger := conf.Log.WithField("rule", "testRollingCount")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
-	tf, _ := transform.GenTransform("", "json", "", "", []string{})
+	tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)
 
 	sink := &fileSink{}

+ 1 - 1
internal/io/file/file_stream_test.go

@@ -89,7 +89,7 @@ func TestFileSinkCompress_Collect(t *testing.T) {
 	contextLogger := conf.Log.WithField("rule", "test2")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 
-	tf, _ := transform.GenTransform("", "json", "", "", []string{})
+	tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
 	vCtx := context.WithValue(ctx, context.TransKey, tf)
 
 	for _, tt := range tests {

+ 1 - 1
internal/io/http/rest_sink.go

@@ -58,7 +58,7 @@ func (me MultiErrors) Error() string {
 func (ms *RestSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	logger.Debugf("rest sink receive %s", item)
-	decodedData, _, err := ctx.TransformOutput(item, true)
+	decodedData, _, err := ctx.TransformOutput(item)
 	if err != nil {
 		logger.Warnf("rest sink decode data error: %v", err)
 		return fmt.Errorf("rest sink decode data error: %v", err)

+ 4 - 4
internal/io/http/rest_sink_test.go

@@ -189,7 +189,7 @@ func TestRestSink_Apply(t *testing.T) {
 		contextLogger.Debugf(string(body))
 		fmt.Fprint(w, string(body))
 	}))
-	tf, _ := transform.GenTransform("", "json", "", "", []string{})
+	tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
 	defer ts.Close()
 	for i, tt := range tests {
 		requests = nil
@@ -356,7 +356,7 @@ func TestRestSinkTemplate_Apply(t *testing.T) {
 		tt.config["url"] = ts.URL
 		s.Configure(tt.config)
 		s.Open(ctx)
-		vCtx := context.WithValue(ctx, context.TransKey, transform.TransFunc(func(d interface{}, s bool) ([]byte, bool, error) {
+		vCtx := context.WithValue(ctx, context.TransKey, transform.TransFunc(func(d interface{}) ([]byte, bool, error) {
 			return d.([]byte), true, nil
 		}))
 		for _, d := range tt.data {
@@ -387,7 +387,7 @@ func TestRestSinkErrorLog(t *testing.T) {
 		s.Configure(config)
 		s.Open(context.Background())
 
-		tf, _ := transform.GenTransform("", "json", "", "", []string{})
+		tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
 		vCtx := context.WithValue(context.Background(), context.TransKey, tf)
 		reqBody := []map[string]interface{}{
 			{"ab": "hello1"},
@@ -413,7 +413,7 @@ func TestRestSinkErrorLog(t *testing.T) {
 		}
 		s.Configure(config)
 		s.Open(context.Background())
-		tf, _ := transform.GenTransform("", "json", "", "", []string{})
+		tf, _ := transform.GenTransform("", "json", "", "", "", []string{})
 		vCtx := context.WithValue(context.Background(), context.TransKey, tf)
 		err := s.Collect(vCtx, []map[string]interface{}{
 			{"ab": "hello1"},

+ 6 - 3
internal/io/memory/sink.go

@@ -32,6 +32,7 @@ type config struct {
 	RowkindField string   `json:"rowkindField"`
 	KeyField     string   `json:"keyField"`
 	Fields       []string `json:"fields"`
+	DataField    string   `json:"dataField"`
 }
 
 type sink struct {
@@ -40,6 +41,7 @@ type sink struct {
 	keyField     string
 	rowkindField string
 	fields       []string
+	dataField    string
 }
 
 func (s *sink) Open(ctx api.StreamContext) error {
@@ -61,6 +63,7 @@ func (s *sink) Configure(props map[string]interface{}) error {
 	if cfg.DataTemplate != "" {
 		s.hasTransform = true
 	}
+	s.dataField = cfg.DataField
 	s.fields = cfg.Fields
 	s.rowkindField = cfg.RowkindField
 	s.keyField = cfg.KeyField
@@ -77,7 +80,7 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 		return err
 	}
 	if s.hasTransform {
-		jsonBytes, _, err := ctx.TransformOutput(data, true)
+		jsonBytes, _, err := ctx.TransformOutput(data)
 		if err != nil {
 			return err
 		}
@@ -87,8 +90,8 @@ func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 			return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
 		}
 		data = m
-	} else if len(s.fields) > 0 {
-		m, err := transform.SelectMap(data, s.fields)
+	} else {
+		m, _, err := transform.TransItem(data, s.dataField, s.fields)
 		if err != nil {
 			return fmt.Errorf("fail to select fields %v for data %v", s.fields, data)
 		}

+ 1 - 1
internal/io/mqtt/mqtt_sink.go

@@ -85,7 +85,7 @@ func (ms *MQTTSink) Open(ctx api.StreamContext) error {
 
 func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
-	jsonBytes, _, err := ctx.TransformOutput(item, true)
+	jsonBytes, _, err := ctx.TransformOutput(item)
 	if err != nil {
 		return err
 	}

+ 1 - 1
internal/io/neuron/sink.go

@@ -80,7 +80,7 @@ func (s *sink) Open(ctx api.StreamContext) error {
 func (s *sink) Collect(ctx api.StreamContext, data interface{}) error {
 	ctx.GetLogger().Debugf("receive %+v", data)
 	if s.c.Raw {
-		r, _, err := ctx.TransformOutput(data, true)
+		r, _, err := ctx.TransformOutput(data)
 		if err != nil {
 			return err
 		}

+ 4 - 3
internal/io/redis/sink.go

@@ -48,6 +48,7 @@ type config struct {
 	RowkindField string        `json:"rowkindField"`
 	DataTemplate string        `json:"dataTemplate"`
 	Fields       []string      `json:"fields"`
+	DataField    string        `json:"dataField"`
 }
 
 type RedisSink struct {
@@ -92,7 +93,7 @@ func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
 	var val string
 	if r.c.DataTemplate != "" { // The result is a string
-		v, _, err := ctx.TransformOutput(data, true)
+		v, _, err := ctx.TransformOutput(data)
 		if err != nil {
 			logger.Error(err)
 			return err
@@ -104,8 +105,8 @@ func (r *RedisSink) Collect(ctx api.StreamContext, data interface{}) error {
 		}
 		data = m
 		val = string(v)
-	} else if len(r.c.Fields) > 0 {
-		m, err := transform.SelectMap(data, r.c.Fields)
+	} else {
+		m, _, err := transform.TransItem(data, r.c.DataField, r.c.Fields)
 		if err != nil {
 			return fmt.Errorf("fail to select fields %v for data %v", r.c.Fields, data)
 		}

+ 2 - 2
internal/io/sink/log_sink.go

@@ -28,7 +28,7 @@ import (
 func NewLogSink() api.Sink {
 	return collector.Func(func(ctx api.StreamContext, data interface{}) error {
 		log := ctx.GetLogger()
-		if v, _, err := ctx.TransformOutput(data, true); err == nil {
+		if v, _, err := ctx.TransformOutput(data); err == nil {
 			log.Infof("sink result for rule %s: %s", ctx.GetRuleId(), v)
 			return nil
 		} else {
@@ -49,7 +49,7 @@ func NewLogSinkToMemory() api.Sink {
 	QR.Results = make([]string, 0, 10)
 	return collector.Func(func(ctx api.StreamContext, data interface{}) error {
 		var result string
-		if v, _, err := ctx.TransformOutput(data, true); err == nil {
+		if v, _, err := ctx.TransformOutput(data); err == nil {
 			result = string(v)
 		} else {
 			return fmt.Errorf("transform data error: %v", err)

+ 1 - 1
internal/plugin/portable/runtime/sink.go

@@ -92,7 +92,7 @@ func (ps *PortableSink) Open(ctx api.StreamContext) error {
 
 func (ps *PortableSink) Collect(ctx api.StreamContext, item interface{}) error {
 	ctx.GetLogger().Debugf("Receive %+v", item)
-	if val, _, err := ctx.TransformOutput(item, true); err == nil {
+	if val, _, err := ctx.TransformOutput(item); err == nil {
 		ctx.GetLogger().Debugf("Send %s", val)
 		e := ps.dataCh.Send(val)
 		if e != nil {

+ 2 - 2
internal/topo/context/default_test.go

@@ -227,7 +227,7 @@ func TestParseTemplate(t *testing.T) {
 }
 
 func TestTransition(t *testing.T) {
-	var mockFunc transform.TransFunc = func(d interface{}, s bool) ([]byte, bool, error) {
+	var mockFunc transform.TransFunc = func(d interface{}) ([]byte, bool, error) {
 		return []byte(fmt.Sprintf("%v", d)), true, nil
 	}
 	tests := []struct {
@@ -250,7 +250,7 @@ func TestTransition(t *testing.T) {
 	ctx := Background().WithMeta("testTransRule", "op1", &state.MemoryStore{}).(*DefaultContext)
 	nc := WithValue(ctx, TransKey, mockFunc)
 	for i, tt := range tests {
-		r, _, _ := nc.TransformOutput(tt.data, true)
+		r, _, _ := nc.TransformOutput(tt.data)
 		if !reflect.DeepEqual(tt.r, r) {
 			t.Errorf("%d\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, string(tt.r), string(r))
 		}

+ 2 - 2
internal/topo/context/transform.go

@@ -23,11 +23,11 @@ import (
 const TransKey = "$$trans"
 
 // TransformOutput Lazy transform output to bytes
-func (c *DefaultContext) TransformOutput(data interface{}, selected bool) ([]byte, bool, error) {
+func (c *DefaultContext) TransformOutput(data interface{}) ([]byte, bool, error) {
 	v := c.Value(TransKey)
 	f, ok := v.(transform.TransFunc)
 	if ok {
-		return f(data, selected)
+		return f(data)
 	}
 	return nil, false, fmt.Errorf("no transform configured")
 }

+ 7 - 1
internal/topo/node/sink_node.go

@@ -46,6 +46,7 @@ type SinkConf struct {
 	Delimiter      string   `json:"delimiter"`
 	BufferLength   int      `json:"bufferLength"`
 	Fields         []string `json:"fields"`
+	DataField      string   `json:"dataField"`
 	BatchSize      int      `json:"batchSize"`
 	LingerInterval int      `json:"lingerInterval"`
 	conf.SinkConf
@@ -121,7 +122,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
 				return err
 			}
 
-			tf, err := transform.GenTransform(sconf.DataTemplate, sconf.Format, sconf.SchemaId, sconf.Delimiter, sconf.Fields)
+			tf, err := transform.GenTransform(sconf.DataTemplate, sconf.Format, sconf.SchemaId, sconf.Delimiter, sconf.DataField, sconf.Fields)
 			if err != nil {
 				msg := fmt.Sprintf("property dataTemplate %v is invalid: %v", sconf.DataTemplate, err)
 				logger.Warnf(msg)
@@ -304,6 +305,11 @@ func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) {
 		conf.Log.Warnf("RunAsync is deprecated and ignored.")
 		return nil, fmt.Errorf("cache is not supported for async sink, do not use enableCache and runAsync properties together")
 	}
+	if sconf.DataField == "" {
+		if v, ok := m.options["tableDataField"]; ok {
+			sconf.DataField = v.(string)
+		}
+	}
 	err = sconf.SinkConf.Validate()
 	if err != nil {
 		return nil, fmt.Errorf("invalid cache properties: %v", err)

+ 26 - 1
internal/topo/node/sink_node_test.go

@@ -550,6 +550,7 @@ func TestSinkFields_Apply(t *testing.T) {
 		format    string
 		schemaId  string
 		delimiter string
+		dataField string
 		fields    []string
 		data      interface{}
 		result    [][]byte
@@ -596,12 +597,36 @@ func TestSinkFields_Apply(t *testing.T) {
 			data:     map[string]interface{}{"a": "1", "b": "2", "c": "3"},
 			result:   [][]byte{[]byte(`{"a":null,"b":null}`)},
 		},
+		{
+			format:    "json",
+			dataField: "device",
+			fields:    []string{"a", "b"},
+			data:      map[string]interface{}{"device": map[string]interface{}{"a": "1", "b": "2", "c": "3"}, "a": 11, "b": 22, "c": 33},
+			result:    [][]byte{[]byte(`{"a":"1","b":"2"}`)},
+		},
+		{
+			format:    "delimited",
+			delimiter: ",",
+			fields:    []string{"a", "b"},
+			dataField: "device",
+			data:      map[string]interface{}{"device": map[string]interface{}{"a": "1", "b": "2", "c": "3"}, "a": 11, "b": 22, "c": 33},
+			result:    [][]byte{[]byte(`1,2`)},
+		},
+		{
+			format:    "json",
+			schemaId:  "",
+			fields:    []string{"a", "b"},
+			dt:        `{"device": {"a": {{.a}}}}`,
+			dataField: "device",
+			data:      map[string]interface{}{"a": "1", "b": "2", "c": "3"},
+			result:    [][]byte{[]byte(`{"a":1,"b":null}`)},
+		},
 	}
 	contextLogger := conf.Log.WithField("rule", "TestSinkFields_Apply")
 	ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger)
 
 	for i, tt := range tests {
-		tf, _ := transform.GenTransform(tt.dt, tt.format, tt.schemaId, tt.delimiter, tt.fields)
+		tf, _ := transform.GenTransform(tt.dt, tt.format, tt.schemaId, tt.delimiter, tt.dataField, tt.fields)
 		vCtx := context.WithValue(ctx, context.TransKey, tf)
 		mockSink := mocknode.NewMockSink()
 		mockSink.Collect(vCtx, tt.data)

+ 1 - 1
internal/topo/topotest/mocknode/mock_sink.go

@@ -39,7 +39,7 @@ func (m *MockSink) Open(ctx api.StreamContext) error {
 func (m *MockSink) Collect(ctx api.StreamContext, item interface{}) error {
 	logger := ctx.GetLogger()
 	fmt.Println("mock sink receive ", item)
-	if v, _, err := ctx.TransformOutput(item, true); err == nil {
+	if v, _, err := ctx.TransformOutput(item); err == nil {
 		logger.Debugf("mock sink receive %s", item)
 		m.results = append(m.results, v)
 	} else {

+ 53 - 36
internal/topo/transform/template.go

@@ -28,14 +28,9 @@ import (
 )
 
 // TransFunc is the function to transform data
+type TransFunc func(interface{}) ([]byte, bool, error)
 
-// The second parameter indicates whether to select fields based on the fields property.
-// If it is false, then after the dataTemplate, output the result directly.
-// If it is true, then after the dataTemplate, select the fields based on the fields property.
-
-type TransFunc func(interface{}, bool) ([]byte, bool, error)
-
-func GenTransform(dt string, format string, schemaId string, delimiter string, fields []string) (TransFunc, error) {
+func GenTransform(dt string, format string, schemaId string, delimiter string, dataField string, fields []string) (TransFunc, error) {
 	var (
 		tp  *template.Template = nil
 		c   message.Converter
@@ -67,11 +62,13 @@ func GenTransform(dt string, format string, schemaId string, delimiter string, f
 		}
 		tp = temp
 	}
-	return func(d interface{}, s bool) ([]byte, bool, error) {
+	return func(d interface{}) ([]byte, bool, error) {
 		var (
 			bs          []byte
 			transformed bool
 			selected    bool
+			m           interface{}
+			e           error
 		)
 		if tp != nil {
 			var output bytes.Buffer
@@ -82,28 +79,17 @@ func GenTransform(dt string, format string, schemaId string, delimiter string, f
 			bs = output.Bytes()
 			transformed = true
 		}
-		// just for sinks like tdengine and sql.
-		if !s {
-			if transformed {
-				return bs, true, nil
-			}
-			outBytes, err := json.Marshal(d)
-			return outBytes, false, err
+
+		if transformed {
+			m, selected, e = TransItem(bs, dataField, fields)
 		} else {
-			// Consider that if only the dataTemplate is needed, and the data after trans cannot be converted into map[string]interface
-			var m interface{}
-			var err error
-			if transformed {
-				m, err = SelectMap(bs, fields)
-			} else {
-				m, err = SelectMap(d, fields)
-			}
-			if err != nil && err.Error() != "fields cannot be empty" {
-				return nil, false, fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(bs), err)
-			} else if err == nil {
-				d = m
-				selected = true
-			}
+			m, selected, e = TransItem(d, dataField, fields)
+		}
+		if e != nil {
+			return nil, false, fmt.Errorf("fail to TransItem data %v for error %v", d, e)
+		}
+		if selected {
+			d = m
 		}
 
 		switch format {
@@ -134,22 +120,53 @@ func GenTp(dt string) (*template.Template, error) {
 	return template.New("sink").Funcs(conf.FuncMap).Parse(dt)
 }
 
-// SelectMap select fields from input map or array of map.
 // If you do not need to convert data to []byte, you can use this function directly. Otherwise, use TransFunc.
-func SelectMap(input interface{}, fields []string) (interface{}, error) {
-	if len(fields) == 0 {
-		return input, fmt.Errorf("fields cannot be empty")
+func TransItem(input interface{}, dataField string, fields []string) (interface{}, bool, error) {
+	if dataField == "" && len(fields) == 0 {
+		return input, false, nil
 	}
-
 	if _, ok := input.([]byte); ok {
-		var m map[string]interface{}
+		var m interface{}
 		err := json.Unmarshal(input.([]byte), &m)
 		if err != nil {
-			return input, fmt.Errorf("fail to decode data %s for error %v", string(input.([]byte)), err)
+			return input, false, fmt.Errorf("fail to decode data %s for error %v", string(input.([]byte)), err)
 		}
 		input = m
 	}
 
+	if dataField != "" {
+		switch input.(type) {
+		case map[string]interface{}:
+			input = input.(map[string]interface{})[dataField]
+		case []interface{}:
+			if len(input.([]interface{})) == 0 {
+				return nil, false, nil
+			}
+			input = input.([]interface{})[0].(map[string]interface{})[dataField]
+		case []map[string]interface{}:
+			if len(input.([]map[string]interface{})) == 0 {
+				return nil, false, nil
+			}
+			input = input.([]map[string]interface{})[0][dataField]
+		default:
+			return nil, false, fmt.Errorf("fail to decode data %v", input)
+		}
+	}
+
+	m, err := selectMap(input, fields)
+	if err != nil && err.Error() != "fields cannot be empty" {
+		return nil, false, fmt.Errorf("fail to decode data %v for error %v", input, err)
+	} else {
+		return m, true, nil
+	}
+}
+
+// selectMap select fields from input map or array of map.
+func selectMap(input interface{}, fields []string) (interface{}, error) {
+	if len(fields) == 0 {
+		return input, fmt.Errorf("fields cannot be empty")
+	}
+
 	outputs := make([]map[string]interface{}, 0)
 	switch input.(type) {
 	case map[string]interface{}:

+ 140 - 9
internal/topo/transform/template_test.go

@@ -15,7 +15,6 @@
 package transform
 
 import (
-	"fmt"
 	"reflect"
 	"testing"
 )
@@ -111,23 +110,155 @@ func Test_SelectMap(t *testing.T) {
 			},
 			want: []byte(`{"a": 1, "b": 2, "c": 3}`),
 		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got, _ := selectMap(tt.args.input, tt.args.fields); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("selectMap() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestTransItem(t *testing.T) {
+	type args struct {
+		input     interface{}
+		dataField string
+		fields    []string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    interface{}
+		wantErr bool
+	}{
 		{
-			name: "test6",
+			name: "test1",
 			args: args{
-				input:  []byte(`{"a": 1, "b": 2, "c": 3}`),
-				fields: []string{"d"},
+				input: map[string]interface{}{
+					"device": map[string]interface{}{
+						"device_id":          1,
+						"device_temperature": 31.2,
+						"device_humidity":    80,
+					},
+					"ts": 1625040000,
+				},
+				dataField: "device",
+				fields:    []string{"device_temperature", "device_humidity"},
 			},
 			want: map[string]interface{}{
-				"d": nil,
+				"device_temperature": 31.2,
+				"device_humidity":    80,
+			},
+			wantErr: false,
+		},
+		{
+			name: "test2",
+			args: args{
+				input: []map[string]interface{}{
+					{
+						"device": map[string]interface{}{
+							"device_id":          1,
+							"device_temperature": 31.2,
+							"device_humidity":    80,
+						},
+						"ts": 1625040000,
+					},
+				},
+				dataField: "device",
+				fields:    []string{"device_temperature", "device_humidity"},
+			},
+			want: map[string]interface{}{
+				"device_temperature": 31.2,
+				"device_humidity":    80,
+			},
+			wantErr: false,
+		},
+		{
+			name: "test3",
+			args: args{
+				input: map[string]interface{}{
+					"telemetry": []map[string]interface{}{
+						{
+							"temperature": 32.32,
+							"humidity":    80.8,
+							"f3":          "f3tagValue",
+							"f4":          "f4tagValue",
+							"ts":          1388082430,
+						},
+						{
+							"temperature": 34.32,
+							"humidity":    81.8,
+							"f3":          "f3tagValue",
+							"f4":          "f4tagValue",
+							"ts":          1388082440,
+						},
+					},
+					"device": map[string]interface{}{
+						"device_id":          1,
+						"device_temperature": 31.2,
+						"device_humidity":    80,
+					},
+				},
+				dataField: "telemetry",
+				fields:    []string{"temperature", "humidity"},
+			},
+			want: []map[string]interface{}{
+				{
+					"temperature": 32.32,
+					"humidity":    80.8,
+				},
+				{
+					"temperature": 34.32,
+					"humidity":    81.8,
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "test4",
+			args: args{
+				input: []interface{}{
+					map[string]interface{}{
+						"telemetry": []map[string]interface{}{
+							{
+								"temperature": 32.32,
+								"humidity":    80.8,
+								"ts":          1388082430,
+							},
+							{
+								"temperature": 34.32,
+								"humidity":    81.8,
+								"ts":          1388082440,
+							},
+						},
+					},
+				},
+				dataField: "telemetry",
+				fields:    []string{"temperature", "humidity"},
 			},
+			want: []map[string]interface{}{
+				{
+					"temperature": 32.32,
+					"humidity":    80.8,
+				},
+				{
+					"temperature": 34.32,
+					"humidity":    81.8,
+				},
+			},
+			wantErr: false,
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if got, _ := SelectMap(tt.args.input, tt.args.fields); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("SelectMap() = %v, want %v", got, tt.want)
-				fmt.Println(reflect.TypeOf(got), reflect.TypeOf(tt.want))
-
+			got, _, err := TransItem(tt.args.input, tt.args.dataField, tt.args.fields)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("TransItem() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("TransItem() got = %v, want %v", got, tt.want)
 			}
 		})
 	}

+ 1 - 2
pkg/api/stream.go

@@ -224,9 +224,8 @@ type StreamContext interface {
 	// TransformOutput Transform output according to the properties including dataTemplate, sendSingle, fields
 	// TransformOutput first transform data through the dataTemplate property,and then select data based on the fields property
 	// It is recommended that you do not configure both the dataTemplate property and the fields property.
-	// selected: whether to select data based on the fields property
 	// The second parameter is whether the data is transformed or just return as its json format.
-	TransformOutput(data interface{}, selected bool) ([]byte, bool, error)
+	TransformOutput(data interface{}) ([]byte, bool, error)
 	// Decode is set in the source according to the format.
 	// It decodes byte array into map or map slice.
 	Decode(data []byte) (map[string]interface{}, error)