mqtt_sink.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package sinks
  2. import (
  3. "engine/xstream/api"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/google/uuid"
  7. "strings"
  8. )
  9. type MQTTSink struct {
  10. srv string
  11. tpc string
  12. clientid string
  13. pVersion uint
  14. uName string
  15. password string
  16. conn MQTT.Client
  17. }
  18. func NewMqttSink(properties interface{}) (*MQTTSink, error) {
  19. ps, ok := properties.(map[string]interface{})
  20. if !ok {
  21. return nil, fmt.Errorf("expect map[string]interface{} type for the mqtt sink properties")
  22. }
  23. srv, ok := ps["server"]
  24. if !ok {
  25. return nil, fmt.Errorf("mqtt sink is missing property server")
  26. }
  27. tpc, ok := ps["topic"]
  28. if !ok {
  29. return nil, fmt.Errorf("mqtt sink is missing property topic")
  30. }
  31. clientid, ok := ps["clientId"]
  32. if !ok{
  33. if uuid, err := uuid.NewUUID(); err != nil {
  34. return nil, fmt.Errorf("mqtt sink fails to get uuid, the error is %s", err)
  35. }else{
  36. clientid = uuid.String()
  37. }
  38. }
  39. var pVersion uint = 3
  40. pVersionStr, ok := ps["protocol_version"];
  41. if ok {
  42. v, _ := pVersionStr.(string)
  43. if v == "3.1" {
  44. pVersion = 3
  45. } else if v == "3.1.1" {
  46. pVersion = 4
  47. } else {
  48. return nil, fmt.Errorf("Unknown protocol version {0}, the value could be only 3.1 or 3.1.1 (also refers to MQTT version 4).", pVersionStr)
  49. }
  50. }
  51. uName := ""
  52. un, ok := ps["username"];
  53. if ok {
  54. v, _ := un.(string)
  55. if strings.Trim(v, " ") != "" {
  56. uName = v
  57. }
  58. }
  59. password := ""
  60. pwd, ok := ps["password"];
  61. if ok {
  62. v, _ := pwd.(string)
  63. if strings.Trim(v, " ") != "" {
  64. password = v
  65. }
  66. }
  67. ms := &MQTTSink{srv: srv.(string), tpc: tpc.(string), clientid: clientid.(string), pVersion:pVersion, uName:uName, password:password}
  68. return ms, nil
  69. }
  70. func (ms *MQTTSink) Open(ctx api.StreamContext) error {
  71. log := ctx.GetLogger()
  72. log.Printf("Opening mqtt sink for rule %s", ctx.GetRuleId())
  73. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetClientID(ms.clientid)
  74. if ms.uName != "" {
  75. opts = opts.SetUsername(ms.uName)
  76. }
  77. if ms.password != "" {
  78. opts = opts.SetPassword(ms.password)
  79. }
  80. c := MQTT.NewClient(opts)
  81. if token := c.Connect(); token.Wait() && token.Error() != nil {
  82. return fmt.Errorf("Found error: %s", token.Error())
  83. }
  84. log.Printf("The connection to server %s was established successfully", ms.srv)
  85. ms.conn = c
  86. return nil
  87. }
  88. func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error {
  89. logger := ctx.GetLogger()
  90. c := ms.conn
  91. logger.Infof("publish %s", item)
  92. if token := c.Publish(ms.tpc, 0, false, item); token.Wait() && token.Error() != nil {
  93. return fmt.Errorf("publish error: %s", token.Error())
  94. }
  95. return nil
  96. }
  97. func (ms *MQTTSink) Close(ctx api.StreamContext) error {
  98. logger := ctx.GetLogger()
  99. logger.Infof("Closing mqtt sink")
  100. ms.conn.Disconnect(5000)
  101. return nil
  102. }