mqtt_source.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package extensions
  2. import (
  3. "crypto/tls"
  4. "encoding/json"
  5. "engine/common"
  6. "engine/xsql"
  7. "engine/xstream/api"
  8. "fmt"
  9. MQTT "github.com/eclipse/paho.mqtt.golang"
  10. "github.com/google/uuid"
  11. "strconv"
  12. "strings"
  13. )
  14. type MQTTSource struct {
  15. srv string
  16. tpc string
  17. clientid string
  18. pVersion uint
  19. uName string
  20. password string
  21. certPath string
  22. pkeyPath string
  23. schema map[string]interface{}
  24. conn MQTT.Client
  25. }
  // MQTTConfig is the set of properties that Configure decodes from the props
  // map of an MQTT source. The JSON tags are the property names.
  type MQTTConfig struct {
  	// Qos is decoded from "qos".
  	Qos int `json:"qos"`
  	// Sharedsubscription is decoded from "sharedSubscription".
  	Sharedsubscription bool `json:"sharedSubscription"`
  	// Servers is the list of broker addresses, decoded from "servers".
  	Servers []string `json:"servers"`
  	// Clientid is decoded from "clientid".
  	Clientid string `json:"clientid"`
  	// PVersion is the protocol version as text (for example "3.1.1"),
  	// decoded from "protocolVersion".
  	PVersion string `json:"protocolVersion"`
  	// Uname is decoded from "username".
  	Uname string `json:"username"`
  	// Password is decoded from "password".
  	Password string `json:"password"`
  	// Certification is the certificate file path, decoded from
  	// "certificationPath".
  	Certification string `json:"certificationPath"`
  	// PrivateKPath is the private key file path, decoded from
  	// "privateKeyPath".
  	PrivateKPath string `json:"privateKeyPath"`
  }
  37. func (ms *MQTTSource) WithSchema(schema string) *MQTTSource {
  38. return ms
  39. }
  40. func (ms *MQTTSource) Configure(topic string, props map[string]interface{}) error {
  41. cfg := &MQTTConfig{}
  42. err := common.MapToStruct(props, cfg)
  43. if err != nil {
  44. return fmt.Errorf("read properties %v fail with error: %v", props, err)
  45. }
  46. ms.tpc = topic
  47. if srvs := cfg.Servers; srvs != nil && len(srvs) > 0 {
  48. ms.srv = srvs[0]
  49. } else {
  50. return fmt.Errorf("missing server property")
  51. }
  52. ms.clientid = cfg.Clientid
  53. ms.pVersion = 3
  54. if cfg.PVersion == "3.1.1" {
  55. ms.pVersion = 4
  56. }
  57. ms.uName = cfg.Uname
  58. ms.password = strings.Trim(cfg.PVersion, " ")
  59. ms.certPath = cfg.Certification
  60. ms.pkeyPath = cfg.PrivateKPath
  61. return nil
  62. }
  63. func (ms *MQTTSource) Open(ctx api.StreamContext, consume api.ConsumeFunc) error {
  64. log := ctx.GetLogger()
  65. opts := MQTT.NewClientOptions().AddBroker(ms.srv).SetProtocolVersion(ms.pVersion)
  66. if ms.clientid == "" {
  67. if uuid, err := uuid.NewUUID(); err != nil {
  68. return fmt.Errorf("failed to get uuid, the error is %s", err)
  69. } else {
  70. opts.SetClientID(uuid.String())
  71. }
  72. } else {
  73. opts.SetClientID(ms.clientid)
  74. }
  75. if ms.certPath != "" || ms.pkeyPath != "" {
  76. log.Infof("Connect MQTT broker with certification and keys.")
  77. if cp, err := common.ProcessPath(ms.certPath); err == nil {
  78. log.Infof("The certification file is %s.", cp)
  79. if kp, err1 := common.ProcessPath(ms.pkeyPath); err1 == nil {
  80. log.Infof("The private key file is %s.", kp)
  81. if cer, err2 := tls.LoadX509KeyPair(cp, kp); err2 != nil {
  82. return err2
  83. } else {
  84. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  85. }
  86. } else {
  87. return err1
  88. }
  89. } else {
  90. return err
  91. }
  92. } else {
  93. log.Infof("Connect MQTT broker with username and password.")
  94. if ms.uName != "" {
  95. opts = opts.SetUsername(ms.uName)
  96. }
  97. if ms.password != "" {
  98. opts = opts.SetPassword(ms.password)
  99. }
  100. }
  101. h := func(client MQTT.Client, msg MQTT.Message) {
  102. log.Infof("received %s", msg.Payload())
  103. result := make(map[string]interface{})
  104. //The unmarshal type can only be bool, float64, string, []interface{}, map[string]interface{}, nil
  105. if e := json.Unmarshal(msg.Payload(), &result); e != nil {
  106. log.Errorf("Invalid data format, cannot convert %s into JSON with error %s", string(msg.Payload()), e)
  107. return
  108. }
  109. //Convert the keys to lowercase
  110. result = xsql.LowercaseKeyMap(result)
  111. meta := make(map[string]interface{})
  112. meta[xsql.INTERNAL_MQTT_TOPIC_KEY] = msg.Topic()
  113. meta[xsql.INTERNAL_MQTT_MSG_ID_KEY] = strconv.Itoa(int(msg.MessageID()))
  114. consume(result, meta)
  115. }
  116. //TODO error listener?
  117. opts.SetDefaultPublishHandler(h)
  118. c := MQTT.NewClient(opts)
  119. if token := c.Connect(); token.Wait() && token.Error() != nil {
  120. return fmt.Errorf("found error when connecting to %s: %s", ms.srv, token.Error())
  121. }
  122. log.Printf("The connection to server %s was established successfully", ms.srv)
  123. ms.conn = c
  124. if token := c.Subscribe(ms.tpc, 0, nil); token.Wait() && token.Error() != nil {
  125. return fmt.Errorf("Found error: %s", token.Error())
  126. }
  127. log.Printf("Successfully subscribe to topic %s", ms.tpc)
  128. return nil
  129. }
  130. func (ms *MQTTSource) Close(ctx api.StreamContext) error{
  131. ctx.GetLogger().Println("Mqtt Source Done")
  132. if ms.conn != nil && ms.conn.IsConnected() {
  133. ms.conn.Disconnect(5000)
  134. }
  135. return nil
  136. }