source_pool.go

package node

import (
	"context"
	"fmt"
	"sync"

	"github.com/lf-edge/ekuiper/internal/conf"
	kctx "github.com/lf-edge/ekuiper/internal/topo/context"
	"github.com/lf-edge/ekuiper/pkg/api"
)
//// Package vars and funcs

var (
	pool = &sourcePool{
		registry: make(map[string]*sourceSingleton),
	}
)
// getSourceInstance returns a sourceInstance for the given node. For shared sources it
// attaches to (or creates) the pooled singleton; otherwise it starts a dedicated instance.
// node is readonly
func getSourceInstance(node *SourceNode, index int) (*sourceInstance, error) {
	var si *sourceInstance
	if node.options.SHARED {
		rkey := fmt.Sprintf("%s.%s", node.sourceType, node.name)
		s, ok := pool.load(rkey)
		if !ok {
			ns, err := getSource(node.sourceType)
			if err != nil {
				return nil, err
			}
			s, err = pool.addInstance(rkey, node, ns, index)
			if err != nil {
				return nil, err
			}
		}
		// attach
		instanceKey := fmt.Sprintf("%s.%s.%d", rkey, node.ctx.GetRuleId(), index)
		err := s.attach(instanceKey, node.bufferLength)
		if err != nil {
			return nil, err
		}
		si = &sourceInstance{
			source:                 s.source,
			ctx:                    s.ctx,
			sourceInstanceChannels: s.outputs[instanceKey],
		}
	} else {
		ns, err := getSource(node.sourceType)
		if err != nil {
			return nil, err
		}
		si, err = start(nil, node, ns, index)
		if err != nil {
			return nil, err
		}
	}
	return si, nil
}
// removeSourceInstance removes an attachment from the sourceSingleton.
// If all attachments are removed, close the sourceSingleton and remove it from the pool registry.
// ONLY applies to shared instances.
func removeSourceInstance(node *SourceNode) {
	for i := 0; i < node.concurrency; i++ {
		rkey := fmt.Sprintf("%s.%s", node.sourceType, node.name)
		pool.deleteInstance(rkey, node, i)
	}
}
//// data types

/*
 * Pool for all keyed source instances.
 * Create an instance and start the source goroutine when the key is hit for the first time.
 * On later hits, create a new set of channels and attach them to the instance.
 * On delete (when a rule is closed), remove the attached channels. If all channels are removed, remove the instance from the pool.
 * For performance reasons, the pool only holds shared instances. Rule-specific instances are held by the rule's source node itself.
 */
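//
// A minimal sketch of the intended call pattern, with illustrative (hypothetical) names:
// suppose rules "rule1" and "rule2" both read from a shared MQTT stream "demo" with
// concurrency 1. Both rules resolve to the same registry key but distinct instance keys:
//
//	rkey        = "mqtt.demo"          // sourceType.name, one pooled singleton
//	instanceKey = "mqtt.demo.rule1.0"  // rkey.ruleId.index, one set of channels per rule instance
//
//	si, err := getSourceInstance(node, 0) // first call creates the singleton and starts run()
//	// ... the rule consumes si.dataCh / si.errorCh ...
//	removeSourceInstance(node)            // last detach cancels the context and closes the source
//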
type sourcePool struct {
	registry map[string]*sourceSingleton
	sync.RWMutex
}
func (p *sourcePool) load(k string) (*sourceSingleton, bool) {
	p.RLock()
	defer p.RUnlock()
	s, ok := p.registry[k]
	return s, ok
}
// addInstance creates the shared singleton for key k if it does not exist yet, starts the
// underlying source and its run loop, and registers it in the pool.
func (p *sourcePool) addInstance(k string, node *SourceNode, source api.Source, index int) (*sourceSingleton, error) {
	p.Lock()
	defer p.Unlock()
	s, ok := p.registry[k]
	if !ok {
		contextLogger := conf.Log.WithField("source_pool", k)
		ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
		// TODO cancel
		sctx, cancel := ctx.WithCancel()
		si, err := start(sctx, node, source, index)
		if err != nil {
			return nil, err
		}
		newS := &sourceSingleton{
			sourceInstance: si,
			outputs:        make(map[string]*sourceInstanceChannels),
			cancel:         cancel,
		}
		p.registry[k] = newS
		go newS.run(node.sourceType, node.name)
		s = newS
	}
	return s, nil
}
// deleteInstance detaches one rule instance from the singleton of key k. When the last
// attachment is gone, it cancels the singleton, closes the source and drops it from the registry.
func (p *sourcePool) deleteInstance(k string, node *SourceNode, index int) {
	p.Lock()
	defer p.Unlock()
	s, ok := p.registry[k]
	if ok {
		instanceKey := fmt.Sprintf("%s.%s.%d", k, node.ctx.GetRuleId(), index)
		end := s.detach(instanceKey)
		if end {
			s.cancel()
			s.source.Close(s.ctx)
			delete(p.registry, k)
		}
	}
}
type sourceInstance struct {
	source api.Source
	ctx    api.StreamContext
	*sourceInstanceChannels
}
// sourceSingleton holds the only instance of a shared source, plus the references to all
// attached output channels. Access to outputs must be synchronized.
type sourceSingleton struct {
	*sourceInstance                    // immutable
	cancel          context.CancelFunc // immutable
	outputs         map[string]*sourceInstanceChannels // guarded by the read-write lock
	sync.RWMutex
}
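
// Concurrency contract (a summary of the code below, not an additional mechanism): run() is
// the only reader of the singleton's dataCh and errorCh; broadcast and broadcastError fan each
// value out to every attached output under RLock, each send in its own goroutine, and wait for
// all sends (or context cancellation) before the next tuple is read; attach and detach mutate
// outputs under the write lock.
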
type sourceInstanceChannels struct {
	dataCh  *DynamicChannelBuffer
	errorCh chan error
}
func newSourceInstanceChannels(bl int) *sourceInstanceChannels {
	buffer := NewDynamicChannelBuffer()
	buffer.SetLimit(bl)
	errorOutput := make(chan error)
	return &sourceInstanceChannels{
		dataCh:  buffer,
		errorCh: errorOutput,
	}
}
// run is the singleton's event loop. It forwards data and errors from the shared source to
// every attached rule until the context is cancelled.
func (ss *sourceSingleton) run(name, key string) {
	logger := ss.ctx.GetLogger()
	logger.Infof("Start source %s shared instance %s successfully", name, key)
	for {
		select {
		case <-ss.ctx.Done():
			logger.Infof("source %s shared instance %s done", name, key)
			return
		case err := <-ss.errorCh:
			ss.broadcastError(err)
			return
		case data := <-ss.dataCh.Out:
			logger.Debugf("broadcast data %v from source pool %s:%s", data, name, key)
			ss.broadcast(data)
		}
	}
}
// broadcast fans the tuple out to every attached output concurrently and waits until all
// sends finish or the context is done.
func (ss *sourceSingleton) broadcast(val api.SourceTuple) {
	logger := ss.ctx.GetLogger()
	var wg sync.WaitGroup
	ss.RLock()
	wg.Add(len(ss.outputs))
	for n, out := range ss.outputs {
		go func(name string, output chan<- api.SourceTuple) {
			select {
			case output <- val:
				logger.Debugf("broadcast from source pool to %s done", name)
			case <-ss.ctx.Done():
				// rule stopped, so stop waiting
			}
			wg.Done()
		}(n, out.dataCh.Out)
	}
	ss.RUnlock()
	wg.Wait()
}
// broadcastError fans the error out to every attached error channel concurrently.
func (ss *sourceSingleton) broadcastError(err error) {
	logger := ss.ctx.GetLogger()
	var wg sync.WaitGroup
	ss.RLock()
	wg.Add(len(ss.outputs))
	for n, out := range ss.outputs {
		go func(name string, output chan<- error) {
			select {
			case output <- err:
				logger.Debugf("broadcast error from source pool to %s done", name)
			case <-ss.ctx.Done():
				// rule stopped, so stop waiting
			}
			wg.Done()
		}(n, out.errorCh)
	}
	ss.RUnlock()
	logger.Debugf("broadcasting from source pool")
	wg.Wait()
}
func (ss *sourceSingleton) attach(instanceKey string, bl int) error {
	ss.Lock()
	defer ss.Unlock()
	if _, ok := ss.outputs[instanceKey]; !ok {
		ss.outputs[instanceKey] = newSourceInstanceChannels(bl)
	} else {
		// should not happen
		return fmt.Errorf("fail to attach source instance, already has an output of the same key %s", instanceKey)
	}
	return nil
}
// detach detaches an instance and reports whether the singleton has ended
func (ss *sourceSingleton) detach(instanceKey string) bool {
	ss.Lock()
	defer ss.Unlock()
	if chs, ok := ss.outputs[instanceKey]; ok {
		chs.dataCh.Close()
	} else {
		// should not happen
		ss.ctx.GetLogger().Warnf("detach source instance %s, not found", instanceKey)
	}
	delete(ss.outputs, instanceKey)
	if len(ss.outputs) == 0 {
		ss.cancel()
		return true
	}
	return false
}
// start configures the source, rewinds it to the saved offset if it is Rewindable (only for
// non-shared instances, where poolCtx is nil), opens it in a goroutine and returns the
// instance together with its channels.
func start(poolCtx api.StreamContext, node *SourceNode, s api.Source, instanceIndex int) (*sourceInstance, error) {
	err := s.Configure(node.options.DATASOURCE, node.props)
	if err != nil {
		return nil, err
	}
	ctx := poolCtx
	if poolCtx == nil {
		ctx = node.ctx
		if rw, ok := s.(api.Rewindable); ok {
			if offset, err := ctx.GetState(OffsetKey); err != nil {
				return nil, err
			} else if offset != nil {
				ctx.GetLogger().Infof("Source rewind from %v", offset)
				err = rw.Rewind(offset)
				if err != nil {
					return nil, err
				}
			}
		}
	}
	chs := newSourceInstanceChannels(node.bufferLength)
	go s.Open(ctx.WithInstance(instanceIndex), chs.dataCh.In, chs.errorCh)
	return &sourceInstance{
		source:                 s,
		sourceInstanceChannels: chs,
		ctx:                    ctx,
	}, nil
}