influx.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. // Licensed under the Apache License, Version 2.0 (the "License");
  2. // you may not use this file except in compliance with the License.
  3. // You may obtain a copy of the License at
  4. //
  5. // http://www.apache.org/licenses/LICENSE-2.0
  6. //
  7. // Unless required by applicable law or agreed to in writing, software
  8. // distributed under the License is distributed on an "AS IS" BASIS,
  9. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. // See the License for the specific language governing permissions and
  11. // limitations under the License.
  12. // +build plugins
  13. package main
  14. import (
  15. "encoding/json"
  16. _ "github.com/influxdata/influxdb1-client/v2"
  17. client "github.com/influxdata/influxdb1-client/v2"
  18. api "github.com/lf-edge/ekuiper/pkg/api"
  19. "strings"
  20. "time"
  21. )
  22. type influxSink struct {
  23. addr string
  24. username string
  25. password string
  26. measurement string
  27. databasename string
  28. tagkey string
  29. tagvalue string
  30. fields string
  31. cli client.Client
  32. fieldmap map[string]interface{}
  33. }
  34. type ListMap []map[string]interface{}
  35. func (m *influxSink) Configure(props map[string]interface{}) error {
  36. if i, ok := props["addr"]; ok {
  37. if i, ok := i.(string); ok {
  38. m.addr = i
  39. }
  40. }
  41. if i, ok := props["username"]; ok {
  42. if i, ok := i.(string); ok {
  43. m.username = i
  44. }
  45. }
  46. if i, ok := props["password"]; ok {
  47. if i, ok := i.(string); ok {
  48. m.password = i
  49. }
  50. }
  51. if i, ok := props["measurement"]; ok {
  52. if i, ok := i.(string); ok {
  53. m.measurement = i
  54. }
  55. }
  56. if i, ok := props["databasename"]; ok {
  57. if i, ok := i.(string); ok {
  58. m.databasename = i
  59. }
  60. }
  61. if i, ok := props["tagkey"]; ok {
  62. if i, ok := i.(string); ok {
  63. m.tagkey = i
  64. }
  65. }
  66. if i, ok := props["tagvalue"]; ok {
  67. if i, ok := i.(string); ok {
  68. m.tagvalue = i
  69. }
  70. }
  71. if i, ok := props["fields"]; ok {
  72. if i, ok := i.(string); ok {
  73. m.fields = i
  74. }
  75. }
  76. return nil
  77. }
  78. func (m *influxSink) Open(ctx api.StreamContext) (err error) {
  79. logger := ctx.GetLogger()
  80. logger.Debug("Opening influx sink")
  81. m.cli, err = client.NewHTTPClient(client.HTTPConfig{
  82. Addr: m.addr,
  83. Username: m.username,
  84. Password: m.password,
  85. })
  86. if err != nil {
  87. logger.Debug(err)
  88. return err
  89. }
  90. return nil
  91. }
  92. func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
  93. logger := ctx.GetLogger()
  94. if v, ok := data.([]byte); ok {
  95. var out ListMap
  96. if err := json.Unmarshal([]byte(v), &out); err != nil {
  97. logger.Debug("Failed to unmarshal data with error %s.\n", err)
  98. return err
  99. }
  100. bp, err := client.NewBatchPoints(client.BatchPointsConfig{
  101. Database: m.databasename,
  102. Precision: "ns",
  103. })
  104. if err != nil {
  105. logger.Debug(err)
  106. return err
  107. }
  108. tags := map[string]string{m.tagkey: m.tagvalue}
  109. fields := strings.Split(m.fields, ",")
  110. m.fieldmap = make(map[string]interface{}, 100)
  111. for _, field := range fields {
  112. if out[0][field] != nil {
  113. m.fieldmap[field] = out[0][field]
  114. }
  115. }
  116. pt, err := client.NewPoint(m.measurement, tags, m.fieldmap, time.Now())
  117. if err != nil {
  118. logger.Debug(err)
  119. return err
  120. }
  121. bp.AddPoint(pt)
  122. err = m.cli.Write(bp)
  123. if err != nil {
  124. logger.Debug(err)
  125. return err
  126. }
  127. logger.Debug("insert success")
  128. } else {
  129. logger.Debug("insert failed")
  130. }
  131. return nil
  132. }
  133. func (m *influxSink) Close(ctx api.StreamContext) error {
  134. m.cli.Close()
  135. return nil
  136. }
  137. func Influx() api.Sink {
  138. return &influxSink{}
  139. }