source_pool.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // Copyright 2021-2022 EMQ Technologies Co., Ltd.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package node
  15. import (
  16. "context"
  17. "fmt"
  18. "github.com/lf-edge/ekuiper/internal/binder/io"
  19. "github.com/lf-edge/ekuiper/internal/conf"
  20. kctx "github.com/lf-edge/ekuiper/internal/topo/context"
  21. "github.com/lf-edge/ekuiper/internal/topo/state"
  22. "github.com/lf-edge/ekuiper/pkg/api"
  23. "sync"
  24. )
  25. //// Package vars and funcs
  26. var (
  27. pool = &sourcePool{
  28. registry: make(map[string]*sourceSingleton),
  29. }
  30. )
  31. // node is readonly
  32. func getSourceInstance(node *SourceNode, index int) (*sourceInstance, error) {
  33. var si *sourceInstance
  34. if node.options.SHARED {
  35. rkey := fmt.Sprintf("%s.%s", node.sourceType, node.name)
  36. s, ok := pool.load(rkey)
  37. if !ok {
  38. ns, err := io.Source(node.sourceType)
  39. if ns != nil {
  40. s, err = pool.addInstance(rkey, node, ns, index)
  41. if err != nil {
  42. return nil, err
  43. }
  44. } else {
  45. if err != nil {
  46. return nil, err
  47. } else {
  48. return nil, fmt.Errorf("source %s not found", node.sourceType)
  49. }
  50. }
  51. }
  52. // attach
  53. instanceKey := fmt.Sprintf("%s.%s.%d", rkey, node.ctx.GetRuleId(), index)
  54. err := s.attach(instanceKey, node.bufferLength)
  55. if err != nil {
  56. return nil, err
  57. }
  58. si = &sourceInstance{
  59. source: s.source,
  60. ctx: s.ctx,
  61. sourceInstanceChannels: s.outputs[instanceKey],
  62. }
  63. } else {
  64. ns, err := io.Source(node.sourceType)
  65. if ns != nil {
  66. si, err = start(nil, node, ns, index)
  67. if err != nil {
  68. return nil, err
  69. }
  70. } else {
  71. if err != nil {
  72. return nil, err
  73. } else {
  74. return nil, fmt.Errorf("source %s not found", node.sourceType)
  75. }
  76. }
  77. }
  78. return si, nil
  79. }
  80. // removeSourceInstance remove an attach from the sourceSingleton
  81. // If all attaches are removed, close the sourceSingleton and remove it from the pool registry
  82. // ONLY apply to shared instance
  83. func removeSourceInstance(node *SourceNode) {
  84. for i := 0; i < node.concurrency; i++ {
  85. rkey := fmt.Sprintf("%s.%s", node.sourceType, node.name)
  86. pool.deleteInstance(rkey, node, i)
  87. }
  88. }
  89. //// data types
  90. /*
  91. * Pool for all keyed source instance.
  92. * Create an instance, and start the source go routine when the keyed was hit the first time.
  93. * For later hit, create the new set of channels and attach to the instance
  94. * When hit a delete (when close a rule), remove the attached channels. If all channels removed, remove the instance from the pool
  95. * For performance reason, the pool only holds the shared instance. Rule specific instance are holden by rule source node itself
  96. */
  97. type sourcePool struct {
  98. registry map[string]*sourceSingleton
  99. sync.RWMutex
  100. }
  101. func (p *sourcePool) load(k string) (*sourceSingleton, bool) {
  102. p.RLock()
  103. defer p.RUnlock()
  104. s, ok := p.registry[k]
  105. return s, ok
  106. }
  107. func (p *sourcePool) addInstance(k string, node *SourceNode, source api.Source, index int) (*sourceSingleton, error) {
  108. p.Lock()
  109. defer p.Unlock()
  110. s, ok := p.registry[k]
  111. if !ok {
  112. contextLogger := conf.Log.WithField("source_pool", k)
  113. ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
  114. ruleId := "$$source_pool_" + k
  115. opId := "source_pool_" + k
  116. store, err := state.CreateStore("source_pool_"+k, 0)
  117. if err != nil {
  118. ctx.GetLogger().Errorf("source pool %s create store error %v", k, err)
  119. return nil, err
  120. }
  121. sctx, cancel := ctx.WithMeta(ruleId, opId, store).WithCancel()
  122. si, err := start(sctx, node, source, index)
  123. if err != nil {
  124. return nil, err
  125. }
  126. newS := &sourceSingleton{
  127. sourceInstance: si,
  128. outputs: make(map[string]*sourceInstanceChannels),
  129. cancel: cancel,
  130. }
  131. p.registry[k] = newS
  132. go newS.run(node.sourceType, node.name)
  133. s = newS
  134. }
  135. return s, nil
  136. }
  137. func (p *sourcePool) deleteInstance(k string, node *SourceNode, index int) {
  138. p.Lock()
  139. defer p.Unlock()
  140. s, ok := p.registry[k]
  141. if ok {
  142. instanceKey := fmt.Sprintf("%s.%s.%d", k, node.ctx.GetRuleId(), index)
  143. end := s.detach(instanceKey)
  144. if end {
  145. s.cancel()
  146. s.dataCh.Close()
  147. delete(p.registry, k)
  148. }
  149. }
  150. }
  151. type sourceInstance struct {
  152. source api.Source
  153. ctx api.StreamContext
  154. *sourceInstanceChannels
  155. }
  156. // Hold the only instance for all shared source
  157. // And hold the reference to all shared source input channels. Must be sync when dealing with outputs
  158. type sourceSingleton struct {
  159. *sourceInstance // immutable
  160. cancel context.CancelFunc // immutable
  161. outputs map[string]*sourceInstanceChannels // read-write lock
  162. sync.RWMutex
  163. }
  164. type sourceInstanceChannels struct {
  165. dataCh *DynamicChannelBuffer
  166. errorCh chan error
  167. }
  168. func newSourceInstanceChannels(bl int) *sourceInstanceChannels {
  169. buffer := NewDynamicChannelBuffer()
  170. buffer.SetLimit(bl)
  171. errorOutput := make(chan error)
  172. return &sourceInstanceChannels{
  173. dataCh: buffer,
  174. errorCh: errorOutput,
  175. }
  176. }
  177. func (ss *sourceSingleton) run(name, key string) {
  178. logger := ss.ctx.GetLogger()
  179. logger.Infof("Start source %s shared instance %s successfully", name, key)
  180. for {
  181. select {
  182. case <-ss.ctx.Done():
  183. logger.Infof("source %s shared instance %s done", name, key)
  184. return
  185. case err := <-ss.errorCh:
  186. ss.broadcastError(err)
  187. return
  188. case data := <-ss.dataCh.Out:
  189. logger.Debugf("broadcast data %v from source pool %s:%s", data, name, key)
  190. ss.broadcast(data)
  191. }
  192. }
  193. }
  194. func (ss *sourceSingleton) broadcast(val api.SourceTuple) {
  195. ss.RLock()
  196. for n, out := range ss.outputs {
  197. go func(name string, dataCh *DynamicChannelBuffer) {
  198. select {
  199. case dataCh.In <- val:
  200. case <-ss.ctx.Done():
  201. case <-dataCh.done:
  202. // detached
  203. }
  204. }(n, out.dataCh)
  205. }
  206. ss.RUnlock()
  207. }
  208. func (ss *sourceSingleton) broadcastError(err error) {
  209. logger := ss.ctx.GetLogger()
  210. var wg sync.WaitGroup
  211. ss.RLock()
  212. wg.Add(len(ss.outputs))
  213. for n, out := range ss.outputs {
  214. go func(name string, output chan<- error) {
  215. select {
  216. case output <- err:
  217. logger.Debugf("broadcast error from source pool to %s done", name)
  218. case <-ss.ctx.Done():
  219. // rule stop so stop waiting
  220. }
  221. wg.Done()
  222. }(n, out.errorCh)
  223. }
  224. ss.RUnlock()
  225. logger.Debugf("broadcasting from source pool")
  226. wg.Wait()
  227. }
  228. func (ss *sourceSingleton) attach(instanceKey string, bl int) error {
  229. ss.Lock()
  230. defer ss.Unlock()
  231. if _, ok := ss.outputs[instanceKey]; !ok {
  232. ss.outputs[instanceKey] = newSourceInstanceChannels(bl)
  233. } else {
  234. // should not happen
  235. return fmt.Errorf("fail to attach source instance, already has an output of the same key %s", instanceKey)
  236. }
  237. return nil
  238. }
  239. // detach Detach an instance and return if the singleton is ended
  240. func (ss *sourceSingleton) detach(instanceKey string) bool {
  241. ss.Lock()
  242. defer ss.Unlock()
  243. if chs, ok := ss.outputs[instanceKey]; ok {
  244. chs.dataCh.Close()
  245. } else {
  246. // should not happen
  247. ss.ctx.GetLogger().Warnf("detach source instance %s, not found", instanceKey)
  248. return false
  249. }
  250. delete(ss.outputs, instanceKey)
  251. if len(ss.outputs) == 0 {
  252. ss.cancel()
  253. return true
  254. }
  255. return false
  256. }
  257. func start(poolCtx api.StreamContext, node *SourceNode, s api.Source, instanceIndex int) (*sourceInstance, error) {
  258. err := s.Configure(node.options.DATASOURCE, node.props)
  259. if err != nil {
  260. return nil, err
  261. }
  262. ctx := poolCtx
  263. if poolCtx == nil {
  264. ctx = node.ctx
  265. if rw, ok := s.(api.Rewindable); ok {
  266. if offset, err := ctx.GetState(OffsetKey); err != nil {
  267. return nil, err
  268. } else if offset != nil {
  269. ctx.GetLogger().Infof("Source rewind from %v", offset)
  270. err = rw.Rewind(offset)
  271. if err != nil {
  272. return nil, err
  273. }
  274. }
  275. }
  276. }
  277. chs := newSourceInstanceChannels(node.bufferLength)
  278. go func() {
  279. nctx := ctx.WithInstance(instanceIndex)
  280. defer s.Close(nctx)
  281. s.Open(nctx, chs.dataCh.In, chs.errorCh)
  282. }()
  283. return &sourceInstance{
  284. source: s,
  285. sourceInstanceChannels: chs,
  286. ctx: ctx,
  287. }, nil
  288. }