瀏覽代碼

feat(table): add retainSize option for stream

1. Enable temporal stream source to become a table
2. Refactor parser for stream options
ngjaying 4 年之前
父節點
當前提交
38fbdd5176

+ 0 - 13
common/util.go

@@ -2,7 +2,6 @@ package common
 
 import (
 	"archive/zip"
-	"bytes"
 	"encoding/json"
 	"fmt"
 	"github.com/benbjohnson/clock"
@@ -16,7 +15,6 @@ import (
 	"os"
 	"path"
 	"path/filepath"
-	"sort"
 	"strings"
 	"time"
 )
@@ -171,17 +169,6 @@ func InitConf() {
 	}
 }
 
-func PrintMap(m map[string]string, buff *bytes.Buffer) {
-	si := make([]string, 0, len(m))
-	for s := range m {
-		si = append(si, s)
-	}
-	sort.Strings(si)
-	for _, s := range si {
-		buff.WriteString(fmt.Sprintf("%s: %s\n", s, m[s]))
-	}
-}
-
 func CloseLogger() {
 	if logFile != nil {
 		logFile.Close()

+ 12 - 2
xsql/ast.go

@@ -312,7 +312,17 @@ func (fr *MetaRef) expr() {}
 func (fr *MetaRef) node() {}
 
 // The stream AST tree
// Options holds the WITH-clause options of a CREATE STREAM/TABLE
// statement (formerly a plain map[string]string).
//
// NOTE: the ALL_CAPS field names are intentional and load-bearing even
// though they violate Go naming convention — the parser resolves
// string-valued options with reflect.Value.FieldByName using the
// upper-case option token literal (DATASOURCE, FORMAT, ...), so each
// field name must match its SQL option key exactly. Do not rename.
type Options struct {
	DATASOURCE        string // source-specific locator, e.g. MQTT topic or file name
	KEY               string // key field of the stream
	FORMAT            string // payload format; defaults to JSON when empty
	CONF_KEY          string // key into the source's yaml configuration
	TYPE              string // source type; defaults to mqtt (stream) or file (table)
	STRICT_VALIDATION bool   // whether to error on schema mismatch
	TIMESTAMP         string // event-time field name
	TIMESTAMP_FORMAT  string // layout used to parse TIMESTAMP
	RETAIN_SIZE       int    // max messages retained per table output; 0 = unlimited/batch
}

func (o Options) node() {}
 
@@ -335,7 +345,7 @@ var StreamTypeMap = map[StreamType]string{
 type StreamStmt struct {
 	Name         StreamName
 	StreamFields StreamFields
-	Options      Options
+	Options      *Options
 	StreamType   StreamType //default to TypeStream
 }
 

+ 4 - 0
xsql/lexical.go

@@ -120,6 +120,7 @@ const (
 	STRICT_VALIDATION
 	TIMESTAMP
 	TIMESTAMP_FORMAT
+	RETAIN_SIZE
 
 	DD
 	HH
@@ -212,6 +213,7 @@ var tokens = []string{
 	STRICT_VALIDATION: "STRICT_VALIDATION",
 	TIMESTAMP:         "TIMESTAMP",
 	TIMESTAMP_FORMAT:  "TIMESTAMP_FORMAT",
+	RETAIN_SIZE:       "RETAIN_SIZE",
 
 	AND:   "AND",
 	OR:    "OR",
@@ -481,6 +483,8 @@ func (s *Scanner) ScanIdent() (tok Token, lit string) {
 		return TIMESTAMP, lit
 	case "TIMESTAMP_FORMAT":
 		return TIMESTAMP_FORMAT, lit
+	case "RETAIN_SIZE":
+		return RETAIN_SIZE, lit
 	case "DD":
 		return DD, lit
 	case "HH":

+ 25 - 16
xsql/parser.go

@@ -6,6 +6,7 @@ import (
 	"github.com/golang-collections/collections/stack"
 	"io"
 	"math"
+	"reflect"
 	"strconv"
 	"strings"
 )
@@ -881,8 +882,8 @@ func (p *Parser) ParseCreateStmt() (Statement, error) {
 
 // TODO more accurate validation for table
 func validateStream(stmt *StreamStmt) error {
-	f, ok := stmt.Options["FORMAT"]
-	if !ok {
+	f := stmt.Options.FORMAT
+	if f == "" {
 		f = common.FORMAT_JSON
 	}
 	switch strings.ToLower(f) {
@@ -909,14 +910,6 @@ func validateStream(stmt *StreamStmt) error {
 	default:
 		return fmt.Errorf("option 'format=%s' is invalid", f)
 	}
-
-	if stmt.StreamType == TypeTable {
-		if t, ok := stmt.Options["TYPE"]; ok {
-			if strings.ToLower(t) != "file" {
-				return fmt.Errorf("table only supports 'file' type")
-			}
-		}
-	}
 	return nil
 }
 
@@ -1188,21 +1181,37 @@ func (p *Parser) parseStreamStructType() (FieldType, error) {
 	return rf, nil
 }
 
-func (p *Parser) parseStreamOptions() (map[string]string, error) {
-	var opts = make(map[string]string)
+func (p *Parser) parseStreamOptions() (*Options, error) {
+	opts := &Options{}
+	v := reflect.ValueOf(opts)
 	lStack := &stack.Stack{}
 	if tok, lit := p.scanIgnoreWhitespace(); tok == LPAREN {
 		lStack.Push(LPAREN)
 		for {
-			if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == DATASOURCE || tok1 == FORMAT || tok1 == KEY || tok1 == CONF_KEY || tok1 == STRICT_VALIDATION || tok1 == TYPE || tok1 == TIMESTAMP || tok1 == TIMESTAMP_FORMAT {
+			if tok1, lit1 := p.scanIgnoreWhitespace(); tok1 == DATASOURCE || tok1 == FORMAT || tok1 == KEY || tok1 == CONF_KEY || tok1 == STRICT_VALIDATION || tok1 == TYPE || tok1 == TIMESTAMP || tok1 == TIMESTAMP_FORMAT || tok1 == RETAIN_SIZE {
 				if tok2, lit2 := p.scanIgnoreWhitespace(); tok2 == EQ {
 					if tok3, lit3 := p.scanIgnoreWhitespace(); tok3 == STRING {
-						if tok1 == STRICT_VALIDATION {
+						switch tok1 {
+						case STRICT_VALIDATION:
 							if val := strings.ToUpper(lit3); (val != "TRUE") && (val != "FALSE") {
 								return nil, fmt.Errorf("found %q, expect TRUE/FALSE value in %s option.", lit3, tok1)
+							} else {
+								opts.STRICT_VALIDATION = (val == "TRUE")
+							}
+						case RETAIN_SIZE:
+							if val, err := strconv.Atoi(lit3); err != nil {
+								return nil, fmt.Errorf("found %q, expect number value in %s option.", lit3, tok1)
+							} else {
+								opts.RETAIN_SIZE = val
+							}
+						default:
+							f := v.Elem().FieldByName(lit1)
+							if f.IsValid() {
+								f.SetString(lit3)
+							} else { // should not happen
+								return nil, fmt.Errorf("invalid field %s.", lit1)
 							}
 						}
-						opts[lit1] = lit3
 					} else {
 						return nil, fmt.Errorf("found %q, expect string value in option.", lit3)
 					}
@@ -1219,7 +1228,7 @@ func (p *Parser) parseStreamOptions() (map[string]string, error) {
 					return nil, fmt.Errorf("Parenthesis is not matched in options definition.")
 				}
 			} else {
-				return nil, fmt.Errorf("found %q, unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE).", lit1)
+				return nil, fmt.Errorf("found %q, unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE).", lit1)
 			}
 		}
 	} else {

+ 31 - 1
xsql/processors/xsql_processor.go

@@ -200,7 +200,7 @@ func (p *StreamProcessor) execDescribe(stmt xsql.NameNode, st xsql.StreamType) (
 			buff.WriteString("\n")
 		}
 		buff.WriteString("\n")
-		common.PrintMap(s.Options, &buff)
+		printOptions(s.Options, &buff)
 		return buff.String(), err
 	default:
 		return "%s", fmt.Errorf("Error resolving the %s %s, the data in db may be corrupted.", xsql.StreamTypeMap[st], stmt.GetName())
@@ -208,6 +208,36 @@ func (p *StreamProcessor) execDescribe(stmt xsql.NameNode, st xsql.StreamType) (
 
 }
 
+func printOptions(opts *xsql.Options, buff *bytes.Buffer) {
+	if opts.CONF_KEY != "" {
+		buff.WriteString(fmt.Sprintf("CONF_KEY: %s\n", opts.CONF_KEY))
+	}
+	if opts.DATASOURCE != "" {
+		buff.WriteString(fmt.Sprintf("DATASOURCE: %s\n", opts.DATASOURCE))
+	}
+	if opts.FORMAT != "" {
+		buff.WriteString(fmt.Sprintf("FORMAT: %s\n", opts.FORMAT))
+	}
+	if opts.KEY != "" {
+		buff.WriteString(fmt.Sprintf("KEY: %s\n", opts.KEY))
+	}
+	if opts.RETAIN_SIZE != 0 {
+		buff.WriteString(fmt.Sprintf("RETAIN_SIZE: %d\n", opts.RETAIN_SIZE))
+	}
+	if opts.STRICT_VALIDATION {
+		buff.WriteString(fmt.Sprintf("STRICT_VALIDATION: %v\n", opts.STRICT_VALIDATION))
+	}
+	if opts.TIMESTAMP != "" {
+		buff.WriteString(fmt.Sprintf("TIMESTAMP: %s\n", opts.TIMESTAMP))
+	}
+	if opts.TIMESTAMP_FORMAT != "" {
+		buff.WriteString(fmt.Sprintf("TIMESTAMP_FORMAT: %s\n", opts.TIMESTAMP_FORMAT))
+	}
+	if opts.TYPE != "" {
+		buff.WriteString(fmt.Sprintf("TYPE: %s\n", opts.TYPE))
+	}
+}
+
 func (p *StreamProcessor) DescStream(name string, st xsql.StreamType) (xsql.Statement, error) {
 	statement, err := p.getStream(name, st)
 	if err != nil {

+ 14 - 13
xsql/xsql_parser_tree_test.go

@@ -23,26 +23,27 @@ func TestParser_ParseTree(t *testing.T) {
 				StreamFields: []StreamField{
 					{Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "JSON",
-					"KEY":        "USERID",
+				Options: &Options{
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
 		{
 			s: `CREATE TABLE demo (
 					USERID BIGINT,
-				) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID");`,
+				) WITH (DATASOURCE="users", FORMAT="JSON", KEY="USERID", RETAIN_SIZE="3");`,
 			stmt: &StreamStmt{
 				Name: StreamName("demo"),
 				StreamFields: []StreamField{
 					{Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "JSON",
-					"KEY":        "USERID",
+				Options: &Options{
+					DATASOURCE:  "users",
+					FORMAT:      "JSON",
+					KEY:         "USERID",
+					RETAIN_SIZE: 3,
 				},
 				StreamType: TypeTable,
 			},
@@ -60,10 +61,10 @@ func TestParser_ParseTree(t *testing.T) {
 					{Name: "size", FieldType: &BasicType{Type: BIGINT}},
 					{Name: "id", FieldType: &BasicType{Type: BIGINT}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "lookup.json",
-					"FORMAT":     "json",
-					"CONF_KEY":   "test",
+				Options: &Options{
+					DATASOURCE: "lookup.json",
+					FORMAT:     "json",
+					CONF_KEY:   "test",
 				},
 				StreamType: TypeTable,
 			},

+ 54 - 54
xsql/xsql_stream_test.go

@@ -40,14 +40,14 @@ func TestParser_ParseCreateStream(t *testing.T) {
 						},
 					}},
 				},
-				Options: map[string]string{
-					"DATASOURCE":       "users",
-					"FORMAT":           "JSON",
-					"KEY":              "USERID",
-					"CONF_KEY":         "srv1",
-					"TYPE":             "MQTT",
-					"TIMESTAMP":        "USERID",
-					"TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
+				Options: &Options{
+					DATASOURCE:       "users",
+					FORMAT:           "JSON",
+					KEY:              "USERID",
+					CONF_KEY:         "srv1",
+					TYPE:             "MQTT",
+					TIMESTAMP:        "USERID",
+					TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
 				},
 			},
 		},
@@ -61,11 +61,11 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				StreamFields: []StreamField{
 					{Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
 				},
-				Options: map[string]string{
-					"DATASOURCE":        "users",
-					"FORMAT":            "JSON",
-					"KEY":               "USERID",
-					"STRICT_VALIDATION": "true",
+				Options: &Options{
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					STRICT_VALIDATION: true,
 				},
 			},
 		},
@@ -87,11 +87,11 @@ func TestParser_ParseCreateStream(t *testing.T) {
 						},
 					}},
 				},
-				Options: map[string]string{
-					"DATASOURCE":        "users",
-					"FORMAT":            "JSON",
-					"KEY":               "USERID",
-					"STRICT_VALIDATION": "FAlse",
+				Options: &Options{
+					DATASOURCE:        "users",
+					FORMAT:            "JSON",
+					KEY:               "USERID",
+					STRICT_VALIDATION: false,
 				},
 			},
 		},
@@ -115,10 +115,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					}},
 					{Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "JSON",
-					"KEY":        "USERID",
+				Options: &Options{
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -144,10 +144,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					}},
 					{Name: "birthday", FieldType: &BasicType{Type: DATETIME}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "JSON",
-					"KEY":        "USERID",
+				Options: &Options{
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -159,10 +159,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 			stmt: &StreamStmt{
 				Name:         StreamName("demo"),
 				StreamFields: nil,
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "JSON",
-					"KEY":        "USERID",
+				Options: &Options{
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -172,10 +172,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 			stmt: &StreamStmt{
 				Name:         StreamName("demo"),
 				StreamFields: nil,
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "JSON",
-					"KEY":        "USERID",
+				Options: &Options{
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -194,10 +194,10 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				StreamFields: []StreamField{
 					{Name: "NAME", FieldType: &BasicType{Type: STRINGS}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "JSON",
-					"KEY":        "USERID",
+				Options: &Options{
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
+					KEY:        "USERID",
 				},
 			},
 		},
@@ -239,7 +239,7 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				StreamFields: nil,
 				Options:      nil,
 			},
-			err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE).`,
+			err: `found "sources", unknown option keys(DATASOURCE|FORMAT|KEY|CONF_KEY|STRICT_VALIDATION|TYPE|TIMESTAMP|TIMESTAMP_FORMAT|RETAIN_SIZE).`,
 		},
 
 		{
@@ -261,7 +261,7 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				StreamFields: []StreamField{
 					{Name: "USERID", FieldType: &BasicType{Type: BIGINT}},
 				},
-				Options: map[string]string{},
+				Options: &Options{},
 			},
 		},
 
@@ -322,11 +322,11 @@ func TestParser_ParseCreateStream(t *testing.T) {
 						},
 					}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "test",
-					"FORMAT":     "JSON",
-					"CONF_KEY":   "democonf",
-					"TYPE":       "MQTT",
+				Options: &Options{
+					DATASOURCE: "test",
+					FORMAT:     "JSON",
+					CONF_KEY:   "democonf",
+					TYPE:       "MQTT",
 				},
 			},
 		}, {
@@ -344,9 +344,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
 					{Name: "PICTURE", FieldType: &BasicType{Type: BYTEA}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "JSON",
+				Options: &Options{
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
 				},
 			},
 		}, {
@@ -364,9 +364,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 					{Name: "LAST_NAME", FieldType: &BasicType{Type: STRINGS}},
 					{Name: "PICTURE", FieldType: &BasicType{Type: BYTEA}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "JSON",
+				Options: &Options{
+					DATASOURCE: "users",
+					FORMAT:     "JSON",
 				},
 			},
 		}, {
@@ -391,9 +391,9 @@ func TestParser_ParseCreateStream(t *testing.T) {
 				StreamFields: []StreamField{
 					{Name: "image", FieldType: &BasicType{Type: BYTEA}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"FORMAT":     "BINARY",
+				Options: &Options{
+					DATASOURCE: "users",
+					FORMAT:     "BINARY",
 				},
 			},
 		},

+ 16 - 9
xstream/extensions/file_source.go

@@ -22,9 +22,10 @@ var fileTypes = map[FileType]bool{
 }
 
 type FileSourceConfig struct {
-	FileType FileType `json:"fileType"`
-	Path     string   `json:"path"`
-	Interval int      `json:"interval"`
+	FileType   FileType `json:"fileType"`
+	Path       string   `json:"path"`
+	Interval   int      `json:"interval"`
+	RetainSize int      `json:"$retainSize"`
 }
 
 // The BATCH to load data from file at once
@@ -112,6 +113,10 @@ func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTupl
 			return fmt.Errorf("loaded %s, check error %s", fs.file, err)
 		}
 		ctx.GetLogger().Debug("Sending tuples")
+		if fs.config.RetainSize > 0 && fs.config.RetainSize < len(resultMap) {
+			resultMap = resultMap[(len(resultMap) - fs.config.RetainSize):]
+			ctx.GetLogger().Debug("Sending tuples for retain size %d", fs.config.RetainSize)
+		}
 		for _, m := range resultMap {
 			select {
 			case consumer <- api.NewDefaultSourceTuple(m, nil):
@@ -120,12 +125,14 @@ func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTupl
 				return nil
 			}
 		}
-		// Send EOF
-		select {
-		case consumer <- api.NewDefaultSourceTuple(nil, nil):
-			// do nothing
-		case <-ctx.Done():
-			return nil
+		// Send EOF if retain size not set
+		if fs.config.RetainSize == 0 {
+			select {
+			case consumer <- api.NewDefaultSourceTuple(nil, nil):
+				// do nothing
+			case <-ctx.Done():
+				return nil
+			}
 		}
 		ctx.GetLogger().Debug("All tuples sent")
 		return nil

+ 5 - 4
xstream/nodes/node.go

@@ -3,6 +3,7 @@ package nodes
 import (
 	"fmt"
 	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/checkpoints"
 	"github.com/go-yaml/yaml"
@@ -156,8 +157,8 @@ func (o *defaultSinkNode) preprocess(data interface{}) (interface{}, bool) {
 	return data, false
 }
 
-func getSourceConf(ctx api.StreamContext, sourceType string, options map[string]string) map[string]interface{} {
-	confkey := options["CONF_KEY"]
+func getSourceConf(ctx api.StreamContext, sourceType string, options *xsql.Options) map[string]interface{} {
+	confkey := options.CONF_KEY
 	logger := ctx.GetLogger()
 	confPath := "sources/" + sourceType + ".yaml"
 	if sourceType == "mqtt" {
@@ -190,8 +191,8 @@ func getSourceConf(ctx api.StreamContext, sourceType string, options map[string]
 	} else {
 		logger.Warnf("config file %s.yaml is not loaded properly. Return an empty configuration", sourceType)
 	}
-	f, ok := options["FORMAT"]
-	if !ok || f == "" {
+	f := options.FORMAT
+	if f == "" {
 		f = "json"
 	}
 	props["format"] = strings.ToLower(f)

+ 11 - 6
xstream/nodes/source_node.go

@@ -13,16 +13,16 @@ type SourceNode struct {
 	*defaultNode
 	streamType xsql.StreamType
 	sourceType string
-	options    map[string]string
+	options    *xsql.Options
 	isMock     bool
 
 	mutex   sync.RWMutex
 	sources []api.Source
 }
 
-func NewSourceNode(name string, st xsql.StreamType, options map[string]string) *SourceNode {
-	t, ok := options["TYPE"]
-	if !ok {
+func NewSourceNode(name string, st xsql.StreamType, options *xsql.Options) *SourceNode {
+	t := options.TYPE
+	if t == "" {
 		if st == xsql.TypeStream {
 			t = "mqtt"
 		} else if st == xsql.TypeTable {
@@ -30,6 +30,7 @@ func NewSourceNode(name string, st xsql.StreamType, options map[string]string) *
 		}
 	}
 	return &SourceNode{
+		streamType: st,
 		sourceType: t,
 		defaultNode: &defaultNode{
 			name:        name,
@@ -43,7 +44,7 @@ func NewSourceNode(name string, st xsql.StreamType, options map[string]string) *
 const OFFSET_KEY = "$$offset"
 
 //Only for mock source, do not use it in production
-func NewSourceNodeWithSource(name string, source api.Source, options map[string]string) *SourceNode {
+func NewSourceNodeWithSource(name string, source api.Source, options *xsql.Options) *SourceNode {
 	return &SourceNode{
 		sources: []api.Source{source},
 		defaultNode: &defaultNode{
@@ -77,6 +78,10 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 				bl = t
 			}
 		}
+		// Set retain size for table type
+		if m.options.RETAIN_SIZE > 0 && m.streamType == xsql.TypeTable {
+			props["$retainSize"] = m.options.RETAIN_SIZE
+		}
 		m.reset()
 		logger.Infof("open source node %d instances", m.concurrency)
 		for i := 0; i < m.concurrency; i++ { // workers
@@ -90,7 +95,7 @@ func (m *SourceNode) Open(ctx api.StreamContext, errCh chan<- error) {
 						m.drainError(errCh, err, ctx, logger)
 						return
 					}
-					err = source.Configure(m.options["DATASOURCE"], props)
+					err = source.Configure(m.options.DATASOURCE, props)
 					if err != nil {
 						m.drainError(errCh, err, ctx, logger)
 						return

+ 7 - 7
xstream/nodes/source_node_test.go

@@ -27,9 +27,9 @@ func TestGetConf_Apply(t *testing.T) {
 			},
 		},
 	}
-	n := NewSourceNode("test", xsql.TypeStream, map[string]string{
-		"DATASOURCE": "RFC_READ_TABLE",
-		"TYPE":       "test",
+	n := NewSourceNode("test", xsql.TypeStream, &xsql.Options{
+		DATASOURCE: "RFC_READ_TABLE",
+		TYPE:       "test",
 	})
 	contextLogger := common.Log.WithField("rule", "test")
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
@@ -49,10 +49,10 @@ func TestGetConfAndConvert_Apply(t *testing.T) {
 		},
 		"deduplicate": 50,
 	}
-	n := NewSourceNode("test", xsql.TypeStream, map[string]string{
-		"DATASOURCE": "test",
-		"TYPE":       "random",
-		"CONF_KEY":   "dedup",
+	n := NewSourceNode("test", xsql.TypeStream, &xsql.Options{
+		DATASOURCE: "test",
+		TYPE:       "random",
+		CONF_KEY:   "dedup",
 	})
 	contextLogger := common.Log.WithField("rule", "test")
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)

+ 54 - 52
xstream/operators/preprocessor_test.go

@@ -595,14 +595,14 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
 					{Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
 				},
-				Options: map[string]string{
-					"DATASOURCE":       "users",
-					"FORMAT":           "JSON",
-					"KEY":              "USERID",
-					"CONF_KEY":         "srv1",
-					"TYPE":             "MQTT",
-					"TIMESTAMP":        "USERID",
-					"TIMESTAMP_FORMAT": "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
+				Options: &xsql.Options{
+					DATASOURCE:       "users",
+					FORMAT:           "JSON",
+					KEY:              "USERID",
+					CONF_KEY:         "srv1",
+					TYPE:             "MQTT",
+					TIMESTAMP:        "USERID",
+					TIMESTAMP_FORMAT: "yyyy-MM-dd 'at' HH:mm:ss'Z'X",
 				},
 			},
 			data: []byte(`{"abc": "2019-09-19 at 18:55:15Z+07", "def" : 1568854573431}`),
@@ -661,7 +661,9 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 	for i, tt := range tests {
 		pp := &Preprocessor{}
 		pp.streamFields = convertFields(tt.stmt.StreamFields)
-		pp.timestampFormat = tt.stmt.Options["TIMESTAMP_FORMAT"]
+		if tt.stmt.Options != nil {
+			pp.timestampFormat = tt.stmt.Options.TIMESTAMP_FORMAT
+		}
 		dm := make(map[string]interface{})
 		if e := json.Unmarshal(tt.data, &dm); e != nil {
 			log.Fatal(e)
@@ -709,14 +711,14 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 				StreamFields: []xsql.StreamField{
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
 				},
-				Options: map[string]string{
-					"DATASOURCE":       "users",
-					"FORMAT":           "JSON",
-					"KEY":              "USERID",
-					"CONF_KEY":         "srv1",
-					"TYPE":             "MQTT",
-					"TIMESTAMP":        "abc",
-					"TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
+				Options: &xsql.Options{
+					DATASOURCE:       "users",
+					FORMAT:           "JSON",
+					KEY:              "USERID",
+					CONF_KEY:         "srv1",
+					TYPE:             "MQTT",
+					TIMESTAMP:        "abc",
+					TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
 				},
 			},
 			data: []byte(`{"abc": 1568854515000}`),
@@ -729,14 +731,14 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			stmt: &xsql.StreamStmt{
 				Name:         xsql.StreamName("demo"),
 				StreamFields: nil,
-				Options: map[string]string{
-					"DATASOURCE":       "users",
-					"FORMAT":           "JSON",
-					"KEY":              "USERID",
-					"CONF_KEY":         "srv1",
-					"TYPE":             "MQTT",
-					"TIMESTAMP":        "abc",
-					"TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
+				Options: &xsql.Options{
+					DATASOURCE:       "users",
+					FORMAT:           "JSON",
+					KEY:              "USERID",
+					CONF_KEY:         "srv1",
+					TYPE:             "MQTT",
+					TIMESTAMP:        "abc",
+					TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
 				},
 			},
 			data: []byte(`{"abc": 1568854515000}`),
@@ -751,9 +753,9 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 				StreamFields: []xsql.StreamField{
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"TIMESTAMP":  "abc",
+				Options: &xsql.Options{
+					DATASOURCE: "users",
+					TIMESTAMP:  "abc",
 				},
 			},
 			data:   []byte(`{"abc": true}`),
@@ -766,9 +768,9 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
 					{Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"TIMESTAMP":  "def",
+				Options: &xsql.Options{
+					DATASOURCE: "users",
+					TIMESTAMP:  "def",
 				},
 			},
 			data: []byte(`{"abc": 34, "def" : "2019-09-23T02:47:29.754Z", "ghi": 50}`),
@@ -785,9 +787,9 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
 					{Name: "def", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
 				},
-				Options: map[string]string{
-					"DATASOURCE": "users",
-					"TIMESTAMP":  "abc",
+				Options: &xsql.Options{
+					DATASOURCE: "users",
+					TIMESTAMP:  "abc",
 				},
 			},
 			data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
@@ -804,10 +806,10 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
 					{Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
 				},
-				Options: map[string]string{
-					"DATASOURCE":       "users",
-					"TIMESTAMP":        "def",
-					"TIMESTAMP_FORMAT": "yyyy-MM-dd'AT'HH:mm:ss",
+				Options: &xsql.Options{
+					DATASOURCE:       "users",
+					TIMESTAMP:        "def",
+					TIMESTAMP_FORMAT: "yyyy-MM-dd'AT'HH:mm:ss",
 				},
 			},
 			data: []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
@@ -824,10 +826,10 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
 					{Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
 				},
-				Options: map[string]string{
-					"DATASOURCE":       "users",
-					"TIMESTAMP":        "def",
-					"TIMESTAMP_FORMAT": "yyyy-MM-ddaHH:mm:ss",
+				Options: &xsql.Options{
+					DATASOURCE:       "users",
+					TIMESTAMP:        "def",
+					TIMESTAMP_FORMAT: "yyyy-MM-ddaHH:mm:ss",
 				},
 			},
 			data:   []byte(`{"abc": 34, "def" : "2019-09-23AT02:47:29", "ghi": 50}`),
@@ -847,10 +849,10 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 				streamFields:    convertFields(tt.stmt.StreamFields),
 				aliasFields:     nil,
 				isBinary:        false,
-				timestampFormat: tt.stmt.Options["TIMESTAMP_FORMAT"],
+				timestampFormat: tt.stmt.Options.TIMESTAMP_FORMAT,
 			},
 			isEventTime:    true,
-			timestampField: tt.stmt.Options["TIMESTAMP"],
+			timestampField: tt.stmt.Options.TIMESTAMP,
 		}
 
 		dm := make(map[string]interface{})
@@ -909,14 +911,14 @@ func TestPreprocessorError(t *testing.T) {
 				StreamFields: []xsql.StreamField{
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
 				},
-				Options: map[string]string{
-					"DATASOURCE":       "users",
-					"FORMAT":           "JSON",
-					"KEY":              "USERID",
-					"CONF_KEY":         "srv1",
-					"TYPE":             "MQTT",
-					"TIMESTAMP":        "abc",
-					"TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
+				Options: &xsql.Options{
+					DATASOURCE:       "users",
+					FORMAT:           "JSON",
+					KEY:              "USERID",
+					CONF_KEY:         "srv1",
+					TYPE:             "MQTT",
+					TIMESTAMP:        "abc",
+					TIMESTAMP_FORMAT: "yyyy-MM-dd''T''HH:mm:ssX'",
 				},
 			},
 			data:   []byte(`{"abc": "not a time"}`),

+ 30 - 16
xstream/operators/table_processor.go

@@ -10,15 +10,28 @@ type TableProcessor struct {
 	//Pruned stream fields. Could be streamField(with data type info) or string
 	defaultFieldProcessor
 
-	isBatchInput bool
-	output       xsql.WindowTuples
-	count        int
+	isBatchInput bool // whether the inputs are batched, such as file which sends multiple messages at a batch. If batch input, only fires when EOF is received. This is mutual exclusive with retainSize.
+	retainSize   int  // how many(maximum) messages to be retained for each output
+	// States
+	output       xsql.WindowTuples // current batched message collection
+	batchEmitted bool              // if batch input, this is the signal for whether the last batch has emitted. If true, reinitialize.
 }
 
-func NewTableProcessor(fields []interface{}, fs xsql.Fields, timestampFormat string, isBatchInput bool) (*TableProcessor, error) {
-	p := &TableProcessor{isBatchInput: isBatchInput}
+func NewTableProcessor(name string, fields []interface{}, fs xsql.Fields, options *xsql.Options) (*TableProcessor, error) {
+	p := &TableProcessor{
+		output: xsql.WindowTuples{
+			Emitter: name,
+			Tuples:  make([]xsql.Tuple, 0),
+		},
+	}
 	p.defaultFieldProcessor = defaultFieldProcessor{
-		streamFields: fields, aliasFields: fs, isBinary: false, timestampFormat: timestampFormat,
+		streamFields: fields, aliasFields: fs, isBinary: false, timestampFormat: options.TIMESTAMP_FORMAT,
+	}
+	if options.RETAIN_SIZE > 0 {
+		p.retainSize = options.RETAIN_SIZE
+		p.isBatchInput = false
+	} else if isBatch(options.TYPE) {
+		p.isBatchInput = true
 	}
 	return p, nil
 }
@@ -34,29 +47,30 @@ func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql
 		return fmt.Errorf("expect *xsql.Tuple data type")
 	}
 	logger.Debugf("preprocessor receive %v", tuple)
-
-	if p.count == 0 {
-		p.output = xsql.WindowTuples{
-			Emitter: tuple.Emitter,
-			Tuples:  make([]xsql.Tuple, 0),
-		}
+	if p.batchEmitted {
+		p.output.Tuples = make([]xsql.Tuple, 0)
+		p.batchEmitted = false
 	}
-
 	if tuple.Message != nil {
 		result, err := p.processField(tuple, fv)
 		if err != nil {
 			return fmt.Errorf("error in table processor: %s", err)
 		}
 		tuple.Message = result
+		if p.retainSize > 0 && len(p.output.Tuples) == p.retainSize {
+			p.output.Tuples = p.output.Tuples[1:]
+		}
 		p.output.Tuples = append(p.output.Tuples, *tuple)
 		if !p.isBatchInput {
 			return p.output
-		} else {
-			p.count = p.count + 1
 		}
 	} else if p.isBatchInput { // EOF
-		p.count = 0
+		p.batchEmitted = true
 		return p.output
 	}
 	return nil
 }
+
// isBatch reports whether source type t delivers data in batches
// terminated by an EOF signal. The "file" type and the empty default
// (which resolves to file for tables) are batch sources.
func isBatch(t string) bool {
	switch t {
	case "file", "":
		return true
	default:
		return false
	}
}

+ 4 - 0
xstream/operators/table_processor_test.go

@@ -107,6 +107,10 @@ func TestTableProcessor_Apply(t *testing.T) {
 	for i, tt := range tests {
 		pp := &TableProcessor{isBatchInput: true}
 		pp.streamFields = convertFields(tt.stmt.StreamFields)
+		pp.output = xsql.WindowTuples{
+			Emitter: "demo",
+			Tuples:  make([]xsql.Tuple, 0),
+		}
 
 		var dm []map[string]interface{}
 		if e := json.Unmarshal(tt.data, &dm); e != nil {

+ 6 - 8
xstream/planner/dataSourcePlan.go

@@ -175,19 +175,17 @@ func (p *DataSourcePlan) getAllFields() {
 
 func (p *DataSourcePlan) getProps() error {
 	if p.iet {
-		if tf, ok := p.streamStmt.Options["TIMESTAMP"]; ok {
-			p.timestampField = tf
+		if p.streamStmt.Options.TIMESTAMP != "" {
+			p.timestampField = p.streamStmt.Options.TIMESTAMP
 		} else {
 			return fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found")
 		}
-		if ts, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok {
-			p.timestampFormat = ts
+		if p.streamStmt.Options.TIMESTAMP_FORMAT != "" {
+			p.timestampFormat = p.streamStmt.Options.TIMESTAMP_FORMAT
 		}
 	}
-	if f, ok := p.streamStmt.Options["FORMAT"]; ok {
-		if strings.ToLower(f) == common.FORMAT_BINARY {
-			p.isBinary = true
-		}
+	if strings.ToLower(p.streamStmt.Options.FORMAT) == common.FORMAT_BINARY {
+		p.isBinary = true
 	}
 	return nil
 }

+ 16 - 15
xstream/planner/planner.go

@@ -113,14 +113,8 @@ func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption,
 				node := nodes.NewSourceNode(t.name, t.streamStmt.StreamType, t.streamStmt.Options)
 				srcNode = node
 			} else {
-				found := false
-				for _, source := range sources {
-					if t.name == source.GetName() {
-						srcNode = source
-						found = true
-					}
-				}
-				if !found {
+				srcNode = getMockSource(sources, t.name)
+				if srcNode == nil {
 					return nil, 0, fmt.Errorf("can't find predefined source %s", t.name)
 				}
 			}
@@ -128,11 +122,17 @@ func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption,
 			op = Transform(pp, fmt.Sprintf("%d_preprocessor_%s", newIndex, t.name), options)
 			inputs = []api.Emitter{srcNode}
 		case xsql.TypeTable:
-			pp, err := operators.NewTableProcessor(t.streamFields, t.alias, t.timestampFormat, isBatch(t.streamStmt.Options))
+			pp, err := operators.NewTableProcessor(t.name, t.streamFields, t.alias, t.streamStmt.Options)
 			if err != nil {
 				return nil, 0, err
 			}
-			srcNode := nodes.NewSourceNode(t.name, t.streamStmt.StreamType, t.streamStmt.Options)
+			var srcNode *nodes.SourceNode
+			if len(sources) > 0 {
+				srcNode = getMockSource(sources, t.name)
+			}
+			if srcNode == nil {
+				srcNode = nodes.NewSourceNode(t.name, t.streamStmt.StreamType, t.streamStmt.Options)
+			}
 			tp.AddSrc(srcNode)
 			op = Transform(pp, fmt.Sprintf("%d_tableprocessor_%s", newIndex, t.name), options)
 			inputs = []api.Emitter{srcNode}
@@ -177,12 +177,13 @@ func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption,
 	return op, newIndex, nil
 }
 
-func isBatch(options xsql.Options) bool {
-	t, ok := options["TYPE"]
-	if !ok || t == "file" {
-		return true
+func getMockSource(sources []*nodes.SourceNode, name string) *nodes.SourceNode {
+	for _, source := range sources {
+		if name == source.GetName() {
+			return source
+		}
 	}
-	return false
+	return nil
 }
 
 func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {

+ 49 - 2
xstream/topotest/mock_topo.go

@@ -958,6 +958,48 @@ var testData = map[string][]*xsql.Tuple{
 			Timestamp: 1541152488013,
 		},
 	},
+	"demoTable": {
+		{
+			Emitter: "demoTable",
+			Message: map[string]interface{}{
+				"ts":     1541152486013,
+				"device": "device1",
+			},
+			Timestamp: 1541152486501,
+		},
+		{
+			Emitter: "demoTable",
+			Message: map[string]interface{}{
+				"ts":     1541152486822,
+				"device": "device2",
+			},
+			Timestamp: 1541152486502,
+		},
+		{
+			Emitter: "demoTable",
+			Message: map[string]interface{}{
+				"ts":     1541152487632,
+				"device": "device3",
+			},
+			Timestamp: 1541152488001,
+		},
+		{
+			Emitter: "demoTable",
+			Message: map[string]interface{}{
+				"ts":     1541152488442,
+				"device": "device4",
+			},
+			Timestamp: 1541152488002,
+		},
+		{
+			Emitter: "demoTable",
+			Message: map[string]interface{}{
+				"ts":     1541152489252,
+				"device": "device5",
+			},
+			Timestamp: 1541152488003,
+		},
+	},
 }
 
 func commonResultFunc(result [][]byte) interface{} {
@@ -1115,8 +1157,8 @@ func createStream(t *testing.T, tt RuleTest, j int, opt *api.RuleOption, sinkPro
 				}
 				dataLength = len(data)
 				datas = append(datas, data)
-				source := nodes.NewSourceNodeWithSource(stream, mocknodes.NewMockSource(data), map[string]string{
-					"DATASOURCE": stream,
+				source := nodes.NewSourceNodeWithSource(stream, mocknodes.NewMockSource(data), &xsql.Options{
+					DATASOURCE: stream,
 				})
 				sources = append(sources, source)
 			}
@@ -1159,6 +1201,11 @@ func HandleStream(createOrDrop bool, names []string, t *testing.T) {
 					"`from`" + ` STRING,
 					ts BIGINT
 				) WITH (DATASOURCE="demo1", FORMAT="json", KEY="ts");`
+			case "demoTable":
+				sql = `CREATE TABLE demoTable (
+					device STRING,
+					ts BIGINT
+				) WITH (DATASOURCE="demoTable", TYPE="mqtt", RETAIN_SIZE="3");`
 			case "sessionDemo":
 				sql = `CREATE STREAM sessionDemo (
 					temp FLOAT,

+ 48 - 1
xstream/topotest/rule_test.go

@@ -9,7 +9,7 @@ import (
 
 func TestSingleSQL(t *testing.T) {
 	//Reset
-	streamList := []string{"demo", "demoError", "demo1", "table1"}
+	streamList := []string{"demo", "demoError", "demo1", "table1", "demoTable"}
 	HandleStream(false, streamList, t)
 	//Data setup
 	var tests = []RuleTest{
@@ -431,6 +431,7 @@ func TestSingleSQL(t *testing.T) {
 					"ts":    float64(1541152489252),
 				}},
 			},
+			W: 15,
 			M: map[string]interface{}{
 				"op_1_preprocessor_demo_0_exceptions_total":  int64(0),
 				"op_1_preprocessor_demo_0_records_in_total":  int64(5),
@@ -463,6 +464,52 @@ func TestSingleSQL(t *testing.T) {
 				"source_table1_0_records_in_total":  int64(4),
 				"source_table1_0_records_out_total": int64(4),
 			},
+		}, {
+			Name: `TestSingleSQLRule11`,
+			Sql:  "SELECT device FROM demo INNER JOIN demoTable on demo.ts = demoTable.ts",
+			R: [][]map[string]interface{}{
+				{{
+					"device": "device2",
+				}},
+				{{
+					"device": "device4",
+				}},
+				{{
+					"device": "device5",
+				}},
+			},
+			M: map[string]interface{}{
+				"op_1_preprocessor_demo_0_exceptions_total":  int64(0),
+				"op_1_preprocessor_demo_0_records_in_total":  int64(5),
+				"op_1_preprocessor_demo_0_records_out_total": int64(5),
+
+				"op_2_tableprocessor_demoTable_0_exceptions_total":  int64(0),
+				"op_2_tableprocessor_demoTable_0_records_in_total":  int64(5),
+				"op_2_tableprocessor_demoTable_0_records_out_total": int64(5),
+
+				"op_3_join_aligner_0_records_in_total":  int64(10),
+				"op_3_join_aligner_0_records_out_total": int64(5),
+
+				"op_4_join_0_exceptions_total":  int64(0),
+				"op_4_join_0_records_in_total":  int64(5),
+				"op_4_join_0_records_out_total": int64(3),
+
+				"op_5_project_0_exceptions_total":  int64(0),
+				"op_5_project_0_records_in_total":  int64(3),
+				"op_5_project_0_records_out_total": int64(3),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(3),
+				"sink_mockSink_0_records_out_total": int64(3),
+
+				"source_demo_0_exceptions_total":  int64(0),
+				"source_demo_0_records_in_total":  int64(5),
+				"source_demo_0_records_out_total": int64(5),
+
+				"source_demoTable_0_exceptions_total":  int64(0),
+				"source_demoTable_0_records_in_total":  int64(5),
+				"source_demoTable_0_records_out_total": int64(5),
+			},
 		},
 	}
 	HandleStream(true, streamList, t)