manager.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. // Copyright 2021 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/conf"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. "regexp"
  19. "sync"
  20. )
  21. const IdProperty = "topic"
  22. type pubConsumers struct {
  23. count int
  24. consumers map[string]chan map[string]interface{} // The consumer channel list [sourceId]chan
  25. }
  26. type subChan struct {
  27. regex *regexp.Regexp
  28. ch chan map[string]interface{}
  29. }
  30. var (
  31. pubTopics = make(map[string]*pubConsumers)
  32. subExps = make(map[string]*subChan)
  33. mu = sync.RWMutex{}
  34. )
  35. func GetSink() *sink {
  36. return &sink{}
  37. }
  38. func GetSource() *source {
  39. return &source{}
  40. }
  41. func createPub(topic string) {
  42. mu.Lock()
  43. defer mu.Unlock()
  44. if c, exists := pubTopics[topic]; exists {
  45. c.count += 1
  46. return
  47. }
  48. c := &pubConsumers{
  49. count: 1,
  50. consumers: make(map[string]chan map[string]interface{}),
  51. }
  52. pubTopics[topic] = c
  53. for sourceId, sc := range subExps {
  54. if sc.regex.MatchString(topic) {
  55. addPubConsumer(topic, sourceId, sc.ch)
  56. }
  57. }
  58. }
  59. func createSub(wildcard string, regex *regexp.Regexp, sourceId string) chan map[string]interface{} {
  60. mu.Lock()
  61. defer mu.Unlock()
  62. ch := make(chan map[string]interface{})
  63. if regex != nil {
  64. subExps[sourceId] = &subChan{
  65. regex: regex,
  66. ch: ch,
  67. }
  68. for topic := range pubTopics {
  69. if regex.MatchString(topic) {
  70. addPubConsumer(topic, sourceId, ch)
  71. }
  72. }
  73. } else {
  74. addPubConsumer(wildcard, sourceId, ch)
  75. }
  76. return ch
  77. }
  78. func closeSourceConsumerChannel(topic string, sourceId string) error {
  79. mu.Lock()
  80. defer mu.Unlock()
  81. if sc, exists := subExps[sourceId]; exists {
  82. close(sc.ch)
  83. delete(subExps, sourceId)
  84. for _, c := range pubTopics {
  85. removePubConsumer(topic, sourceId, c)
  86. }
  87. } else {
  88. if sinkConsumerChannels, exists := pubTopics[topic]; exists {
  89. removePubConsumer(topic, sourceId, sinkConsumerChannels)
  90. }
  91. }
  92. return nil
  93. }
  94. func closeSink(topic string) error {
  95. mu.Lock()
  96. defer mu.Unlock()
  97. if sinkConsumerChannels, exists := pubTopics[topic]; exists {
  98. sinkConsumerChannels.count -= 1
  99. if len(sinkConsumerChannels.consumers) == 0 && sinkConsumerChannels.count == 0 {
  100. delete(pubTopics, topic)
  101. }
  102. }
  103. return nil
  104. }
  105. func produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
  106. c, exists := pubTopics[topic]
  107. if !exists {
  108. return
  109. }
  110. logger := ctx.GetLogger()
  111. var wg sync.WaitGroup
  112. mu.RLock()
  113. // blocking to retain the sequence, expect the source to consume the data immediately
  114. wg.Add(len(c.consumers))
  115. for n, out := range c.consumers {
  116. go func(name string, output chan<- map[string]interface{}) {
  117. select {
  118. case output <- data:
  119. logger.Debugf("broadcast from topic %s to %s done", topic, name)
  120. case <-ctx.Done():
  121. // rule stop so stop waiting
  122. }
  123. wg.Done()
  124. }(n, out)
  125. }
  126. mu.RUnlock()
  127. wg.Wait()
  128. }
  129. func addPubConsumer(topic string, sourceId string, ch chan map[string]interface{}) {
  130. var sinkConsumerChannels *pubConsumers
  131. if c, exists := pubTopics[topic]; exists {
  132. sinkConsumerChannels = c
  133. } else {
  134. sinkConsumerChannels = &pubConsumers{
  135. consumers: make(map[string]chan map[string]interface{}),
  136. }
  137. pubTopics[topic] = sinkConsumerChannels
  138. }
  139. if _, exists := sinkConsumerChannels.consumers[sourceId]; exists {
  140. conf.Log.Warnf("create memory source consumer for %s which is already exists", sourceId)
  141. } else {
  142. sinkConsumerChannels.consumers[sourceId] = ch
  143. }
  144. }
  145. func removePubConsumer(topic string, sourceId string, c *pubConsumers) {
  146. if _, exists := c.consumers[sourceId]; exists {
  147. delete(c.consumers, sourceId)
  148. }
  149. if len(c.consumers) == 0 && c.count == 0 {
  150. delete(pubTopics, topic)
  151. }
  152. }