mqtt_sink.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package sinks
  2. import (
  3. "context"
  4. "engine/common"
  5. "fmt"
  6. MQTT "github.com/eclipse/paho.mqtt.golang"
  7. "github.com/google/uuid"
  8. "strings"
  9. )
  10. type MQTTSink struct {
  11. srv string
  12. tpc string
  13. clientid string
  14. pVersion uint
  15. uName string
  16. password string
  17. input chan interface{}
  18. conn MQTT.Client
  19. ruleId string
  20. name string
  21. //ctx context.Context
  22. }
  23. func NewMqttSink(name string, ruleId string, properties interface{}) (*MQTTSink, error) {
  24. ps, ok := properties.(map[string]interface{})
  25. if !ok {
  26. return nil, fmt.Errorf("expect map[string]interface{} type for the mqtt sink properties")
  27. }
  28. srv, ok := ps["server"]
  29. if !ok {
  30. return nil, fmt.Errorf("mqtt sink is missing property server")
  31. }
  32. tpc, ok := ps["topic"]
  33. if !ok {
  34. return nil, fmt.Errorf("mqtt sink is missing property topic")
  35. }
  36. clientid, ok := ps["clientId"]
  37. if !ok{
  38. if uuid, err := uuid.NewUUID(); err != nil {
  39. return nil, fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  40. }else{
  41. clientid = uuid.String()
  42. }
  43. }
  44. var pVersion uint = 3
  45. pVersionStr, ok := ps["protocol_version"];
  46. if ok {
  47. v, _ := pVersionStr.(string)
  48. if v == "3.1" {
  49. pVersion = 3
  50. } else if v == "3.1.1" {
  51. pVersion = 4
  52. } else {
  53. return nil, fmt.Errorf("Unknown protocol version {0}, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4).", pVersionStr)
  54. }
  55. }
  56. uName := ""
  57. un, ok := ps["username"];
  58. if ok {
  59. v, _ := un.(string)
  60. if strings.Trim(v, " ") != "" {
  61. uName = v
  62. }
  63. }
  64. password := ""
  65. pwd, ok := ps["password"];
  66. if ok {
  67. v, _ := pwd.(string)
  68. if strings.Trim(v, " ") != "" {
  69. password = v
  70. }
  71. }
  72. ms := &MQTTSink{name:name, ruleId: ruleId, input: make(chan interface{}), srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string), pVersion:pVersion, uName:uName, password:password}
  73. return ms, nil
  74. }
  75. func (ms *MQTTSink) GetName() string {
  76. return ms.name
  77. }
  78. func (ms *MQTTSink) GetInput() (chan<- interface{}, string) {
  79. return ms.input, ms.name
  80. }
  81. func (ms *MQTTSink) Open(ctx context.Context, result chan<- error) {
  82. log := common.GetLogger(ctx)
  83. log.Printf("Opening mqtt sink for rule %s", ms.ruleId)
  84. go func() {
  85. exeCtx, cancel := context.WithCancel(ctx)
  86. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid).SetProtocolVersion(ms.pVersion)
  87. if ms.uName != "" {
  88. opts = opts.SetUsername(ms.uName)
  89. }
  90. if ms.password != "" {
  91. opts = opts.SetPassword(ms.password)
  92. }
  93. c := MQTT.NewClient(opts)
  94. if token := c.Connect(); token.Wait() && token.Error() != nil {
  95. result <- fmt.Errorf("Found error: %s", token.Error())
  96. cancel()
  97. }
  98. log.Printf("The connection to server %s was established successfully", ms.srv)
  99. ms.conn = c
  100. for {
  101. select {
  102. case item := <-ms.input:
  103. log.Infof("publish %s", item)
  104. if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
  105. result <- fmt.Errorf("Publish error: %s", token.Error())
  106. }
  107. case <-exeCtx.Done():
  108. c.Disconnect(5000)
  109. log.Infof("Closing mqtt sink")
  110. cancel()
  111. return
  112. }
  113. }
  114. }()
  115. }