influxdb.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. // +build plugins
  2. package main
  3. import (
  4. "encoding/json"
  5. api "github.com/emqx/kuiper/xstream/api"
  6. _ "github.com/influxdata/influxdb1-client/v2"
  7. client "github.com/influxdata/influxdb1-client/v2"
  8. "strings"
  9. "time"
  10. )
  // influxSink carries the string settings that Configure copies out of the
  // sink properties. Open and Collect read them to create the client and to
  // build the point that is written.
  type influxSink struct {
  	// Connection settings passed to client.HTTPConfig in Open.
  	addr, username, password string
  	// Measurement name given to client.NewPoint in Collect.
  	measurement string
  	// Database set on the batch-points config in Collect.
  	databasename string
  	// Key and value of the single tag attached to the point in Collect.
  	tagkey, tagvalue string
  	// Comma-separated list of field names that Collect copies from the record.
  	fields string
  }
  21. var cli client.Client
  22. var fieldmap map[string]interface{}
  23. type ListMap []map[string]interface{}
  24. func (m *influxSink) Configure(props map[string]interface{}) error {
  25. if i, ok := props["addr"]; ok {
  26. if i, ok := i.(string); ok {
  27. m.addr = i
  28. }
  29. }
  30. if i, ok := props["username"]; ok {
  31. if i, ok := i.(string); ok {
  32. m.username = i
  33. }
  34. }
  35. if i, ok := props["password"]; ok {
  36. if i, ok := i.(string); ok {
  37. m.password = i
  38. }
  39. }
  40. if i, ok := props["measurement"]; ok {
  41. if i, ok := i.(string); ok {
  42. m.measurement = i
  43. }
  44. }
  45. if i, ok := props["databasename"]; ok {
  46. if i, ok := i.(string); ok {
  47. m.databasename = i
  48. }
  49. }
  50. if i, ok := props["tagkey"]; ok {
  51. if i, ok := i.(string); ok {
  52. m.tagkey = i
  53. }
  54. }
  55. if i, ok := props["tagvalue"]; ok {
  56. if i, ok := i.(string); ok {
  57. m.tagvalue = i
  58. }
  59. }
  60. if i, ok := props["fields"]; ok {
  61. if i, ok := i.(string); ok {
  62. m.fields = i
  63. }
  64. }
  65. return nil
  66. }
  67. func (m *influxSink) Open(ctx api.StreamContext) (err error) {
  68. logger := ctx.GetLogger()
  69. logger.Debug("Opening influx sink")
  70. cli, err = client.NewHTTPClient(client.HTTPConfig{
  71. Addr: m.addr,
  72. Username: m.username,
  73. Password: m.password,
  74. })
  75. if err != nil {
  76. logger.Debug(err)
  77. return err
  78. }
  79. return nil
  80. }
  81. func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
  82. logger := ctx.GetLogger()
  83. if v, ok := data.([]byte); ok {
  84. var out ListMap
  85. if err := json.Unmarshal([]byte(v), &out); err != nil {
  86. logger.Debug("Failed to unmarshal data with error %s.\n", err)
  87. return err
  88. }
  89. bp, err := client.NewBatchPoints(client.BatchPointsConfig{
  90. Database: m.databasename,
  91. Precision: "ns",
  92. })
  93. if err != nil {
  94. logger.Debug(err)
  95. return err
  96. }
  97. tags := map[string]string{m.tagkey: m.tagvalue}
  98. fields := strings.Split(m.fields, ",")
  99. fieldmap = make(map[string]interface{}, 10)
  100. for _, field := range fields {
  101. fieldmap[field] = out[0][field]
  102. }
  103. pt, err := client.NewPoint(m.measurement, tags, fieldmap, time.Now())
  104. if err != nil {
  105. logger.Debug(err)
  106. return err
  107. }
  108. bp.AddPoint(pt)
  109. err = cli.Write(bp)
  110. if err != nil {
  111. logger.Debug(err)
  112. return err
  113. }
  114. logger.Debug("insert success")
  115. } else {
  116. logger.Debug("insert failed")
  117. }
  118. return nil
  119. }
  120. func (m *influxSink) Close(ctx api.StreamContext) error {
  121. cli.Close()
  122. return nil
  123. }
  124. func Influx() api.Sink {
  125. return &influxSink{}
  126. }