Przeglądaj źródła

fix(source): restart may fail because the previous detach has not done

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 2 lat temu
rodzic
commit
897447f894
1 zmienionych plików z 21 dodań i 8 usunięć
  1. 21 8
      internal/topo/node/source_pool.go

+ 21 - 8
internal/topo/node/source_pool.go

@@ -24,6 +24,7 @@ import (
 	"github.com/lf-edge/ekuiper/pkg/api"
 	"github.com/lf-edge/ekuiper/pkg/infra"
 	"sync"
+	"time"
 )
 
 //// Package vars and funcs
@@ -273,15 +274,27 @@ func (ss *sourceSingleton) broadcastError(err error) {
 }
 
 func (ss *sourceSingleton) attach(instanceKey string, bl int) error {
-	ss.Lock()
-	defer ss.Unlock()
-	if _, ok := ss.outputs[instanceKey]; !ok {
-		ss.outputs[instanceKey] = newSourceInstanceChannels(bl)
-	} else {
-		// should not happen
-		return fmt.Errorf("fail to attach source instance, already has an output of the same key %s", instanceKey)
+	retry := 10
+	var err error
+	// retry multiple times in case the detach is still in progress
+	for i := 0; i < retry; i++ {
+		err = func() error {
+			ss.Lock()
+			defer ss.Unlock()
+			if _, ok := ss.outputs[instanceKey]; !ok {
+				ss.outputs[instanceKey] = newSourceInstanceChannels(bl)
+			} else {
+				// should not happen
+				return fmt.Errorf("fail to attach source instance, already has an output of the same key %s", instanceKey)
+			}
+			return nil
+		}()
+		if err == nil {
+			return nil
+		}
+		time.Sleep(time.Millisecond * 100)
 	}
-	return nil
+	return err
 }
 
 // detach Detach an instance and return if the singleton is ended