influxdb.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. // +build influxdb
  2. package main
  3. import (
  4. "encoding/json"
  5. api "github.com/emqx/kuiper/xstream/api"
  6. _ "github.com/influxdata/influxdb1-client/v2"
  7. client "github.com/influxdata/influxdb1-client/v2"
  8. "time"
  9. )
  10. type influxSink struct {
  11. addr string
  12. username string
  13. password string
  14. measurement string
  15. databasename string
  16. tagkey string
  17. tagvalue string
  18. }
  19. var cli client.Client
  20. type ListMap []map[string]float64
  21. func (m *influxSink) Configure(props map[string]interface{}) error {
  22. if i, ok := props["addr"]; ok {
  23. if i, ok := i.(string); ok {
  24. m.addr = i
  25. }
  26. }
  27. if i, ok := props["username"]; ok {
  28. if i, ok := i.(string); ok {
  29. m.username = i
  30. }
  31. }
  32. if i, ok := props["password"]; ok {
  33. if i, ok := i.(string); ok {
  34. m.password = i
  35. }
  36. }
  37. if i, ok := props["measurement"]; ok {
  38. if i, ok := i.(string); ok {
  39. m.measurement = i
  40. }
  41. }
  42. if i, ok := props["databasename"]; ok {
  43. if i, ok := i.(string); ok {
  44. m.databasename = i
  45. }
  46. }
  47. if i, ok := props["tagkey"]; ok {
  48. if i, ok := i.(string); ok {
  49. m.tagkey = i
  50. }
  51. }
  52. if i, ok := props["tagvalue"]; ok {
  53. if i, ok := i.(string); ok {
  54. m.tagvalue = i
  55. }
  56. }
  57. return nil
  58. }
  59. func (m *influxSink) Open(ctx api.StreamContext) (err error) {
  60. logger := ctx.GetLogger()
  61. logger.Debug("Opening influx sink")
  62. cli, err = client.NewHTTPClient(client.HTTPConfig{
  63. Addr: m.addr,
  64. Username: m.username,
  65. Password: m.password,
  66. })
  67. if err != nil {
  68. logger.Debug(err)
  69. return err
  70. }
  71. return nil
  72. }
  73. func (m *influxSink) Collect(ctx api.StreamContext, data interface{}) error {
  74. logger := ctx.GetLogger()
  75. if v, ok := data.([]byte); ok {
  76. var out ListMap
  77. if err := json.Unmarshal([]byte(v), &out); err != nil {
  78. logger.Debug("Failed to unmarshal data with error %s.\n", err)
  79. return err
  80. }
  81. bp, err := client.NewBatchPoints(client.BatchPointsConfig{
  82. Database: m.databasename,
  83. Precision: "ns", //default is ns
  84. })
  85. if err != nil {
  86. logger.Debug(err)
  87. return err
  88. }
  89. tags := map[string]string{m.tagkey: m.tagvalue}
  90. fields := map[string]interface{}{
  91. "temperature": out[0]["temperature"],
  92. "humidity": out[0]["humidity"],
  93. }
  94. pt, err := client.NewPoint(m.measurement, tags, fields, time.Now())
  95. if err != nil {
  96. logger.Debug(err)
  97. return err
  98. }
  99. bp.AddPoint(pt)
  100. err = cli.Write(bp)
  101. if err != nil {
  102. logger.Debug(err)
  103. return err
  104. }
  105. logger.Debug("insert success")
  106. } else {
  107. logger.Debug("insert faild")
  108. }
  109. return nil
  110. }
  111. func (m *influxSink) Close(ctx api.StreamContext) error {
  112. // Close the client
  113. // Close the client
  114. cli.Close()
  115. return nil
  116. }
  117. func Influx() api.Sink {
  118. return &influxSink{}
  119. }