mqtt_wrapper.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package mqtt
  15. import (
  16. "fmt"
  17. pahoMqtt "github.com/eclipse/paho.mqtt.golang"
  18. "github.com/lf-edge/ekuiper/internal/conf"
  19. "github.com/lf-edge/ekuiper/internal/topo/connection/clients"
  20. "github.com/lf-edge/ekuiper/pkg/api"
  21. "github.com/lf-edge/ekuiper/pkg/errorx"
  22. "strings"
  23. "sync"
  24. )
  25. type mqttSubscriptionInfo struct {
  26. topic string
  27. qos byte
  28. topicHandler pahoMqtt.MessageHandler
  29. topicConsumers []*clients.ConsumerInfo
  30. }
  31. type mqttClientWrapper struct {
  32. cli *MQTTClient
  33. subLock sync.RWMutex
  34. //topic: subscriber
  35. //multiple go routine can sub same topic
  36. topicSubscriptions map[string]*mqttSubscriptionInfo
  37. //consumerId: SubscribedTopics
  38. subscribers map[string]clients.SubscribedTopics
  39. conSelector string
  40. connected bool
  41. refLock sync.RWMutex
  42. refCnt uint64
  43. }
  44. func NewMqttClientWrapper(props map[string]interface{}) (clients.ClientWrapper, error) {
  45. if props == nil {
  46. conf.Log.Warnf("props is nill for mqtt client wrapper")
  47. }
  48. client := &MQTTClient{}
  49. err := client.CfgValidate(props)
  50. if err != nil {
  51. return nil, err
  52. }
  53. cliWpr := &mqttClientWrapper{
  54. cli: client,
  55. subLock: sync.RWMutex{},
  56. topicSubscriptions: make(map[string]*mqttSubscriptionInfo),
  57. subscribers: make(map[string]clients.SubscribedTopics),
  58. refCnt: 1,
  59. // set default connected to true, because we only return the client when it is create
  60. connected: true,
  61. }
  62. err = client.Connect(cliWpr.onConnectHandler, cliWpr.onConnectLost)
  63. if err != nil {
  64. return nil, err
  65. }
  66. conf.Log.Infof("new mqtt client created")
  67. return cliWpr, nil
  68. }
  69. func (mc *mqttClientWrapper) onConnectHandler(_ pahoMqtt.Client) {
  70. // activeSubscriptions will be empty on the first connection.
  71. // On a re-connect is when the subscriptions must be re-created.
  72. conf.Log.Infof("The connection to mqtt broker %s client id %s established", mc.cli.srv, mc.cli.clientid)
  73. mc.subLock.Lock()
  74. defer mc.subLock.Unlock()
  75. mc.connected = true
  76. for topic, subscription := range mc.topicSubscriptions {
  77. token := mc.cli.conn.Subscribe(topic, subscription.qos, subscription.topicHandler)
  78. if token.Error() != nil {
  79. for _, con := range subscription.topicConsumers {
  80. select {
  81. case con.SubErrors <- token.Error():
  82. break
  83. default:
  84. conf.Log.Warnf("consumer SubErrors channel full for request id %s", con.ConsumerId)
  85. }
  86. }
  87. }
  88. }
  89. }
  90. func (mc *mqttClientWrapper) onConnectLost(_ pahoMqtt.Client, err error) {
  91. mc.subLock.Lock()
  92. defer mc.subLock.Unlock()
  93. mc.connected = false
  94. e := fmt.Errorf("The connection to mqtt broker %s client id %s disconnected with error: %s ", mc.cli.srv, mc.cli.clientid, err.Error())
  95. conf.Log.Warnf(e.Error())
  96. for _, sub := range mc.topicSubscriptions {
  97. if sub != nil {
  98. // broadcast errors to all consumers
  99. for _, consumer := range sub.topicConsumers {
  100. select {
  101. case consumer.SubErrors <- e:
  102. break
  103. default:
  104. conf.Log.Warnf("consumer chan full for request id %s", consumer.ConsumerId)
  105. }
  106. }
  107. }
  108. }
  109. }
  110. func (mc *mqttClientWrapper) newMessageHandler(sub *mqttSubscriptionInfo) pahoMqtt.MessageHandler {
  111. return func(client pahoMqtt.Client, message pahoMqtt.Message) {
  112. if sub != nil {
  113. // broadcast to all consumers
  114. for _, consumer := range sub.topicConsumers {
  115. select {
  116. case consumer.ConsumerChan <- message:
  117. break
  118. default:
  119. conf.Log.Warnf("consumer chan full for request id %s", consumer.ConsumerId)
  120. }
  121. }
  122. }
  123. }
  124. }
  125. func (mc *mqttClientWrapper) Publish(_ api.StreamContext, topic string, message []byte, params map[string]interface{}) error {
  126. err := mc.checkConn()
  127. if err != nil {
  128. return err
  129. }
  130. var Qos byte = 0
  131. if pq, ok := params["qos"]; ok {
  132. if v, ok := pq.(byte); ok {
  133. Qos = v
  134. }
  135. }
  136. retained := false
  137. if pk, ok := params["retained"]; ok {
  138. if v, ok := pk.(bool); ok {
  139. retained = v
  140. }
  141. }
  142. err = mc.cli.Publish(topic, Qos, retained, message)
  143. if err != nil {
  144. return err
  145. }
  146. return nil
  147. }
  148. func (mc *mqttClientWrapper) checkConn() error {
  149. mc.subLock.RLock()
  150. defer mc.subLock.RUnlock()
  151. if !mc.connected {
  152. return fmt.Errorf("%s: %s", errorx.IOErr, "mqtt client is not connected")
  153. }
  154. return nil
  155. }
  156. func (mc *mqttClientWrapper) Subscribe(c api.StreamContext, subChan []api.TopicChannel, messageErrors chan error, params map[string]interface{}) error {
  157. log := c.GetLogger()
  158. mc.subLock.Lock()
  159. defer mc.subLock.Unlock()
  160. subId := fmt.Sprintf("%s_%s_%d", c.GetRuleId(), c.GetOpId(), c.GetInstanceId())
  161. if _, ok := mc.subscribers[subId]; ok {
  162. return fmt.Errorf("already have subscription %s", subId)
  163. }
  164. var Qos byte = 0
  165. if pq, ok := params["qos"]; ok {
  166. if v, ok := pq.(byte); ok {
  167. Qos = v
  168. }
  169. }
  170. subTopics := clients.SubscribedTopics{
  171. Topics: make([]string, 0),
  172. }
  173. for _, tpChan := range subChan {
  174. tpc := tpChan.Topic
  175. subTopics.Topics = append(subTopics.Topics, tpc)
  176. sub, found := mc.topicSubscriptions[tpc]
  177. if found {
  178. sub.topicConsumers = append(sub.topicConsumers, &clients.ConsumerInfo{
  179. ConsumerId: subId,
  180. ConsumerChan: tpChan.Messages,
  181. SubErrors: messageErrors,
  182. })
  183. log.Infof("subscription for topic %s already exists, reqId is %s, total subs %d", tpc, subId, len(sub.topicConsumers))
  184. } else {
  185. sub := &mqttSubscriptionInfo{
  186. topic: tpc,
  187. qos: Qos,
  188. topicConsumers: []*clients.ConsumerInfo{
  189. {
  190. ConsumerId: subId,
  191. ConsumerChan: tpChan.Messages,
  192. SubErrors: messageErrors,
  193. },
  194. },
  195. }
  196. sub.topicHandler = mc.newMessageHandler(sub)
  197. log.Infof("new subscription for topic %s, reqId is %s", tpc, subId)
  198. token := mc.cli.conn.Subscribe(tpc, Qos, sub.topicHandler)
  199. if token.Error() != nil {
  200. return token.Error()
  201. }
  202. mc.topicSubscriptions[tpc] = sub
  203. }
  204. }
  205. mc.subscribers[subId] = subTopics
  206. return nil
  207. }
  208. func (mc *mqttClientWrapper) unsubscribe(c api.StreamContext) {
  209. log := c.GetLogger()
  210. mc.subLock.Lock()
  211. defer mc.subLock.Unlock()
  212. subId := fmt.Sprintf("%s_%s_%d", c.GetRuleId(), c.GetOpId(), c.GetInstanceId())
  213. subTopics, found := mc.subscribers[subId]
  214. if !found {
  215. log.Errorf("not found subscription id %s", subId)
  216. return
  217. }
  218. for _, tpc := range subTopics.Topics {
  219. if sub, found := mc.topicSubscriptions[tpc]; found {
  220. for index, consumer := range sub.topicConsumers {
  221. if strings.EqualFold(subId, consumer.ConsumerId) {
  222. sub.topicConsumers = append(sub.topicConsumers[:index], sub.topicConsumers[index+1:]...)
  223. log.Infof("unsubscription topic %s for reqId %s, total subs %d", tpc, subId, len(sub.topicConsumers))
  224. }
  225. }
  226. if 0 == len(sub.topicConsumers) {
  227. delete(mc.topicSubscriptions, tpc)
  228. log.Infof("delete subscription for topic %s", tpc)
  229. mc.cli.conn.Unsubscribe(tpc)
  230. }
  231. }
  232. }
  233. delete(mc.subscribers, subId)
  234. }
  235. func (mc *mqttClientWrapper) Release(c api.StreamContext) bool {
  236. mc.unsubscribe(c)
  237. return mc.deRef(c)
  238. }
  239. func (mc *mqttClientWrapper) SetConnectionSelector(conSelector string) {
  240. mc.conSelector = conSelector
  241. }
  242. func (mc *mqttClientWrapper) GetConnectionSelector() string {
  243. return mc.conSelector
  244. }
  245. func (mc *mqttClientWrapper) AddRef() {
  246. mc.refLock.Lock()
  247. defer mc.refLock.Unlock()
  248. mc.refCnt = mc.refCnt + 1
  249. conf.Log.Infof("mqtt client wrapper add refence for connection selector %s total refcount %d", mc.conSelector, mc.refCnt)
  250. }
  251. func (mc *mqttClientWrapper) deRef(c api.StreamContext) bool {
  252. log := c.GetLogger()
  253. mc.refLock.Lock()
  254. defer mc.refLock.Unlock()
  255. mc.refCnt = mc.refCnt - 1
  256. log.Infof("mqtt client wrapper reference count %d", mc.refCnt)
  257. if mc.refCnt == 0 {
  258. _ = mc.cli.Disconnect()
  259. return true
  260. } else {
  261. return false
  262. }
  263. }