Przeglądaj źródła

fix(sql): table must be re-initialized after emission

ngjaying 4 lat temu
rodzic
commit
a7b9607b15

+ 18 - 10
xstream/operators/table_processor.go

@@ -12,18 +12,14 @@ type TableProcessor struct {
 
 
 	isBatchInput bool // whether the inputs are batched, such as file which sends multiple messages at a batch. If batch input, only fires when EOF is received. This is mutually exclusive with retainSize.
 	isBatchInput bool // whether the inputs are batched, such as file which sends multiple messages at a batch. If batch input, only fires when EOF is received. This is mutually exclusive with retainSize.
 	retainSize   int  // how many(maximum) messages to be retained for each output
 	retainSize   int  // how many(maximum) messages to be retained for each output
+	emitterName  string
 	// States
 	// States
 	output       xsql.WindowTuples // current batched message collection
 	output       xsql.WindowTuples // current batched message collection
 	batchEmitted bool              // if batch input, this is the signal for whether the last batch has emitted. If true, reinitialize.
 	batchEmitted bool              // if batch input, this is the signal for whether the last batch has emitted. If true, reinitialize.
 }
 }
 
 
 func NewTableProcessor(name string, fields []interface{}, fs xsql.Fields, options *xsql.Options) (*TableProcessor, error) {
 func NewTableProcessor(name string, fields []interface{}, fs xsql.Fields, options *xsql.Options) (*TableProcessor, error) {
-	p := &TableProcessor{
-		output: xsql.WindowTuples{
-			Emitter: name,
-			Tuples:  make([]xsql.Tuple, 0),
-		},
-	}
+	p := &TableProcessor{emitterName: name, batchEmitted: true, retainSize: 1}
 	p.defaultFieldProcessor = defaultFieldProcessor{
 	p.defaultFieldProcessor = defaultFieldProcessor{
 		streamFields: fields, aliasFields: fs, isBinary: false, timestampFormat: options.TIMESTAMP_FORMAT,
 		streamFields: fields, aliasFields: fs, isBinary: false, timestampFormat: options.TIMESTAMP_FORMAT,
 	}
 	}
@@ -32,6 +28,7 @@ func NewTableProcessor(name string, fields []interface{}, fs xsql.Fields, option
 		p.isBatchInput = false
 		p.isBatchInput = false
 	} else if isBatch(options.TYPE) {
 	} else if isBatch(options.TYPE) {
 		p.isBatchInput = true
 		p.isBatchInput = true
+		p.retainSize = 0
 	}
 	}
 	return p, nil
 	return p, nil
 }
 }
@@ -48,7 +45,10 @@ func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql
 	}
 	}
 	logger.Debugf("preprocessor receive %v", tuple)
 	logger.Debugf("preprocessor receive %v", tuple)
 	if p.batchEmitted {
 	if p.batchEmitted {
-		p.output.Tuples = make([]xsql.Tuple, 0)
+		p.output = xsql.WindowTuples{
+			Emitter: p.emitterName,
+			Tuples:  make([]xsql.Tuple, 0),
+		}
 		p.batchEmitted = false
 		p.batchEmitted = false
 	}
 	}
 	if tuple.Message != nil {
 	if tuple.Message != nil {
@@ -57,10 +57,18 @@ func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql
 			return fmt.Errorf("error in table processor: %s", err)
 			return fmt.Errorf("error in table processor: %s", err)
 		}
 		}
 		tuple.Message = result
 		tuple.Message = result
-		if p.retainSize > 0 && len(p.output.Tuples) == p.retainSize {
-			p.output.Tuples = p.output.Tuples[1:]
+		var newTuples []xsql.Tuple
+		for i, ot := range p.output.Tuples {
+			if p.retainSize > 0 && len(p.output.Tuples) == p.retainSize && i == 0 {
+				continue
+			}
+			newTuples = append(newTuples, ot)
+		}
+		newTuples = append(newTuples, *tuple)
+		p.output = xsql.WindowTuples{
+			Emitter: p.emitterName,
+			Tuples:  newTuples,
 		}
 		}
-		p.output.Tuples = append(p.output.Tuples, *tuple)
 		if !p.isBatchInput {
 		if !p.isBatchInput {
 			return p.output
 			return p.output
 		}
 		}

+ 1 - 1
xstream/operators/table_processor_test.go

@@ -105,7 +105,7 @@ func TestTableProcessor_Apply(t *testing.T) {
 	contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
 	contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 	for i, tt := range tests {
-		pp := &TableProcessor{isBatchInput: true}
+		pp := &TableProcessor{isBatchInput: true, emitterName: "demo"}
 		pp.streamFields = convertFields(tt.stmt.StreamFields)
 		pp.streamFields = convertFields(tt.stmt.StreamFields)
 		pp.output = xsql.WindowTuples{
 		pp.output = xsql.WindowTuples{
 			Emitter: "demo",
 			Emitter: "demo",