manager.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package memory
  15. import (
  16. "github.com/lf-edge/ekuiper/internal/conf"
  17. "github.com/lf-edge/ekuiper/pkg/api"
  18. "regexp"
  19. "sync"
  20. )
// IdProperty holds the string "topic".
// NOTE(review): presumably the configuration property name that identifies a
// memory source/sink by its topic — confirm against callers outside this file.
const IdProperty = "topic"
// pubConsumers is the per-topic registry entry stored in pubTopics.
type pubConsumers struct {
	// count is incremented by CreatePub and decremented by RemovePub; an entry
	// is only deleted once count is 0 and consumers is empty.
	count int
	// consumers holds the consumer channels of this topic, keyed by sourceId.
	consumers map[string]chan api.SourceTuple // The consumer channel list [sourceId]chan
}
// subChan is a regex-based subscription stored in subExps.
type subChan struct {
	// regex is matched against topic names to decide which topics feed ch.
	regex *regexp.Regexp
	// ch is the channel handed back to the subscriber by CreateSub.
	ch chan api.SourceTuple
}
  30. var (
  31. pubTopics = make(map[string]*pubConsumers)
  32. subExps = make(map[string]*subChan)
  33. mu = sync.RWMutex{}
  34. )
  35. func GetSink() *sink {
  36. return &sink{}
  37. }
  38. func GetSource() *source {
  39. return &source{}
  40. }
  41. func CreatePub(topic string) {
  42. mu.Lock()
  43. defer mu.Unlock()
  44. if c, exists := pubTopics[topic]; exists {
  45. c.count += 1
  46. return
  47. }
  48. c := &pubConsumers{
  49. count: 1,
  50. consumers: make(map[string]chan api.SourceTuple),
  51. }
  52. pubTopics[topic] = c
  53. for sourceId, sc := range subExps {
  54. if sc.regex.MatchString(topic) {
  55. addPubConsumer(topic, sourceId, sc.ch)
  56. }
  57. }
  58. }
  59. func CreateSub(wildcard string, regex *regexp.Regexp, sourceId string, bufferLength int) chan api.SourceTuple {
  60. mu.Lock()
  61. defer mu.Unlock()
  62. ch := make(chan api.SourceTuple, bufferLength)
  63. if regex != nil {
  64. subExps[sourceId] = &subChan{
  65. regex: regex,
  66. ch: ch,
  67. }
  68. for topic := range pubTopics {
  69. if regex.MatchString(topic) {
  70. addPubConsumer(topic, sourceId, ch)
  71. }
  72. }
  73. } else {
  74. addPubConsumer(wildcard, sourceId, ch)
  75. }
  76. return ch
  77. }
  78. func CloseSourceConsumerChannel(topic string, sourceId string) {
  79. mu.Lock()
  80. defer mu.Unlock()
  81. if sc, exists := subExps[sourceId]; exists {
  82. close(sc.ch)
  83. delete(subExps, sourceId)
  84. for _, c := range pubTopics {
  85. removePubConsumer(topic, sourceId, c)
  86. }
  87. } else {
  88. if sinkConsumerChannels, exists := pubTopics[topic]; exists {
  89. removePubConsumer(topic, sourceId, sinkConsumerChannels)
  90. }
  91. }
  92. }
  93. func RemovePub(topic string) {
  94. mu.Lock()
  95. defer mu.Unlock()
  96. if sinkConsumerChannels, exists := pubTopics[topic]; exists {
  97. sinkConsumerChannels.count -= 1
  98. if len(sinkConsumerChannels.consumers) == 0 && sinkConsumerChannels.count == 0 {
  99. delete(pubTopics, topic)
  100. }
  101. }
  102. }
  103. func Produce(ctx api.StreamContext, topic string, data map[string]interface{}) {
  104. c, exists := pubTopics[topic]
  105. if !exists {
  106. return
  107. }
  108. logger := ctx.GetLogger()
  109. mu.RLock()
  110. defer mu.RUnlock()
  111. // broadcast to all consumers
  112. for name, out := range c.consumers {
  113. select {
  114. case out <- api.NewDefaultSourceTuple(data, map[string]interface{}{"topic": topic}):
  115. logger.Debugf("memory source broadcast from topic %s to %s done", topic, name)
  116. case <-ctx.Done():
  117. // rule stop so stop waiting
  118. default:
  119. logger.Errorf("memory source topic %s drop message to %s", topic, name)
  120. }
  121. }
  122. }
  123. func addPubConsumer(topic string, sourceId string, ch chan api.SourceTuple) {
  124. var sinkConsumerChannels *pubConsumers
  125. if c, exists := pubTopics[topic]; exists {
  126. sinkConsumerChannels = c
  127. } else {
  128. sinkConsumerChannels = &pubConsumers{
  129. consumers: make(map[string]chan api.SourceTuple),
  130. }
  131. pubTopics[topic] = sinkConsumerChannels
  132. }
  133. if _, exists := sinkConsumerChannels.consumers[sourceId]; exists {
  134. conf.Log.Warnf("create memory source consumer for %s which is already exists", sourceId)
  135. } else {
  136. sinkConsumerChannels.consumers[sourceId] = ch
  137. }
  138. }
  139. func removePubConsumer(topic string, sourceId string, c *pubConsumers) {
  140. if _, exists := c.consumers[sourceId]; exists {
  141. delete(c.consumers, sourceId)
  142. }
  143. if len(c.consumers) == 0 && c.count == 0 {
  144. delete(pubTopics, topic)
  145. }
  146. }