- // Copyright 2021-2022 EMQ Technologies Co., Ltd.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package node
- import (
- "context"
- "fmt"
- "sync"
- "time"
- "github.com/lf-edge/ekuiper/internal/binder/io"
- "github.com/lf-edge/ekuiper/internal/conf"
- kctx "github.com/lf-edge/ekuiper/internal/topo/context"
- "github.com/lf-edge/ekuiper/internal/topo/state"
- "github.com/lf-edge/ekuiper/pkg/api"
- "github.com/lf-edge/ekuiper/pkg/infra"
- )
- //// Package vars and funcs
- var pool = &sourcePool{
- registry: make(map[string]*sourceSingleton),
- }
- // getSourceInstance returns a sourceInstance for the given node. For shared
- // sources it attaches to (or creates) the pooled singleton keyed by
- // "sourceType.name"; otherwise it starts a dedicated instance. node is read-only.
- func getSourceInstance(node *SourceNode, index int) (*sourceInstance, error) {
- var si *sourceInstance
- if node.options.SHARED {
- rkey := fmt.Sprintf("%s.%s", node.sourceType, node.name)
- s, ok := pool.load(rkey)
- if !ok {
- ns, err := io.Source(node.sourceType)
- if ns == nil {
- if err != nil {
- return nil, err
- }
- return nil, fmt.Errorf("source %s not found", node.sourceType)
- }
- s, err = pool.addInstance(rkey, node, ns)
- if err != nil {
- return nil, err
- }
- }
- // attach a per-rule-instance channel set to the shared singleton
- instanceKey := fmt.Sprintf("%s.%s.%d", rkey, node.ctx.GetRuleId(), index)
- err := s.attach(instanceKey, node.bufferLength)
- if err != nil {
- return nil, err
- }
- si = &sourceInstance{
- source: s.source,
- ctx: s.ctx,
- sourceInstanceChannels: s.outputs[instanceKey],
- }
- } else {
- ns, err := io.Source(node.sourceType)
- if ns == nil {
- if err != nil {
- return nil, err
- }
- return nil, fmt.Errorf("source %s not found", node.sourceType)
- }
- si, err = start(nil, node, ns)
- if err != nil {
- return nil, err
- }
- // run the dedicated source in its own goroutine; any panic or error is
- // drained into the instance's error channel
- go func() {
- err := infra.SafeRun(func() error {
- nctx := node.ctx.WithInstance(index)
- defer si.source.Close(nctx)
- si.source.Open(nctx, si.dataCh.In, si.errorCh)
- return nil
- })
- if err != nil {
- infra.DrainError(node.ctx, err, si.errorCh)
- }
- }()
- }
- return si, nil
- }
- // removeSourceInstance removes an attached instance from the sourceSingleton.
- // If all attached instances are removed, close the sourceSingleton and remove it from the pool registry.
- // ONLY applies to shared instances.
- func removeSourceInstance(node *SourceNode) {
- for i := 0; i < node.concurrency; i++ {
- rkey := fmt.Sprintf("%s.%s", node.sourceType, node.name)
- pool.deleteInstance(rkey, node, i)
- }
- }
- //// data types
- /*
- * Pool of all keyed source instances.
- * The first time a key is hit, create the instance and start its source goroutine.
- * For later hits, create a new set of channels and attach them to the instance.
- * On delete (when a rule is closed), remove the attached channels; once all channels are removed, remove the instance from the pool.
- * For performance reasons, the pool only holds shared instances. Rule-specific instances are held by the rule's source node itself.
- */
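- // An illustrative sketch of the lifecycle, assuming two hypothetical rule
- // nodes nodeA and nodeB whose sources resolve to the same key (e.g. "mqtt.demo"):
- //
- //	siA, _ := getSourceInstance(nodeA, 0) // first hit: creates the singleton and starts the source goroutine
- //	siB, _ := getSourceInstance(nodeB, 0) // later hit: only attaches a new channel set
- //	removeSourceInstance(nodeA)           // detaches nodeA's channels
- //	removeSourceInstance(nodeB)           // last detach cancels the singleton and removes the pool entry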
- type sourcePool struct {
- registry map[string]*sourceSingleton
- sync.RWMutex
- }
- func (p *sourcePool) load(k string) (*sourceSingleton, bool) {
- p.RLock()
- defer p.RUnlock()
- s, ok := p.registry[k]
- return s, ok
- }
- func (p *sourcePool) addInstance(k string, node *SourceNode, source api.Source) (*sourceSingleton, error) {
- p.Lock()
- defer p.Unlock()
- s, ok := p.registry[k]
- if !ok {
- contextLogger := conf.Log.WithField("source_pool", k)
- ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
- ruleId := "$$source_pool_" + k
- opId := "source_pool_" + k
- store, err := state.CreateStore("source_pool_"+k, 0)
- if err != nil {
- ctx.GetLogger().Errorf("source pool %s create store error %v", k, err)
- return nil, err
- }
- sctx, cancel := ctx.WithMeta(ruleId, opId, store).WithCancel()
- sctx = kctx.WithValue(sctx.(*kctx.DefaultContext), kctx.DecodeKey, node.ctx.Value(kctx.DecodeKey))
- si, err := start(sctx, node, source)
- if err != nil {
- return nil, err
- }
- newS := &sourceSingleton{
- sourceInstance: si,
- outputs: make(map[string]*sourceInstanceChannels),
- cancel: cancel,
- }
- p.registry[k] = newS
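- // pump goroutine: runs the physical source, feeding the singleton's own
- // data and error channels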
- go func() {
- err := infra.SafeRun(func() error {
- defer si.source.Close(sctx)
- si.source.Open(sctx, si.dataCh.In, si.errorCh)
- return nil
- })
- if err != nil {
- newS.broadcastError(err)
- }
- }()
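- // fan-out goroutine: forwards data and errors from the singleton's channels
- // to every attached instance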
- go func() {
- err := infra.SafeRun(func() error {
- newS.run(node.sourceType, node.name)
- return nil
- })
- if err != nil {
- newS.broadcastError(err)
- }
- }()
- s = newS
- }
- return s, nil
- }
- func (p *sourcePool) deleteInstance(k string, node *SourceNode, index int) {
- p.Lock()
- defer p.Unlock()
- s, ok := p.registry[k]
- if ok {
- instanceKey := fmt.Sprintf("%s.%s.%d", k, node.ctx.GetRuleId(), index)
- end := s.detach(instanceKey)
- if end {
- s.cancel()
- s.dataCh.Close()
- delete(p.registry, k)
- }
- }
- }
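- // sourceInstance bundles a running source with its stream context and the
- // channel set that its consumer reads data and errors from.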
- type sourceInstance struct {
- source api.Source
- ctx api.StreamContext
- *sourceInstanceChannels
- }
- // sourceSingleton holds the single running instance of a shared source,
- // together with the channel sets of all attached consumers. Access to
- // outputs must be synchronized via the embedded RWMutex.
- type sourceSingleton struct {
- *sourceInstance // immutable
- cancel context.CancelFunc // immutable
- outputs map[string]*sourceInstanceChannels // guarded by the embedded RWMutex
- sync.RWMutex
- }
- type sourceInstanceChannels struct {
- dataCh *DynamicChannelBuffer
- errorCh chan error
- }
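- // newSourceInstanceChannels creates a channel set for one consumer: a dynamic
- // data buffer capped at bl and an unbuffered error channel.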
- func newSourceInstanceChannels(bl int) *sourceInstanceChannels {
- buffer := NewDynamicChannelBuffer()
- buffer.SetLimit(bl)
- errorOutput := make(chan error)
- return &sourceInstanceChannels{
- dataCh: buffer,
- errorCh: errorOutput,
- }
- }
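- // run pumps the shared source's output to every attached channel set until
- // the singleton's context is cancelled or the source reports an error.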
- func (ss *sourceSingleton) run(sourceType, sourceName string) {
- logger := ss.ctx.GetLogger()
- logger.Infof("Start source %s shared instance %s successfully", sourceType, sourceName)
- for {
- select {
- case <-ss.ctx.Done():
- logger.Infof("source %s shared instance %s done", sourceType, sourceName)
- return
- case err := <-ss.errorCh:
- ss.broadcastError(err)
- return
- case data := <-ss.dataCh.Out:
- logger.Debugf("broadcast data %v from source pool %s:%s", data, sourceType, sourceName)
- ss.broadcast(data)
- }
- }
- }
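- // broadcast delivers a tuple to every attached consumer without blocking: if a
- // consumer's buffer is full, the tuple is dropped for that consumer and an
- // error is logged.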
- func (ss *sourceSingleton) broadcast(val api.SourceTuple) {
- ss.RLock()
- defer ss.RUnlock()
- for name, out := range ss.outputs {
- select {
- case out.dataCh.In <- val:
- case <-ss.ctx.Done():
- case <-out.dataCh.done:
- // detached
- default:
- ss.ctx.GetLogger().Errorf("shared source dropped message to %s", name)
- }
- }
- }
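- // broadcastError fans the error out to all attached error channels
- // concurrently and waits until every send has been drained.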
- func (ss *sourceSingleton) broadcastError(err error) {
- logger := ss.ctx.GetLogger()
- var wg sync.WaitGroup
- ss.RLock()
- wg.Add(len(ss.outputs))
- for n, out := range ss.outputs {
- go func(name string, output chan<- error) {
- infra.DrainError(ss.ctx, err, output)
- wg.Done()
- }(n, out.errorCh)
- }
- ss.RUnlock()
- logger.Debugf("broadcasting from source pool")
- wg.Wait()
- }
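- // attach registers a new channel set under instanceKey so the consumer starts
- // receiving broadcasts. It retries briefly because a detach of the same key
- // may still be in progress.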
- func (ss *sourceSingleton) attach(instanceKey string, bl int) error {
- retry := 10
- var err error
- // retry multiple times in case the detach is still in progress
- for i := 0; i < retry; i++ {
- err = func() error {
- ss.Lock()
- defer ss.Unlock()
- if _, ok := ss.outputs[instanceKey]; !ok {
- ss.outputs[instanceKey] = newSourceInstanceChannels(bl)
- } else {
- // should not happen
- return fmt.Errorf("fail to attach source instance, already has an output of the same key %s", instanceKey)
- }
- return nil
- }()
- if err == nil {
- return nil
- }
- time.Sleep(time.Millisecond * 100)
- }
- return err
- }
- // detach removes an attached instance and reports whether the singleton has ended (no outputs remain)
- func (ss *sourceSingleton) detach(instanceKey string) bool {
- ss.Lock()
- defer ss.Unlock()
- if chs, ok := ss.outputs[instanceKey]; ok {
- chs.dataCh.Close()
- } else {
- // should not happen
- ss.ctx.GetLogger().Warnf("detach source instance %s, not found", instanceKey)
- return false
- }
- delete(ss.outputs, instanceKey)
- if len(ss.outputs) == 0 {
- ss.cancel()
- return true
- }
- return false
- }
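- // start configures the source and builds its sourceInstance. poolCtx is nil
- // for rule-specific (non-shared) instances; in that case the rule's own
- // context is used and, for rewindable sources, the saved offset is restored.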
- func start(poolCtx api.StreamContext, node *SourceNode, s api.Source) (*sourceInstance, error) {
- err := s.Configure(node.options.DATASOURCE, node.props)
- if err != nil {
- return nil, err
- }
- ctx := poolCtx
- if poolCtx == nil {
- ctx = node.ctx
- if rw, ok := s.(api.Rewindable); ok {
- if offset, err := ctx.GetState(OffsetKey); err != nil {
- return nil, err
- } else if offset != nil {
- ctx.GetLogger().Infof("Source rewind from %v", offset)
- err = rw.Rewind(offset)
- if err != nil {
- return nil, err
- }
- }
- }
- }
- chs := newSourceInstanceChannels(node.bufferLength)
- return &sourceInstance{
- source: s,
- sourceInstanceChannels: chs,
- ctx: ctx,
- }, nil
- }