浏览代码

feat(stream): support schema-less in preprocessor

ngjaying 5 年之前
父节点
当前提交
6d364d187b
共有 2 个文件被更改,包括 147 次插入7 次删除
  1. 9 5
      xsql/plans/preprocessor.go
  2. 138 2
      xsql/plans/preprocessor_test.go

+ 9 - 5
xsql/plans/preprocessor.go

@@ -51,12 +51,16 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}) interface{
 	log.Debugf("preprocessor receive %s", tuple.Message)
 
 	result := make(map[string]interface{})
-	for _, f := range p.streamStmt.StreamFields {
-		fname := strings.ToLower(f.Name)
-		if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil {
-			log.Errorf("error in preprocessor: %s", e)
-			return nil
+	if p.streamStmt.StreamFields != nil {
+		for _, f := range p.streamStmt.StreamFields {
+			fname := strings.ToLower(f.Name)
+			if e := p.addRecField(f.FieldType, result, tuple.Message, fname); e != nil {
+				log.Errorf("error in preprocessor: %s", e)
+				return nil
+			}
 		}
+	} else {
+		result = tuple.Message
 	}
 
 	//If the field has alias name, then evaluate the alias field before transfer it to proceeding operators, and put it into result.

+ 138 - 2
xsql/plans/preprocessor_test.go

@@ -30,7 +30,17 @@ func TestPreprocessor_Apply(t *testing.T) {
 			data:   []byte(`{"a": 6}`),
 			result: nil,
 		},
-
+		{
+			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+			},
+			data: []byte(`{"a": 6}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": float64(6),
+			},
+			},
+		},
 		{
 			stmt: &xsql.StreamStmt{
 				Name: xsql.StreamName("demo"),
@@ -40,7 +50,18 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			data: []byte(`{"abc": 6}`),
 			result: &xsql.Tuple{Message: xsql.Message{
-				"abc": int(6),
+				"abc": 6,
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+			},
+			data: []byte(`{"abc": 6}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"abc": float64(6),
 			},
 			},
 		},
@@ -61,6 +82,19 @@ func TestPreprocessor_Apply(t *testing.T) {
 		},
 		{
 			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+			},
+			data: []byte(`{"abc": 34, "def" : "hello", "ghi": 50}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"abc": float64(34),
+				"def": "hello",
+				"ghi": float64(50),
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
 				Name: xsql.StreamName("demo"),
 				StreamFields: []xsql.StreamField{
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
@@ -96,6 +130,19 @@ func TestPreprocessor_Apply(t *testing.T) {
 			data:   []byte(`{"a": {"b" : "hello"}}`),
 			result: nil,
 		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+			},
+			data: []byte(`{"a": {"b" : "hello"}}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": map[string]interface{}{
+					"b": "hello",
+				},
+			},
+			},
+		},
 		//Rec type
 		{
 			stmt: &xsql.StreamStmt{
@@ -136,6 +183,19 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+			},
+			data: []byte(`{"a": {"b" : "32"}}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": map[string]interface{}{
+					"b": "32",
+				},
+			},
+			},
+		},
 		//Array of complex type
 		{
 			stmt: &xsql.StreamStmt{
@@ -162,6 +222,20 @@ func TestPreprocessor_Apply(t *testing.T) {
 		},
 		{
 			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+			},
+			data: []byte(`{"a": [{"b" : "hello1"}, {"b" : "hello2"}]}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": []interface{}{
+					map[string]interface{}{"b": "hello1"},
+					map[string]interface{}{"b": "hello2"},
+				},
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
 				Name: xsql.StreamName("demo"),
 				StreamFields: []xsql.StreamField{
 					{Name: "a", FieldType: &xsql.ArrayType{
@@ -178,6 +252,20 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+			},
+			data: []byte(`{"a": [55, 77]}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": []interface{}{
+					float64(55),
+					float64(77),
+				},
+			},
+			},
+		},
 		//Rec of complex type
 		{
 			stmt: &xsql.StreamStmt{
@@ -206,6 +294,22 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+			},
+			data: []byte(`{"a": {"b" : "hello", "c": {"d": 35.2}}}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": map[string]interface{}{
+					"b": "hello",
+					"c": map[string]interface{}{
+						"d": 35.2,
+					},
+				},
+			},
+			},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -255,6 +359,18 @@ func TestPreprocessorTime_Apply(t *testing.T) {
 		},
 		{
 			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+			},
+			data: []byte(`{"abc": "2019-09-19T00:55:15.000Z", "def" : 1568854573431}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"abc": "2019-09-19T00:55:15.000Z",
+				"def": float64(1568854573431),
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
 				Name: xsql.StreamName("demo"),
 				StreamFields: []xsql.StreamField{
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.DATETIME}},
@@ -391,6 +507,26 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 		},
 		{
 			stmt: &xsql.StreamStmt{
+				Name:         xsql.StreamName("demo"),
+				StreamFields: nil,
+				Options: map[string]string{
+					"DATASOURCE":       "users",
+					"FORMAT":           "AVRO",
+					"KEY":              "USERID",
+					"CONF_KEY":         "srv1",
+					"TYPE":             "MQTT",
+					"TIMESTAMP":        "abc",
+					"TIMESTAMP_FORMAT": "yyyy-MM-dd''T''HH:mm:ssX'",
+				},
+			},
+			data: []byte(`{"abc": 1568854515000}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"abc": float64(1568854515000),
+			}, Timestamp: 1568854515000,
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
 				Name: xsql.StreamName("demo"),
 				StreamFields: []xsql.StreamField{
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},