mqtt_sink.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package sinks
  2. import (
  3. "crypto/tls"
  4. "engine/xstream/api"
  5. "fmt"
  6. MQTT "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/google/uuid"
  8. "os"
  9. "path/filepath"
  10. "strings"
  11. )
  12. type MQTTSink struct {
  13. srv string
  14. tpc string
  15. clientid string
  16. pVersion uint
  17. uName string
  18. password string
  19. certPath string
  20. pkeyPath string
  21. conn MQTT.Client
  22. }
  23. func NewMqttSink(properties interface{}) (*MQTTSink, error) {
  24. ps, ok := properties.(map[string]interface{})
  25. if !ok {
  26. return nil, fmt.Errorf("expect map[string]interface{} type for the mqtt sink properties")
  27. }
  28. srv, ok := ps["server"]
  29. if !ok {
  30. return nil, fmt.Errorf("mqtt sink is missing property server")
  31. }
  32. tpc, ok := ps["topic"]
  33. if !ok {
  34. return nil, fmt.Errorf("mqtt sink is missing property topic")
  35. }
  36. clientid, ok := ps["clientId"]
  37. if !ok{
  38. if uuid, err := uuid.NewUUID(); err != nil {
  39. return nil, fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  40. }else{
  41. clientid = uuid.String()
  42. }
  43. }
  44. var pVersion uint = 3
  45. pVersionStr, ok := ps["protocol_version"];
  46. if ok {
  47. v, _ := pVersionStr.(string)
  48. if v == "3.1" {
  49. pVersion = 3
  50. } else if v == "3.1.1" {
  51. pVersion = 4
  52. } else {
  53. return nil, fmt.Errorf("Unknown protocol version {0}, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4).", pVersionStr)
  54. }
  55. }
  56. uName := ""
  57. un, ok := ps["username"];
  58. if ok {
  59. v, _ := un.(string)
  60. if strings.Trim(v, " ") != "" {
  61. uName = v
  62. }
  63. }
  64. password := ""
  65. pwd, ok := ps["password"];
  66. if ok {
  67. v, _ := pwd.(string)
  68. if strings.Trim(v, " ") != "" {
  69. password = v
  70. }
  71. }
  72. certPath := ""
  73. if cp, ok := ps["certification_path"]; ok {
  74. if v, ok := cp.(string); ok {
  75. certPath = v
  76. }
  77. }
  78. pKeyPath := ""
  79. if pk, ok := ps["private_key_path"]; ok {
  80. if v, ok := pk.(string); ok {
  81. pKeyPath = v
  82. }
  83. }
  84. ms := &MQTTSink{srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string), pVersion:pVersion, uName:uName, password:password, certPath:certPath, pkeyPath:pKeyPath}
  85. return ms, nil
  86. }
  // processPath resolves p to an absolute path and checks that it exists.
  //
  // It returns the absolute path on success. An error is returned when p cannot
  // be made absolute or when os.Stat reports that the resolved path does not
  // exist. Other os.Stat failures are not treated as errors here; the absolute
  // path is still returned in that case.
  func processPath(p string) (string, error) {
  	abs, err := filepath.Abs(p)
  	if err != nil {
  		// This failure used to be returned as ("", nil), which made the
  		// caller continue with an empty path as if resolution had succeeded.
  		return "", err
  	}
  	if _, err := os.Stat(abs); os.IsNotExist(err) {
  		return "", err
  	}
  	return abs, nil
  }
  97. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  98. log := ctx.GetLogger()
  99. log.Printf("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  100. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  101. if ms.certPath != "" || ms.pkeyPath != "" {
  102. log.Printf("Connect MQTT broker with certification and keys.")
  103. if cp, err := processPath(ms.certPath); err == nil {
  104. if kp, err1 := processPath(ms.pkeyPath); err1 == nil {
  105. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  106. return err2
  107. } else {
  108. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  109. }
  110. } else {
  111. return err1
  112. }
  113. } else {
  114. return err
  115. }
  116. } else {
  117. log.Printf("Connect MQTT broker with username and password.")
  118. if ms.uName != "" {
  119. opts = opts.SetUsername(ms.uName)
  120. }
  121. if ms.password != "" {
  122. opts = opts.SetPassword(ms.password)
  123. }
  124. }
  125. c := MQTT.NewClient(opts)
  126. if token := c.Connect(); token.Wait() && token.Error() != nil {
  127. return fmt.Errorf("Found error: %s", token.Error())
  128. }
  129. log.Printf("The connection to server %s was established successfully", ms.srv)
  130. ms.conn = c
  131. return nil
  132. }
  133. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  134. logger := ctx.GetLogger()
  135. c := ms.conn
  136. logger.Infof("publish %s", item)
  137. if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
  138. return fmt.Errorf("publish error: %s", token.Error())
  139. }
  140. return nil
  141. }
  142. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  143. logger := ctx.GetLogger()
  144. logger.Infof("Closing mqtt sink")
  145. if ms.conn != nil && ms.conn.IsConnected() {
  146. ms.conn.Disconnect(5000)
  147. }
  148. return nil
  149. }