Quellcode durchsuchen

feat(table): align table source continuously

ngjaying vor 4 Jahren
Ursprung
Commit
32b6de4f24
1 geänderte Dateien mit 42 neuen und 33 gelöschten Zeilen
  1. 42 33
      xstream/nodes/join_align_node.go

+ 42 - 33
xstream/nodes/join_align_node.go

@@ -1,7 +1,6 @@
 package nodes
 package nodes
 
 
 import (
 import (
-	"errors"
 	"fmt"
 	"fmt"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/api"
@@ -14,17 +13,20 @@ import (
 type JoinAlignNode struct {
 type JoinAlignNode struct {
 	*defaultSinkNode
 	*defaultSinkNode
 	statManager StatManager
 	statManager StatManager
-	emitters    []string
+	emitters    map[string]int
 	// states
 	// states
 	batch xsql.WindowTuplesSet
 	batch xsql.WindowTuplesSet
 }
 }
 
 
-const StreamInputsKey = "$$streamInputs"
+const BatchKey = "$$batchInputs"
 
 
 func NewJoinAlignNode(name string, emitters []string, options *api.RuleOption) (*JoinAlignNode, error) {
 func NewJoinAlignNode(name string, emitters []string, options *api.RuleOption) (*JoinAlignNode, error) {
+	emap := make(map[string]int, len(emitters))
+	for i, e := range emitters {
+		emap[e] = i
+	}
 	n := &JoinAlignNode{
 	n := &JoinAlignNode{
-		emitters: emitters,
-		batch:    make([]xsql.WindowTuples, len(emitters)),
+		emitters: emap,
 	}
 	}
 	n.defaultSinkNode = &defaultSinkNode{
 	n.defaultSinkNode = &defaultSinkNode{
 		input: make(chan interface{}, options.BufferLength),
 		input: make(chan interface{}, options.BufferLength),
@@ -52,9 +54,29 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 		return
 		return
 	}
 	}
 	n.statManager = stats
 	n.statManager = stats
-	var inputs []xsql.WindowTuplesSet
-	batchLen := len(n.emitters)
 	go func() {
 	go func() {
+		// restore batch state
+		if s, err := ctx.GetState(BatchKey); err == nil {
+			switch st := s.(type) {
+			case []xsql.WindowTuples:
+				if len(st) == len(n.emitters) {
+					n.batch = st
+					log.Infof("Restore batch state %+v", st)
+				} else {
+					log.Warnf("Restore batch state got different emitter length so discarded: %+v", st)
+				}
+			case nil:
+				log.Debugf("Restore batch state, nothing")
+			default:
+				errCh <- fmt.Errorf("restore batch state %v error, invalid type", st)
+			}
+		} else {
+			log.Warnf("Restore batch state fails: %s", err)
+		}
+		if n.batch == nil {
+			n.batch = make([]xsql.WindowTuples, len(n.emitters))
+		}
+
 		for {
 		for {
 			log.Debugf("JoinAlignNode %s is looping", n.name)
 			log.Debugf("JoinAlignNode %s is looping", n.name)
 			select {
 			select {
@@ -78,35 +100,23 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 					log.Debugf("JoinAlignNode receive tuple input %s", d)
 					log.Debugf("JoinAlignNode receive tuple input %s", d)
 					var temp xsql.WindowTuplesSet = make([]xsql.WindowTuples, 0)
 					var temp xsql.WindowTuplesSet = make([]xsql.WindowTuples, 0)
 					temp = temp.AddTuple(d)
 					temp = temp.AddTuple(d)
-					if batchLen == 0 {
-						n.alignBatch(ctx, temp)
-					} else {
-						log.Debugf("JoinAlignNode buffer input")
-						inputs = append(inputs, temp)
-						ctx.PutState(StreamInputsKey, inputs)
-						n.statManager.SetBufferLength(int64(len(n.input)))
-					}
+					n.alignBatch(ctx, temp)
 				case xsql.WindowTuplesSet:
 				case xsql.WindowTuplesSet:
 					log.Debugf("JoinAlignNode receive window input %s", d)
 					log.Debugf("JoinAlignNode receive window input %s", d)
-					if batchLen == 0 {
-						n.alignBatch(ctx, d)
-					} else {
-						log.Debugf("JoinAlignNode buffer input")
-						inputs = append(inputs, d)
-						ctx.PutState(StreamInputsKey, inputs)
-						n.statManager.SetBufferLength(int64(len(n.input)))
-					}
+					n.alignBatch(ctx, d)
 				case xsql.WindowTuples: // batch input
 				case xsql.WindowTuples: // batch input
 					log.Debugf("JoinAlignNode receive batch source %s", d)
 					log.Debugf("JoinAlignNode receive batch source %s", d)
-					if batchLen <= 0 {
-						errCh <- errors.New("Join receives too many table content")
+					// Buffer and update batch inputs
+					index, ok := n.emitters[d.Emitter]
+					if !ok {
+						n.Broadcast(fmt.Errorf("run JoinAlignNode error: receive batch input from unknown emitter %[1]T(%[1]v)", d))
+						n.statManager.IncTotalExceptions()
 					}
 					}
-					n.batch[len(n.emitters)-batchLen] = d
-					batchLen -= 1
-					if batchLen == 0 {
-						for _, w := range inputs {
-							n.alignBatch(ctx, w)
-						}
+					if len(n.batch) > index {
+						n.batch[index] = d
+						ctx.PutState(BatchKey, n.batch)
+					} else {
+						log.Errorf("Invalid index %d for batch %v", index, n.batch)
 					}
 					}
 				default:
 				default:
 					n.Broadcast(fmt.Errorf("run JoinAlignNode error: invalid input type but got %[1]T(%[1]v)", d))
 					n.Broadcast(fmt.Errorf("run JoinAlignNode error: invalid input type but got %[1]T(%[1]v)", d))
@@ -120,14 +130,13 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 	}()
 	}()
 }
 }
 
 
-func (n *JoinAlignNode) alignBatch(ctx api.StreamContext, w xsql.WindowTuplesSet) {
+func (n *JoinAlignNode) alignBatch(_ api.StreamContext, w xsql.WindowTuplesSet) {
 	n.statManager.ProcessTimeStart()
 	n.statManager.ProcessTimeStart()
 	w = append(w, n.batch...)
 	w = append(w, n.batch...)
 	n.Broadcast(w)
 	n.Broadcast(w)
 	n.statManager.ProcessTimeEnd()
 	n.statManager.ProcessTimeEnd()
 	n.statManager.IncTotalRecordsOut()
 	n.statManager.IncTotalRecordsOut()
 	n.statManager.SetBufferLength(int64(len(n.input)))
 	n.statManager.SetBufferLength(int64(len(n.input)))
-	ctx.PutState(StreamInputsKey, nil)
 }
 }
 
 
 func (n *JoinAlignNode) GetMetrics() [][]interface{} {
 func (n *JoinAlignNode) GetMetrics() [][]interface{} {