|
@@ -0,0 +1,169 @@
|
|
|
|
+// Copyright 2021-2022 EMQ Technologies Co., Ltd.
|
|
|
|
+//
|
|
|
|
+// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
+// you may not use this file except in compliance with the License.
|
|
|
|
+// You may obtain a copy of the License at
|
|
|
|
+//
|
|
|
|
+// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
+//
|
|
|
|
+// Unless required by applicable law or agreed to in writing, software
|
|
|
|
+// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
+// See the License for the specific language governing permissions and
|
|
|
|
+// limitations under the License.
|
|
|
|
+
|
|
|
|
+// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
+// you may not use this file except in compliance with the License.
|
|
|
|
+// You may obtain a copy of the License at
|
|
|
|
+//
|
|
|
|
+// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
+//
|
|
|
|
+// Unless required by applicable law or agreed to in writing, software
|
|
|
|
+// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
+// See the License for the specific language governing permissions and
|
|
|
|
+// limitations under the License.
|
|
|
|
+
|
|
|
|
+package main
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "encoding/json"
|
|
|
|
+ "fmt"
|
|
|
|
+ _ "github.com/influxdata/influxdb-client-go/v2"
|
|
|
|
+ client "github.com/influxdata/influxdb-client-go/v2"
|
|
|
|
+ "github.com/lf-edge/ekuiper/pkg/api"
|
|
|
|
+ "strings"
|
|
|
|
+ "time"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+type influxSink2 struct {
|
|
|
|
+ addr string
|
|
|
|
+ token string
|
|
|
|
+ measurement string
|
|
|
|
+ org string
|
|
|
|
+ bucket string
|
|
|
|
+ tagKey string
|
|
|
|
+ tagValue string
|
|
|
|
+ fields string
|
|
|
|
+ cli client.Client
|
|
|
|
+ fieldMap map[string]interface{}
|
|
|
|
+ hasTransform bool
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (m *influxSink2) Configure(props map[string]interface{}) error {
|
|
|
|
+ if i, ok := props["addr"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok {
|
|
|
|
+ m.addr = i
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if i, ok := props["measurement"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok {
|
|
|
|
+ m.measurement = i
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if i, ok := props["tagKey"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok {
|
|
|
|
+ m.tagKey = i
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if i, ok := props["tagValue"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok {
|
|
|
|
+ m.tagValue = i
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if i, ok := props["fields"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok {
|
|
|
|
+ m.fields = i
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if i, ok := props["dataTemplate"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok && i != "" {
|
|
|
|
+ m.hasTransform = true
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if i, ok := props["token"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok {
|
|
|
|
+ m.token = i
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if i, ok := props["org"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok {
|
|
|
|
+ m.org = i
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if i, ok := props["bucket"]; ok {
|
|
|
|
+ if i, ok := i.(string); ok {
|
|
|
|
+ m.bucket = i
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (m *influxSink2) Open(ctx api.StreamContext) (err error) {
|
|
|
|
+ logger := ctx.GetLogger()
|
|
|
|
+ logger.Debug("Opening influx2 sink")
|
|
|
|
+ options := client.DefaultOptions().SetBatchSize(100)
|
|
|
|
+ m.cli = client.NewClientWithOptions(m.addr, m.token, options)
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (m *influxSink2) Collect(ctx api.StreamContext, data interface{}) error {
|
|
|
|
+ logger := ctx.GetLogger()
|
|
|
|
+ if m.hasTransform {
|
|
|
|
+ jsonBytes, _, err := ctx.TransformOutput(data)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ m := make(map[string]interface{})
|
|
|
|
+ err = json.Unmarshal(jsonBytes, &m)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return fmt.Errorf("fail to decode data %s after applying dataTemplate for error %v", string(jsonBytes), err)
|
|
|
|
+ }
|
|
|
|
+ data = m
|
|
|
|
+ }
|
|
|
|
+ var output map[string]interface{}
|
|
|
|
+ switch v := data.(type) {
|
|
|
|
+ case map[string]interface{}:
|
|
|
|
+ output = v
|
|
|
|
+ case []map[string]interface{}:
|
|
|
|
+ if len(v) > 0 {
|
|
|
|
+ output = v[0]
|
|
|
|
+ } else {
|
|
|
|
+ ctx.GetLogger().Warnf("Get empty data %v, just return", data)
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ writeAPI := m.cli.WriteAPIBlocking(m.org, m.bucket)
|
|
|
|
+
|
|
|
|
+ tags := map[string]string{m.tagKey: m.tagValue}
|
|
|
|
+ fields := strings.Split(m.fields, ",")
|
|
|
|
+ m.fieldMap = make(map[string]interface{}, 100)
|
|
|
|
+ for _, field := range fields {
|
|
|
|
+ if output[field] != nil {
|
|
|
|
+ m.fieldMap[field] = output[field]
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ pt := client.NewPoint(m.measurement, tags, m.fieldMap, time.Now())
|
|
|
|
+
|
|
|
|
+ err := writeAPI.WritePoint(ctx, pt)
|
|
|
|
+ if err != nil {
|
|
|
|
+ logger.Debug(err)
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ logger.Debug("insert data into influxdb2 success")
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (m *influxSink2) Close(ctx api.StreamContext) error {
|
|
|
|
+ m.cli.Close()
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func Influx2() api.Sink {
|
|
|
|
+ return &influxSink2{}
|
|
|
|
+}
|