|
@@ -44,12 +44,12 @@ type influxSink struct {
|
|
username string
|
|
username string
|
|
password string
|
|
password string
|
|
measurement string
|
|
measurement string
|
|
- databasename string
|
|
|
|
- tagkey string
|
|
|
|
- tagvalue string
|
|
|
|
|
|
+ databaseName string
|
|
|
|
+ tagKey string
|
|
|
|
+ tagValue string
|
|
fields string
|
|
fields string
|
|
cli client.Client
|
|
cli client.Client
|
|
- fieldmap map[string]interface{}
|
|
|
|
|
|
+ fieldMap map[string]interface{}
|
|
hasTransform bool
|
|
hasTransform bool
|
|
}
|
|
}
|
|
|
|
|
|
@@ -76,17 +76,17 @@ func (m *influxSink) Configure(props map[string]interface{}) error {
|
|
}
|
|
}
|
|
if i, ok := props["databasename"]; ok {
|
|
if i, ok := props["databasename"]; ok {
|
|
if i, ok := i.(string); ok {
|
|
if i, ok := i.(string); ok {
|
|
- m.databasename = i
|
|
|
|
|
|
+ m.databaseName = i
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if i, ok := props["tagkey"]; ok {
|
|
if i, ok := props["tagkey"]; ok {
|
|
if i, ok := i.(string); ok {
|
|
if i, ok := i.(string); ok {
|
|
- m.tagkey = i
|
|
|
|
|
|
+ m.tagKey = i
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if i, ok := props["tagvalue"]; ok {
|
|
if i, ok := props["tagvalue"]; ok {
|
|
if i, ok := i.(string); ok {
|
|
if i, ok := i.(string); ok {
|
|
- m.tagvalue = i
|
|
|
|
|
|
+ m.tagValue = i
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if i, ok := props["fields"]; ok {
|
|
if i, ok := props["fields"]; ok {
|
|
@@ -94,8 +94,10 @@ func (m *influxSink) Configure(props map[string]interface{}) error {
|
|
m.fields = i
|
|
m.fields = i
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if _, ok := props["dataTemplate"]; ok {
|
|
|
|
- m.hasTransform = true
|
|
|
|
|
|
+ if i, ok := props["dataTemplate"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok && i != "" {
|
|
|
|
+ m.hasTransform = true
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
@@ -143,23 +145,23 @@ func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
|
|
}
|
|
}
|
|
|
|
|
|
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
|
|
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
|
|
- Database: m.databasename,
|
|
|
|
|
|
+ Database: m.databaseName,
|
|
Precision: "ns",
|
|
Precision: "ns",
|
|
})
|
|
})
|
|
if err != nil {
|
|
if err != nil {
|
|
logger.Debug(err)
|
|
logger.Debug(err)
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
- tags := map[string]string{m.tagkey: m.tagvalue}
|
|
|
|
|
|
+ tags := map[string]string{m.tagKey: m.tagValue}
|
|
fields := strings.Split(m.fields, ",")
|
|
fields := strings.Split(m.fields, ",")
|
|
- m.fieldmap = make(map[string]interface{}, 100)
|
|
|
|
|
|
+ m.fieldMap = make(map[string]interface{}, 100)
|
|
for _, field := range fields {
|
|
for _, field := range fields {
|
|
if output[field] != nil {
|
|
if output[field] != nil {
|
|
- m.fieldmap[field] = output[field]
|
|
|
|
|
|
+ m.fieldMap[field] = output[field]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- pt, err := client.NewPoint(m.measurement, tags, m.fieldmap, time.Now())
|
|
|
|
|
|
+ pt, err := client.NewPoint(m.measurement, tags, m.fieldMap, time.Now())
|
|
if err != nil {
|
|
if err != nil {
|
|
logger.Debug(err)
|
|
logger.Debug(err)
|
|
return err
|
|
return err
|