Browse Source

Optimization InfluxDB Sink (#395)

* modify qos type

* 1.configure fields of InfluxDB
2.support data type string,float,int

Co-authored-by: 726518972@qq.com <Smart33690>
安逸的程序猿 4 years atrás
parent
commit
3643d0ca23

+ 3 - 1
docs/en_US/plugins/sinks/influxdb.md

@@ -31,6 +31,7 @@ Restart the Kuiper server to activate the plugin.
 | databasename  | true     | The database of the InfluxDB |
 | tagkey        | true     | The tag key of the InfluxDB |
 | tagvalue      | true     | The tag value of the InfluxDB |
+| fields     | true       | The column of the InfluxDB,split with ","  |
 ## Sample usage
 
 Below is a sample for selecting temperature great than 50 degree, and some profiles only for your reference.
@@ -50,7 +51,8 @@ Below is a sample for selecting temperature great than 50 degree, and some profi
        "measurement": "test",
        "databasename": "databasename",
        "tagkey": "tagkey",
-       "tagvalue": "tagvalue"
+       "tagvalue": "tagvalue",
+       "fields": "humidity,temperature,pressure"
       }
     }
   ]

+ 3 - 1
docs/zh_CN/plugins/sinks/influxdb.md

@@ -30,6 +30,7 @@
 | databasename | 是       | InfluxDB的数据库         |
 | tagkey       | 是       | InfluxDB的标签键         |
 | tagvalue     | 是       | InfluxDB的标签值         |
+| fields     | 是       | InfluxDB的列名,用","隔开         |
 ## 示例用法
 
 下面是选择温度大于50度的样本规则,和一些配置文件仅供参考。
@@ -49,7 +50,8 @@
        "measurement": "test",
        "databasename": "databasename",
        "tagkey": "tagkey",
-       "tagvalue": "tagvalue"
+       "tagvalue": "tagvalue",
+       "fields": "humidity,temperature,pressure"
       }
     }
   ]

+ 18 - 9
plugins/sinks/influxdb.go

@@ -7,6 +7,7 @@ import (
 	api "github.com/emqx/kuiper/xstream/api"
 	_ "github.com/influxdata/influxdb1-client/v2"
 	client "github.com/influxdata/influxdb1-client/v2"
+	"strings"
 	"time"
 )
 
@@ -18,11 +19,14 @@ type influxSink struct {
 	databasename string
 	tagkey       string
 	tagvalue     string
+	fields       string
 }
 
 var cli client.Client
 
-type ListMap []map[string]float64
+var fieldmap map[string]interface{}
+
+type ListMap []map[string]interface{}
 
 func (m *influxSink) Configure(props map[string]interface{}) error {
 	if i, ok := props["addr"]; ok {
@@ -60,6 +64,11 @@ func (m *influxSink) Configure(props map[string]interface{}) error {
 			m.tagvalue = i
 		}
 	}
+	if i, ok := props["fields"]; ok {
+		if i, ok := i.(string); ok {
+			m.fields = i
+		}
+	}
 	return nil
 }
 
@@ -80,6 +89,7 @@ func (m *influxSink) Open(ctx api.StreamContext) (err error) {
 
 func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 	logger := ctx.GetLogger()
+
 	if v, ok := data.([]byte); ok {
 		var out ListMap
 		if err := json.Unmarshal([]byte(v), &out); err != nil {
@@ -88,19 +98,20 @@ func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 		}
 		bp, err := client.NewBatchPoints(client.BatchPointsConfig{
 			Database:  m.databasename,
-			Precision: "ns", //default is ns
+			Precision: "ns",
 		})
 		if err != nil {
 			logger.Debug(err)
 			return err
 		}
 		tags := map[string]string{m.tagkey: m.tagvalue}
-		fields := map[string]interface{}{
-			"temperature": out[0]["temperature"],
-			"humidity":    out[0]["humidity"],
+		fields := strings.Split(m.fields, ",")
+		fieldmap = make(map[string]interface{}, 10)
+		for _, field := range fields {
+			fieldmap[field] = out[0][field]
 		}
 
-		pt, err := client.NewPoint(m.measurement, tags, fields, time.Now())
+		pt, err := client.NewPoint(m.measurement, tags, fieldmap, time.Now())
 		if err != nil {
 			logger.Debug(err)
 			return err
@@ -113,14 +124,12 @@ func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
 		}
 		logger.Debug("insert success")
 	} else {
-		logger.Debug("insert faild")
+		logger.Debug("insert failed")
 	}
 	return nil
 }
 
 func (m *influxSink) Close(ctx api.StreamContext) error {
-	// Close the client
-	// Close the client
 	cli.Close()
 	return nil
 }