edgex_wrapper.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. // Copyright 2022-2023 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build edgex
  15. // +build edgex
  16. package edgex
  17. import (
  18. "fmt"
  19. "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
  20. "github.com/lf-edge/ekuiper/internal/conf"
  21. "github.com/lf-edge/ekuiper/internal/topo/connection/clients"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "strings"
  24. "sync"
  25. )
  26. type messageHandler func(stopChan chan struct{}, msgChan chan types.MessageEnvelope)
  27. type edgexSubscriptionInfo struct {
  28. topic string
  29. handler messageHandler
  30. stop chan struct{}
  31. topicConsumers []*clients.ConsumerInfo
  32. hasError bool
  33. }
  34. type edgexClientWrapper struct {
  35. cli *EdgexClient
  36. subLock sync.RWMutex
  37. //topic: subscriber
  38. //multiple go routine can sub same topic
  39. topicSubscriptions map[string]*edgexSubscriptionInfo
  40. //consumerId: SubscribedTopics
  41. subscribers map[string]clients.SubscribedTopics
  42. conSelector string
  43. refLock sync.RWMutex
  44. refCnt uint64
  45. }
  46. func NewEdgeClientWrapper(props map[string]interface{}) (clients.ClientWrapper, error) {
  47. if props == nil {
  48. conf.Log.Warnf("props is nill for mqtt client wrapper")
  49. }
  50. client := &EdgexClient{}
  51. err := client.CfgValidate(props)
  52. if err != nil {
  53. return nil, err
  54. }
  55. cliWpr := &edgexClientWrapper{
  56. cli: client,
  57. subLock: sync.RWMutex{},
  58. topicSubscriptions: make(map[string]*edgexSubscriptionInfo),
  59. subscribers: make(map[string]clients.SubscribedTopics),
  60. refCnt: 1,
  61. }
  62. err = client.Connect()
  63. if err != nil {
  64. return nil, err
  65. }
  66. return cliWpr, nil
  67. }
  68. func (mc *edgexClientWrapper) Publish(c api.StreamContext, topic string, message []byte, params map[string]interface{}) error {
  69. env := types.NewMessageEnvelope(message, c)
  70. env.ContentType = "application/json"
  71. if pk, ok := params["contentType"]; ok {
  72. if v, ok := pk.(string); ok {
  73. env.ContentType = v
  74. }
  75. }
  76. err := mc.cli.Publish(env, topic)
  77. if err != nil {
  78. return err
  79. }
  80. return nil
  81. }
  82. func (mc *edgexClientWrapper) newMessageHandler(topic string, sub *edgexSubscriptionInfo, messageErrors chan error) func(stopChan chan struct{}, msgChan chan types.MessageEnvelope) {
  83. return func(stopChan chan struct{}, msgChan chan types.MessageEnvelope) {
  84. for {
  85. select {
  86. case <-stopChan:
  87. conf.Log.Infof("message handler for topic %s stopped", topic)
  88. return
  89. case msgErr := <-messageErrors:
  90. //broadcast to all topic subscribers only one time
  91. if sub != nil && !sub.hasError {
  92. for _, consumer := range sub.topicConsumers {
  93. select {
  94. case consumer.SubErrors <- msgErr:
  95. break
  96. default:
  97. conf.Log.Warnf("consumer SubErrors channel full for request id %s", consumer.ConsumerId)
  98. }
  99. }
  100. sub.hasError = true
  101. }
  102. case msg, ok := <-msgChan:
  103. if !ok {
  104. for _, consumer := range sub.topicConsumers {
  105. close(consumer.ConsumerChan)
  106. }
  107. conf.Log.Errorf("message handler for topic %s stopped", topic)
  108. return
  109. }
  110. //broadcast to all topic subscribers
  111. if sub != nil {
  112. if sub.hasError == true {
  113. sub.hasError = false
  114. conf.Log.Infof("Subscription to edgex messagebus topic %s recoverd", topic)
  115. }
  116. for _, consumer := range sub.topicConsumers {
  117. select {
  118. case consumer.ConsumerChan <- &msg:
  119. break
  120. default:
  121. conf.Log.Warnf("consumer chan full for request id %s", consumer.ConsumerId)
  122. }
  123. }
  124. }
  125. }
  126. }
  127. }
  128. }
  129. func (mc *edgexClientWrapper) Subscribe(c api.StreamContext, subChan []api.TopicChannel, messageErrors chan error, _ map[string]interface{}) error {
  130. log := c.GetLogger()
  131. mc.subLock.Lock()
  132. defer mc.subLock.Unlock()
  133. subId := fmt.Sprintf("%s_%s_%d", c.GetRuleId(), c.GetOpId(), c.GetInstanceId())
  134. if _, ok := mc.subscribers[subId]; ok {
  135. return fmt.Errorf("already have subscription %s", subId)
  136. }
  137. subTopics := clients.SubscribedTopics{
  138. Topics: make([]string, 0),
  139. }
  140. for _, tpChan := range subChan {
  141. tpc := tpChan.Topic
  142. subTopics.Topics = append(subTopics.Topics, tpc)
  143. sub, found := mc.topicSubscriptions[tpc]
  144. if found {
  145. sub.topicConsumers = append(sub.topicConsumers, &clients.ConsumerInfo{
  146. ConsumerId: subId,
  147. ConsumerChan: tpChan.Messages,
  148. SubErrors: messageErrors,
  149. })
  150. log.Infof("subscription for topic %s already exists, reqId is %s, total subs %d", tpc, subId, len(sub.topicConsumers))
  151. } else {
  152. sub := &edgexSubscriptionInfo{
  153. topic: tpc,
  154. stop: make(chan struct{}, 1),
  155. topicConsumers: []*clients.ConsumerInfo{
  156. {
  157. ConsumerId: subId,
  158. ConsumerChan: tpChan.Messages,
  159. SubErrors: messageErrors,
  160. },
  161. },
  162. hasError: false,
  163. }
  164. log.Infof("new subscription for topic %s, reqId is %s", tpc, subId)
  165. message := make(chan types.MessageEnvelope)
  166. errChan := make(chan error)
  167. if err := mc.cli.Subscribe(message, tpc, errChan); err != nil {
  168. return err
  169. }
  170. sub.handler = mc.newMessageHandler(tpc, sub, errChan)
  171. go sub.handler(sub.stop, message)
  172. mc.topicSubscriptions[tpc] = sub
  173. }
  174. }
  175. mc.subscribers[subId] = subTopics
  176. return nil
  177. }
  178. func (mc *edgexClientWrapper) unsubscribe(c api.StreamContext) {
  179. log := c.GetLogger()
  180. mc.subLock.Lock()
  181. defer mc.subLock.Unlock()
  182. subId := fmt.Sprintf("%s_%s_%d", c.GetRuleId(), c.GetOpId(), c.GetInstanceId())
  183. subTopics, found := mc.subscribers[subId]
  184. if !found {
  185. log.Errorf("not found subscription id %s", subId)
  186. return
  187. }
  188. // just clean the consumers, do not clean the topic subscription
  189. for _, tpc := range subTopics.Topics {
  190. if sub, found := mc.topicSubscriptions[tpc]; found {
  191. for index, consumer := range sub.topicConsumers {
  192. if strings.EqualFold(subId, consumer.ConsumerId) {
  193. sub.topicConsumers = append(sub.topicConsumers[:index], sub.topicConsumers[index+1:]...)
  194. log.Infof("unsubscription topic %s for reqId %s, total subs %d", tpc, subId, len(sub.topicConsumers))
  195. }
  196. }
  197. }
  198. }
  199. delete(mc.subscribers, subId)
  200. }
  201. func (mc *edgexClientWrapper) SetConnectionSelector(conSelector string) {
  202. mc.conSelector = conSelector
  203. }
  204. func (mc *edgexClientWrapper) GetConnectionSelector() string {
  205. return mc.conSelector
  206. }
  207. func (mc *edgexClientWrapper) Release(c api.StreamContext) bool {
  208. mc.unsubscribe(c)
  209. return mc.deRef(c)
  210. }
  211. func (mc *edgexClientWrapper) AddRef() {
  212. mc.refLock.Lock()
  213. defer mc.refLock.Unlock()
  214. mc.refCnt = mc.refCnt + 1
  215. conf.Log.Infof("edgex client wrapper add refence for connection selector %s total refcount %d", mc.conSelector, mc.refCnt)
  216. }
  217. func (mc *edgexClientWrapper) deRef(c api.StreamContext) bool {
  218. log := c.GetLogger()
  219. mc.refLock.Lock()
  220. defer mc.refLock.Unlock()
  221. mc.refCnt = mc.refCnt - 1
  222. if mc.refCnt != 0 {
  223. conf.Log.Infof("edgex client wrapper derefence for connection selector %s total refcount %d", mc.conSelector, mc.refCnt)
  224. }
  225. if mc.refCnt == 0 {
  226. log.Infof("mqtt client wrapper reference count 0")
  227. // clean the go routine that waiting on the messages
  228. for _, sub := range mc.topicSubscriptions {
  229. sub.stop <- struct{}{}
  230. }
  231. _ = mc.cli.Disconnect()
  232. return true
  233. } else {
  234. return false
  235. }
  236. }