mqtt_sink.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package sinks
  2. import (
  3. "crypto/tls"
  4. "engine/common"
  5. "engine/xstream/api"
  6. "fmt"
  7. MQTT "github.com/eclipse/paho.mqtt.golang"
  8. "github.com/google/uuid"
  9. "strings"
  10. )
  11. type MQTTSink struct {
  12. srv string
  13. tpc string
  14. clientid string
  15. pVersion uint
  16. uName string
  17. password string
  18. certPath string
  19. pkeyPath string
  20. conn MQTT.Client
  21. }
  22. func NewMqttSink(properties interface{}) (*MQTTSink, error) {
  23. ps, ok := properties.(map[string]interface{})
  24. if !ok {
  25. return nil, fmt.Errorf("expect map[string]interface{} type for the mqtt sink properties")
  26. }
  27. srv, ok := ps["server"]
  28. if !ok {
  29. return nil, fmt.Errorf("mqtt sink is missing property server")
  30. }
  31. tpc, ok := ps["topic"]
  32. if !ok {
  33. return nil, fmt.Errorf("mqtt sink is missing property topic")
  34. }
  35. clientid, ok := ps["clientId"]
  36. if !ok{
  37. if uuid, err := uuid.NewUUID(); err != nil {
  38. return nil, fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  39. }else{
  40. clientid = uuid.String()
  41. }
  42. }
  43. var pVersion uint = 3
  44. pVersionStr, ok := ps["protocolVersion"];
  45. if ok {
  46. v, _ := pVersionStr.(string)
  47. if v == "3.1" {
  48. pVersion = 3
  49. } else if v == "3.1.1" {
  50. pVersion = 4
  51. } else {
  52. return nil, fmt.Errorf("Unknown protocol version {0}, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4).", pVersionStr)
  53. }
  54. }
  55. uName := ""
  56. un, ok := ps["username"];
  57. if ok {
  58. v, _ := un.(string)
  59. if strings.Trim(v, " ") != "" {
  60. uName = v
  61. }
  62. }
  63. password := ""
  64. pwd, ok := ps["password"];
  65. if ok {
  66. v, _ := pwd.(string)
  67. if strings.Trim(v, " ") != "" {
  68. password = v
  69. }
  70. }
  71. certPath := ""
  72. if cp, ok := ps["certificationPath"]; ok {
  73. if v, ok := cp.(string); ok {
  74. certPath = v
  75. }
  76. }
  77. pKeyPath := ""
  78. if pk, ok := ps["privateKeyPath"]; ok {
  79. if v, ok := pk.(string); ok {
  80. pKeyPath = v
  81. }
  82. }
  83. ms := &MQTTSink{srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string), pVersion:pVersion, uName:uName, password:password, certPath:certPath, pkeyPath:pKeyPath}
  84. return ms, nil
  85. }
  86. func (ms *MQTTSink) Configure(props map[string]interface{}) error {
  87. return nil
  88. }
  89. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  90. log := ctx.GetLogger()
  91. log.Infof("Opening mqtt sink for rule %s.", ctx.GetRuleId())
  92. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  93. if ms.certPath != "" || ms.pkeyPath != "" {
  94. log.Infof("Connect MQTT broker with certification and keys.")
  95. if cp, err := common.ProcessPath(ms.certPath); err == nil {
  96. if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
  97. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  98. return err2
  99. } else {
  100. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  101. }
  102. } else {
  103. return err1
  104. }
  105. } else {
  106. return err
  107. }
  108. } else {
  109. log.Infof("Connect MQTT broker with username and password.")
  110. if ms.uName != "" {
  111. opts = opts.SetUsername(ms.uName)
  112. }
  113. if ms.password != "" {
  114. opts = opts.SetPassword(ms.password)
  115. }
  116. }
  117. c := MQTT.NewClient(opts)
  118. if token := c.Connect(); token.Wait() && token.Error() != nil {
  119. return fmt.Errorf("Found error: %s", token.Error())
  120. }
  121. log.Infof("The connection to server %s was established successfully", ms.srv)
  122. ms.conn = c
  123. return nil
  124. }
  125. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  126. logger := ctx.GetLogger()
  127. c := ms.conn
  128. logger.Infof("publish %s", item)
  129. if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
  130. return fmt.Errorf("publish error: %s", token.Error())
  131. }
  132. return nil
  133. }
  134. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  135. logger := ctx.GetLogger()
  136. logger.Infof("Closing mqtt sink")
  137. if ms.conn != nil && ms.conn.IsConnected() {
  138. ms.conn.Disconnect(5000)
  139. }
  140. return nil
  141. }