Переглянути джерело

refactor(table): align table with source implementation

Share source plugin implementation for both
ngjaying 4 роки тому
батько
коміт
04e4eab9e5

+ 2 - 0
etc/sources/file.yaml

@@ -3,6 +3,8 @@ default:
   # The directory of the file relative to kuiper root or an absolute path.
   # Do not include the file name here. The file name should be defined in the stream data source
   path: data
+  # The interval between reading the files, time unit is ms. If only read once, set it to 0
+  interval: 0
 
 test:
   path: fvt_scripts

+ 57 - 8
xstream/extensions/file_source.go

@@ -8,6 +8,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"time"
 )
 
 type FileType string
@@ -22,7 +23,8 @@ var fileTypes = map[FileType]bool{
 
 type FileSourceConfig struct {
 	FileType FileType `json:"fileType"`
-	Path     string   `json:"Path"`
+	Path     string   `json:"path"`
+	Interval int      `json:"interval"`
 }
 
 // The BATCH to load data from file at once
@@ -31,6 +33,12 @@ type FileSource struct {
 	config *FileSourceConfig
 }
 
+func (fs *FileSource) Close(ctx api.StreamContext) error {
+	ctx.GetLogger().Infof("Close file source")
+	// do nothing
+	return nil
+}
+
 func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error {
 	cfg := &FileSourceConfig{}
 	err := common.MapToStruct(props, cfg)
@@ -68,18 +76,59 @@ func (fs *FileSource) Configure(fileName string, props map[string]interface{}) e
 	return nil
 }
 
-func (fs *FileSource) Load(ctx api.StreamContext) ([]api.SourceTuple, error) {
+func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
+	err := fs.Load(ctx, consumer)
+	if err != nil {
+		errCh <- err
+		return
+	}
+	if fs.config.Interval > 0 {
+		ticker := time.NewTicker(time.Millisecond * time.Duration(fs.config.Interval))
+		logger := ctx.GetLogger()
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				logger.Debugf("Load file source again at %v", common.GetNowInMilli())
+				err := fs.Load(ctx, consumer)
+				if err != nil {
+					errCh <- err
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+}
+
+func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error {
 	switch fs.config.FileType {
 	case JSON_TYPE:
 		ctx.GetLogger().Debugf("Start to load from file %s", fs.file)
 		resultMap := make([]map[string]interface{}, 0)
 		err := common.ReadJsonUnmarshal(fs.file, &resultMap)
-		result := make([]api.SourceTuple, len(resultMap))
-		for i, m := range resultMap {
-			result[i] = api.NewDefaultSourceTuple(m, nil)
+		if err != nil {
+			return fmt.Errorf("loaded %s, check error %s", fs.file, err)
+		}
+		ctx.GetLogger().Debug("Sending tuples")
+		for _, m := range resultMap {
+			select {
+			case consumer <- api.NewDefaultSourceTuple(m, nil):
+				// do nothing
+			case <-ctx.Done():
+				return nil
+			}
+		}
+		// Send EOF
+		select {
+		case consumer <- api.NewDefaultSourceTuple(nil, nil):
+			// do nothing
+		case <-ctx.Done():
+			return nil
 		}
-		ctx.GetLogger().Debugf("loaded %s, check error %s", fs.file, err)
-		return result, err
+		ctx.GetLogger().Debug("All tuples sent")
+		return nil
 	}
-	return nil, fmt.Errorf("invalid file type %s", fs.config.FileType)
+	return fmt.Errorf("invalid file type %s", fs.config.FileType)
 }

+ 2 - 2
xstream/nodes/join_align_node.go

@@ -58,7 +58,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 		for {
 			log.Debugf("JoinAlignNode %s is looping", n.name)
 			select {
-			// process incoming item
+			// process incoming item from both streams(transformed) and tables
 			case item, opened := <-n.input:
 				processed := false
 				if item, processed = n.preprocess(item); processed {
@@ -96,7 +96,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
 						ctx.PutState(StreamInputsKey, inputs)
 						n.statManager.SetBufferLength(int64(len(n.input)))
 					}
-				case xsql.WindowTuples:
+				case xsql.WindowTuples: // batch input
 					log.Debugf("JoinAlignNode receive batch source %s", d)
 					if batchLen <= 0 {
 						errCh <- errors.New("Join receives too many table content")

+ 9 - 2
xstream/nodes/source_node.go

@@ -11,6 +11,7 @@ import (
 
 type SourceNode struct {
 	*defaultNode
+	streamType xsql.StreamType
 	sourceType string
 	options    map[string]string
 	isMock     bool
@@ -19,10 +20,14 @@ type SourceNode struct {
 	sources []api.Source
 }
 
-func NewSourceNode(name string, options map[string]string) *SourceNode {
+func NewSourceNode(name string, st xsql.StreamType, options map[string]string) *SourceNode {
 	t, ok := options["TYPE"]
 	if !ok {
-		t = "mqtt"
+		if st == xsql.TypeStream {
+			t = "mqtt"
+		} else if st == xsql.TypeTable {
+			t = "file"
+		}
 	}
 	return &SourceNode{
 		sourceType: t,
@@ -179,6 +184,8 @@ func doGetSource(t string) (api.Source, error) {
 		s = &extensions.MQTTSource{}
 	case "httppull":
 		s = &extensions.HTTPPullSource{}
+	case "file":
+		s = &extensions.FileSource{}
 	default:
 		s, err = plugins.GetSource(t)
 		if err != nil {

+ 3 - 2
xstream/nodes/source_node_test.go

@@ -2,6 +2,7 @@ package nodes
 
 import (
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/contexts"
 	"reflect"
 	"testing"
@@ -26,7 +27,7 @@ func TestGetConf_Apply(t *testing.T) {
 			},
 		},
 	}
-	n := NewSourceNode("test", map[string]string{
+	n := NewSourceNode("test", xsql.TypeStream, map[string]string{
 		"DATASOURCE": "RFC_READ_TABLE",
 		"TYPE":       "test",
 	})
@@ -48,7 +49,7 @@ func TestGetConfAndConvert_Apply(t *testing.T) {
 		},
 		"deduplicate": 50,
 	}
-	n := NewSourceNode("test", map[string]string{
+	n := NewSourceNode("test", xsql.TypeStream, map[string]string{
 		"DATASOURCE": "test",
 		"TYPE":       "random",
 		"CONF_KEY":   "dedup",

+ 0 - 99
xstream/nodes/table_node.go

@@ -1,99 +0,0 @@
-package nodes
-
-import (
-	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/xsql"
-	"github.com/emqx/kuiper/xstream/api"
-	"github.com/emqx/kuiper/xstream/extensions"
-)
-
-// Node for table source
-type TableNode struct {
-	*defaultNode
-	sourceType string
-	options    map[string]string
-}
-
-func NewTableNode(name string, options map[string]string) *TableNode {
-	t, ok := options["TYPE"]
-	if !ok {
-		t = "file"
-	}
-	return &TableNode{
-		sourceType: t,
-		defaultNode: &defaultNode{
-			name:        name,
-			outputs:     make(map[string]chan<- interface{}),
-			concurrency: 1,
-		},
-		options: options,
-	}
-}
-
-func (m *TableNode) Open(ctx api.StreamContext, errCh chan<- error) {
-	m.ctx = ctx
-	logger := ctx.GetLogger()
-	logger.Infof("open table node %s with option %v", m.name, m.options)
-	go func() {
-		props := getSourceConf(ctx, m.sourceType, m.options)
-		//TODO apply properties like concurrency
-		source, err := doGetTableSource(m.sourceType)
-		if err != nil {
-			m.drainError(errCh, err, ctx)
-			return
-		}
-		err = source.Configure(m.options["DATASOURCE"], props)
-		if err != nil {
-			m.drainError(errCh, err, ctx)
-			return
-		}
-		stats, err := NewStatManager("source", ctx)
-		if err != nil {
-			m.drainError(errCh, err, ctx)
-			return
-		}
-		m.statManagers = append(m.statManagers, stats)
-		stats.ProcessTimeStart()
-		if data, err := source.Load(ctx); err != nil {
-			stats.IncTotalExceptions()
-			stats.ProcessTimeEnd()
-			m.drainError(errCh, err, ctx)
-			return
-		} else {
-			stats.IncTotalRecordsIn()
-			stats.ProcessTimeEnd()
-			logger.Debugf("table node %s is sending result", m.name)
-			result := make([]*xsql.Tuple, len(data))
-			for i, t := range data {
-				tuple := &xsql.Tuple{Emitter: m.name, Message: t.Message(), Metadata: t.Meta(), Timestamp: common.GetNowInMilli()}
-				result[i] = tuple
-			}
-			m.doBroadcast(result)
-			stats.IncTotalRecordsOut()
-			logger.Debugf("table node %s has consumed all data", m.name)
-		}
-	}()
-}
-
-func (m *TableNode) drainError(errCh chan<- error, err error, ctx api.StreamContext) {
-	select {
-	case errCh <- err:
-	case <-ctx.Done():
-
-	}
-	return
-}
-
-func doGetTableSource(t string) (api.TableSource, error) {
-	var s api.TableSource
-	switch t {
-	case "file":
-		s = &extensions.FileSource{}
-	default: //TODO table source plugin
-		//s, err = plugins.GetTableSource(t)
-		//if err != nil {
-		//	return nil, err
-		//}
-	}
-	return s, nil
-}

+ 0 - 81
xstream/nodes/table_node_test.go

@@ -1,81 +0,0 @@
-package nodes
-
-import (
-	"github.com/emqx/kuiper/common"
-	"github.com/emqx/kuiper/xsql"
-	"github.com/emqx/kuiper/xstream/contexts"
-	"github.com/emqx/kuiper/xstream/topotest/mockclock"
-	"reflect"
-	"testing"
-)
-
-func TestTableNode(t *testing.T) {
-	mockclock.ResetClock(1541152486000)
-	var tests = []struct {
-		name    string
-		options map[string]string
-		result  []*xsql.Tuple
-	}{
-		{ //0
-			name: "test0",
-			options: map[string]string{
-				"TYPE":       "file",
-				"DATASOURCE": "lookup.json",
-				"CONF_KEY":   "test",
-			},
-			result: []*xsql.Tuple{
-				{
-					Emitter: "test0",
-					Message: map[string]interface{}{
-						"id":   float64(1541152486013),
-						"name": "name1",
-						"size": float64(2),
-					},
-					Timestamp: common.GetNowInMilli(),
-				},
-				{
-					Emitter: "test0",
-					Message: map[string]interface{}{
-						"id":   float64(1541152487632),
-						"name": "name2",
-						"size": float64(6),
-					},
-					Timestamp: common.GetNowInMilli(),
-				},
-				{
-					Emitter: "test0",
-					Message: map[string]interface{}{
-						"id":   float64(1541152489252),
-						"name": "name3",
-						"size": float64(4),
-					},
-					Timestamp: common.GetNowInMilli(),
-				},
-			},
-		},
-	}
-
-	t.Logf("The test bucket size is %d.\n\n", len(tests))
-	for i, tt := range tests {
-		n := NewTableNode(tt.name, tt.options)
-		resultCh := make(chan interface{})
-		errCh := make(chan error)
-		contextLogger := common.Log.WithField("test", "test")
-		ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
-		n.AddOutput(resultCh, "test")
-		n.Open(ctx, errCh)
-		select {
-		case err := <-errCh:
-			t.Error(err)
-		case d := <-resultCh:
-			r, ok := d.([]*xsql.Tuple)
-			if !ok {
-				t.Errorf("%d. \nresult is not tuple list:got=%#v\n\n", i, d)
-				break
-			}
-			if !reflect.DeepEqual(tt.result, r) {
-				t.Errorf("%d. \nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, r)
-			}
-		}
-	}
-}

+ 30 - 14
xstream/operators/table_processor.go

@@ -9,10 +9,14 @@ import (
 type TableProcessor struct {
 	//Pruned stream fields. Could be streamField(with data type info) or string
 	defaultFieldProcessor
+
+	isBatchInput bool
+	output       xsql.WindowTuples
+	count        int
 }
 
-func NewTableProcessor(fields []interface{}, fs xsql.Fields, timestampFormat string) (*TableProcessor, error) {
-	p := &TableProcessor{}
+func NewTableProcessor(fields []interface{}, fs xsql.Fields, timestampFormat string, isBatchInput bool) (*TableProcessor, error) {
+	p := &TableProcessor{isBatchInput: isBatchInput}
 	p.defaultFieldProcessor = defaultFieldProcessor{
 		streamFields: fields, aliasFields: fs, isBinary: false, timestampFormat: timestampFormat,
 	}
@@ -20,27 +24,39 @@ func NewTableProcessor(fields []interface{}, fs xsql.Fields, timestampFormat str
 }
 
 /*
- *	input: []*xsql.Tuple
+ *	input: *xsql.Tuple or BatchCount
  *	output: WindowTuples
  */
 func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
 	logger := ctx.GetLogger()
-	tuples, ok := data.([]*xsql.Tuple)
+	tuple, ok := data.(*xsql.Tuple)
 	if !ok {
-		return fmt.Errorf("expect []*xsql.Tuple data type")
+		return fmt.Errorf("expect *xsql.Tuple data type")
 	}
-	logger.Debugf("Start to process table fields")
-	w := xsql.WindowTuples{
-		Emitter: tuples[0].Emitter,
-		Tuples:  make([]xsql.Tuple, len(tuples)),
+	logger.Debugf("preprocessor receive %v", tuple)
+
+	if p.count == 0 {
+		p.output = xsql.WindowTuples{
+			Emitter: tuple.Emitter,
+			Tuples:  make([]xsql.Tuple, 0),
+		}
 	}
-	for i, t := range tuples {
-		result, err := p.processField(t, fv)
+
+	if tuple.Message != nil {
+		result, err := p.processField(tuple, fv)
 		if err != nil {
 			return fmt.Errorf("error in table processor: %s", err)
 		}
-		t.Message = result
-		w.Tuples[i] = *t
+		tuple.Message = result
+		p.output.Tuples = append(p.output.Tuples, *tuple)
+		if !p.isBatchInput {
+			return p.output
+		} else {
+			p.count = p.count + 1
+		}
+	} else if p.isBatchInput { // EOF
+		p.count = 0
+		return p.output
 	}
-	return w
+	return nil
 }

+ 7 - 7
xstream/operators/table_processor_test.go

@@ -105,7 +105,7 @@ func TestTableProcessor_Apply(t *testing.T) {
 	contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
-		pp := &TableProcessor{}
+		pp := &TableProcessor{isBatchInput: true}
 		pp.streamFields = convertFields(tt.stmt.StreamFields)
 
 		var dm []map[string]interface{}
@@ -113,15 +113,15 @@ func TestTableProcessor_Apply(t *testing.T) {
 			t.Log(e)
 			t.Fail()
 		} else {
-			tuples := make([]*xsql.Tuple, len(dm))
-			for i, m := range dm {
-				tuples[i] = &xsql.Tuple{
+			fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
+			for _, m := range dm {
+				pp.Apply(ctx, &xsql.Tuple{
 					Emitter: "demo",
 					Message: m,
-				}
+				}, fv, afv)
 			}
-			fv, afv := xsql.NewFunctionValuersForOp(nil, xsql.FuncRegisters)
-			result := pp.Apply(ctx, tuples, fv, afv)
+
+			result := pp.Apply(ctx, &xsql.Tuple{}, fv, afv)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. result mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.result, result)
 			}

+ 11 - 3
xstream/planner/planner.go

@@ -110,7 +110,7 @@ func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption,
 			}
 			var srcNode *nodes.SourceNode
 			if len(sources) == 0 {
-				node := nodes.NewSourceNode(t.name, t.streamStmt.Options)
+				node := nodes.NewSourceNode(t.name, t.streamStmt.StreamType, t.streamStmt.Options)
 				srcNode = node
 			} else {
 				found := false
@@ -128,11 +128,11 @@ func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption,
 			op = Transform(pp, fmt.Sprintf("%d_preprocessor_%s", newIndex, t.name), options)
 			inputs = []api.Emitter{srcNode}
 		case xsql.TypeTable:
-			pp, err := operators.NewTableProcessor(t.streamFields, t.alias, t.timestampFormat)
+			pp, err := operators.NewTableProcessor(t.streamFields, t.alias, t.timestampFormat, isBatch(t.streamStmt.Options))
 			if err != nil {
 				return nil, 0, err
 			}
-			srcNode := nodes.NewTableNode(t.name, t.streamStmt.Options)
+			srcNode := nodes.NewSourceNode(t.name, t.streamStmt.StreamType, t.streamStmt.Options)
 			tp.AddSrc(srcNode)
 			op = Transform(pp, fmt.Sprintf("%d_tableprocessor_%s", newIndex, t.name), options)
 			inputs = []api.Emitter{srcNode}
@@ -177,6 +177,14 @@ func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption,
 	return op, newIndex, nil
 }
 
+func isBatch(options xsql.Options) bool {
+	t, ok := options["TYPE"]
+	if !ok || t == "file" {
+		return true
+	}
+	return false
+}
+
 func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {
 	streamsFromStmt := xsql.GetStreams(stmt)
 	dimensions := stmt.Dimensions

+ 4 - 4
xstream/topotest/rule_test.go

@@ -437,7 +437,7 @@ func TestSingleSQL(t *testing.T) {
 				"op_1_preprocessor_demo_0_records_out_total": int64(5),
 
 				"op_2_tableprocessor_table1_0_exceptions_total":  int64(0),
-				"op_2_tableprocessor_table1_0_records_in_total":  int64(1),
+				"op_2_tableprocessor_table1_0_records_in_total":  int64(4),
 				"op_2_tableprocessor_table1_0_records_out_total": int64(1),
 
 				"op_3_join_aligner_0_records_in_total":  int64(6),
@@ -460,8 +460,8 @@ func TestSingleSQL(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"source_table1_0_exceptions_total":  int64(0),
-				"source_table1_0_records_in_total":  int64(1),
-				"source_table1_0_records_out_total": int64(1),
+				"source_table1_0_records_in_total":  int64(4),
+				"source_table1_0_records_out_total": int64(4),
 			},
 		},
 	}
@@ -483,7 +483,7 @@ func TestSingleSQL(t *testing.T) {
 		},
 	}
 	for j, opt := range options {
-		DoRuleTest(t, tests[9:10], j, opt, 0)
+		DoRuleTest(t, tests, j, opt, 0)
 	}
 }
 

+ 3 - 3
xstream/topotest/window_rule_test.go

@@ -662,7 +662,7 @@ func TestWindow(t *testing.T) {
 				"op_1_preprocessor_demo_0_records_out_total": int64(5),
 
 				"op_4_tableprocessor_table1_0_exceptions_total":  int64(0),
-				"op_4_tableprocessor_table1_0_records_in_total":  int64(1),
+				"op_4_tableprocessor_table1_0_records_in_total":  int64(4),
 				"op_4_tableprocessor_table1_0_records_out_total": int64(1),
 
 				"op_5_filter_0_exceptions_total":  int64(0),
@@ -689,8 +689,8 @@ func TestWindow(t *testing.T) {
 				"source_demo_0_records_out_total": int64(5),
 
 				"source_table1_0_exceptions_total":  int64(0),
-				"source_table1_0_records_in_total":  int64(1),
-				"source_table1_0_records_out_total": int64(1),
+				"source_table1_0_records_in_total":  int64(4),
+				"source_table1_0_records_out_total": int64(4),
 			},
 		},
 	}