mqtt_wrapper.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. // Copyright 2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mqtt
  15. import (
  16. "fmt"
  17. pahoMqtt "github.com/eclipse/paho.mqtt.golang"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/connection/clients"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "strings"
  22. "sync"
  23. )
  24. type mqttSubscriptionInfo struct {
  25. topic string
  26. qos byte
  27. topicHandler pahoMqtt.MessageHandler
  28. topicConsumers []*clients.ConsumerInfo
  29. }
  30. type mqttClientWrapper struct {
  31. cli *MQTTClient
  32. subLock sync.RWMutex
  33. //topic: subscriber
  34. //multiple go routine can sub same topic
  35. topicSubscriptions map[string]*mqttSubscriptionInfo
  36. //consumerId: SubscribedTopics
  37. subscribers map[string]clients.SubscribedTopics
  38. conSelector string
  39. refLock sync.RWMutex
  40. refCnt uint64
  41. }
  42. func NewMqttClientWrapper(props map[string]interface{}) (clients.ClientWrapper, error) {
  43. if props == nil {
  44. conf.Log.Warnf("props is nill for mqtt client wrapper")
  45. }
  46. client := &MQTTClient{}
  47. err := client.CfgValidate(props)
  48. if err != nil {
  49. return nil, err
  50. }
  51. cliWpr := &mqttClientWrapper{
  52. cli: client,
  53. subLock: sync.RWMutex{},
  54. topicSubscriptions: make(map[string]*mqttSubscriptionInfo),
  55. subscribers: make(map[string]clients.SubscribedTopics),
  56. refCnt: 1,
  57. }
  58. err = client.Connect(cliWpr.onConnectHandler)
  59. if err != nil {
  60. return nil, err
  61. }
  62. return cliWpr, nil
  63. }
  64. func (mc *mqttClientWrapper) onConnectHandler(_ pahoMqtt.Client) {
  65. // activeSubscriptions will be empty on the first connection.
  66. // On a re-connect is when the subscriptions must be re-created.
  67. conf.Log.Infof("The connection to mqtt broker %s client id %s established", mc.cli.srv, mc.cli.clientid)
  68. mc.subLock.RLock()
  69. defer mc.subLock.RUnlock()
  70. for topic, subscription := range mc.topicSubscriptions {
  71. token := mc.cli.conn.Subscribe(topic, subscription.qos, subscription.topicHandler)
  72. if token.Error() != nil {
  73. for _, con := range subscription.topicConsumers {
  74. con.SubErrors <- token.Error()
  75. }
  76. }
  77. }
  78. }
  79. func (mc *mqttClientWrapper) newMessageHandler(sub *mqttSubscriptionInfo) pahoMqtt.MessageHandler {
  80. return func(client pahoMqtt.Client, message pahoMqtt.Message) {
  81. if sub != nil {
  82. for _, consumer := range sub.topicConsumers {
  83. select {
  84. case consumer.ConsumerChan <- message:
  85. break
  86. default:
  87. conf.Log.Warnf("consumer chan full for request id %s", consumer.ConsumerId)
  88. }
  89. }
  90. }
  91. }
  92. }
  93. func (mc *mqttClientWrapper) Publish(_ api.StreamContext, topic string, message []byte, params map[string]interface{}) error {
  94. var Qos byte = 0
  95. if pq, ok := params["qos"]; ok {
  96. if v, ok := pq.(byte); ok {
  97. Qos = v
  98. }
  99. }
  100. retained := false
  101. if pk, ok := params["retained"]; ok {
  102. if v, ok := pk.(bool); ok {
  103. retained = v
  104. }
  105. }
  106. err := mc.cli.Publish(topic, Qos, retained, message)
  107. if err != nil {
  108. return err
  109. }
  110. return nil
  111. }
  112. func (mc *mqttClientWrapper) Subscribe(c api.StreamContext, subChan []api.TopicChannel, messageErrors chan error, params map[string]interface{}) error {
  113. log := c.GetLogger()
  114. mc.subLock.Lock()
  115. defer mc.subLock.Unlock()
  116. subId := fmt.Sprintf("%s_%s_%d", c.GetRuleId(), c.GetOpId(), c.GetInstanceId())
  117. if _, ok := mc.subscribers[subId]; ok {
  118. return fmt.Errorf("already have subscription %s", subId)
  119. }
  120. var Qos byte = 0
  121. if pq, ok := params["qos"]; ok {
  122. if v, ok := pq.(byte); ok {
  123. Qos = v
  124. }
  125. }
  126. subTopics := clients.SubscribedTopics{
  127. Topics: make([]string, 0),
  128. }
  129. for _, tpChan := range subChan {
  130. tpc := tpChan.Topic
  131. subTopics.Topics = append(subTopics.Topics, tpc)
  132. sub, found := mc.topicSubscriptions[tpc]
  133. if found {
  134. sub.topicConsumers = append(sub.topicConsumers, &clients.ConsumerInfo{
  135. ConsumerId: subId,
  136. ConsumerChan: tpChan.Messages,
  137. SubErrors: messageErrors,
  138. })
  139. log.Infof("subscription for topic %s already exists, reqId is %s, total subs %d", tpc, subId, len(sub.topicConsumers))
  140. } else {
  141. sub := &mqttSubscriptionInfo{
  142. topic: tpc,
  143. qos: Qos,
  144. topicConsumers: []*clients.ConsumerInfo{
  145. {
  146. ConsumerId: subId,
  147. ConsumerChan: tpChan.Messages,
  148. SubErrors: messageErrors,
  149. },
  150. },
  151. }
  152. sub.topicHandler = mc.newMessageHandler(sub)
  153. log.Infof("new subscription for topic %s, reqId is %s", tpc, subId)
  154. token := mc.cli.conn.Subscribe(tpc, Qos, sub.topicHandler)
  155. if token.Error() != nil {
  156. messageErrors <- token.Error()
  157. return token.Error()
  158. }
  159. mc.topicSubscriptions[tpc] = sub
  160. }
  161. }
  162. mc.subscribers[subId] = subTopics
  163. return nil
  164. }
  165. func (mc *mqttClientWrapper) unsubscribe(c api.StreamContext) {
  166. log := c.GetLogger()
  167. mc.subLock.Lock()
  168. defer mc.subLock.Unlock()
  169. subId := fmt.Sprintf("%s_%s_%d", c.GetRuleId(), c.GetOpId(), c.GetInstanceId())
  170. subTopics, found := mc.subscribers[subId]
  171. if !found {
  172. log.Errorf("not found subscription id %s", subId)
  173. return
  174. }
  175. for _, tpc := range subTopics.Topics {
  176. if sub, found := mc.topicSubscriptions[tpc]; found {
  177. for index, consumer := range sub.topicConsumers {
  178. if strings.EqualFold(subId, consumer.ConsumerId) {
  179. close(consumer.ConsumerChan)
  180. sub.topicConsumers = append(sub.topicConsumers[:index], sub.topicConsumers[index+1:]...)
  181. log.Infof("unsubscription topic %s for reqId %s, total subs %d", tpc, subId, len(sub.topicConsumers))
  182. }
  183. }
  184. if 0 == len(sub.topicConsumers) {
  185. delete(mc.topicSubscriptions, tpc)
  186. }
  187. }
  188. }
  189. delete(mc.subscribers, subId)
  190. }
  191. func (mc *mqttClientWrapper) Release(c api.StreamContext) {
  192. mc.unsubscribe(c)
  193. clients.ClientRegistry.Lock.Lock()
  194. mc.DeRef(c)
  195. clients.ClientRegistry.Lock.Unlock()
  196. }
  197. func (mc *mqttClientWrapper) SetConnectionSelector(conSelector string) {
  198. mc.conSelector = conSelector
  199. }
  200. func (mc *mqttClientWrapper) AddRef() {
  201. mc.refLock.Lock()
  202. defer mc.refLock.Unlock()
  203. mc.refCnt = mc.refCnt + 1
  204. conf.Log.Infof("mqtt client wrapper add refence for connection selector %s total refcount %d", mc.conSelector, mc.refCnt)
  205. }
  206. func (mc *mqttClientWrapper) DeRef(c api.StreamContext) {
  207. log := c.GetLogger()
  208. mc.refLock.Lock()
  209. defer mc.refLock.Unlock()
  210. mc.refCnt = mc.refCnt - 1
  211. if mc.refCnt == 0 {
  212. log.Infof("mqtt client wrapper reference count 0")
  213. if mc.conSelector != "" {
  214. conf.Log.Infof("remove mqtt client wrapper for connection selector %s", mc.conSelector)
  215. delete(clients.ClientRegistry.ShareClientStore, mc.conSelector)
  216. }
  217. _ = mc.cli.Disconnect()
  218. }
  219. }