Browse Source

feat(preprocessor): support string value conversion to any other types

ngjaying 5 years atrás
parent
commit
85f87b6f80
2 changed files with 165 additions and 31 deletions
  1. 100 26
      xsql/plans/preprocessor.go
  2. 65 5
      xsql/plans/preprocessor_test.go

+ 100 - 26
xsql/plans/preprocessor.go

@@ -1,11 +1,13 @@
 package plans
 
 import (
+	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
 	"github.com/emqx/kuiper/xstream/api"
-	"fmt"
 	"reflect"
+	"strconv"
 	"strings"
 	"time"
 )
@@ -108,12 +110,24 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 					r[n] = t.(int)
 				}else if jtype == reflect.Float64{
 					r[n] = int(t.(float64))
+				}else if jtype == reflect.String {
+					if i, err := strconv.Atoi(t.(string)); err != nil{
+						return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
+					}else{
+						r[n] = i
+					}
 				}else{
 					return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
 				}
 			case xsql.FLOAT:
 				if jtype == reflect.Float64{
 					r[n] = t.(float64)
+				}else if jtype == reflect.String {
+					if f, err := strconv.ParseFloat(t.(string), 64); err != nil{
+						return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
+					}else{
+						r[n] = f
+					}
 				}else{
 					return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
 				}
@@ -143,6 +157,12 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 			case xsql.BOOLEAN:
 				if jtype == reflect.Bool{
 					r[n] = t.(bool)
+				}else if jtype == reflect.String {
+					if i, err := strconv.ParseBool(t.(string)); err != nil{
+						return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
+					}else{
+						r[n] = i
+					}
 				}else{
 					return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
 				}
@@ -150,22 +170,38 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 				return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
 			}
 		case *xsql.ArrayType:
-			if jtype != reflect.Slice{
+			var s []interface{}
+			if jtype == reflect.Slice{
+				s = t.([]interface{})
+			}else if jtype == reflect.String {
+				err := json.Unmarshal([]byte(t.(string)), &s)
+				if err != nil{
+					return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
+				}
+			}else{
 				return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
 			}
-			if tempArr, err := p.addArrayField(st, t.([]interface{})); err !=nil{
+
+			if tempArr, err := p.addArrayField(st, s); err !=nil{
 				return err
 			}else {
 				r[n] = tempArr
 			}
 		case *xsql.RecType:
-			if jtype != reflect.Map{
+			nextJ := make(map[string]interface{})
+			if jtype == reflect.Map{
+				nextJ, ok = t.(map[string]interface{})
+				if !ok {
+					return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
+				}
+			}else if jtype == reflect.String {
+				err := json.Unmarshal([]byte(t.(string)), &nextJ)
+				if err != nil{
+					return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
+				}
+			}else{
 				return fmt.Errorf("invalid data type for %s, expect struct but found %s", n, t)
 			}
-			nextJ, ok := j[n].(map[string]interface{})
-			if !ok {
-				return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
-			}
 			nextR := make(map[string]interface{})
 			for _, nextF := range st.StreamFields {
 				nextP := strings.ToLower(nextF.Name)
@@ -190,37 +226,54 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 		switch st := ft.FieldType.(type) { //Only two complex types supported here
 		case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
 			var tempSlice [][]interface{}
+			var s []interface{}
 			for i, t := range srcSlice{
-				if reflect.ValueOf(t).Kind() == reflect.Array{
-					if tempArr, err := p.addArrayField(st, t.([]interface{})); err !=nil{
-						return nil, err
-					}else {
-						tempSlice = append(tempSlice, tempArr.([]interface{}))
+				jtype := reflect.ValueOf(t).Kind()
+				if jtype == reflect.Slice || jtype == reflect.Array{
+					s = t.([]interface{})
+				}else if jtype == reflect.String {
+					err := json.Unmarshal([]byte(t.(string)), &s)
+					if err != nil{
+						return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
 					}
 				}else{
 					return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
 				}
+				if tempArr, err := p.addArrayField(st, s); err !=nil{
+					return nil, err
+				}else {
+					tempSlice = append(tempSlice, tempArr.([]interface{}))
+				}
 			}
 			return tempSlice, nil
 		case *xsql.RecType:
 			var tempSlice []map[string]interface{}
 			for i, t := range srcSlice{
-				if reflect.ValueOf(t).Kind() == reflect.Map{
-					j, ok := t.(map[string]interface{})
+				jtype := reflect.ValueOf(t).Kind()
+				j := make(map[string]interface{})
+				var ok bool
+				if jtype == reflect.Map{
+					j, ok = t.(map[string]interface{})
 					if !ok {
 						return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
 					}
-					r := make(map[string]interface{})
-					for _, f := range st.StreamFields {
-						n := f.Name
-						if e := p.addRecField(f.FieldType, r, j, n); e != nil{
-							return nil, e
-						}
+
+				}else if jtype == reflect.String {
+					err := json.Unmarshal([]byte(t.(string)), &j)
+					if err != nil{
+						return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
 					}
-					tempSlice = append(tempSlice, r)
 				}else{
-					return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
+					return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
+				}
+				r := make(map[string]interface{})
+				for _, f := range st.StreamFields {
+					n := f.Name
+					if e := p.addRecField(f.FieldType, r, j, n); e != nil{
+						return nil, e
+					}
 				}
+				tempSlice = append(tempSlice, r)
 			}
 			return tempSlice, nil
 		default:
@@ -233,8 +286,15 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 		case xsql.BIGINT:
 			var tempSlice []int
 			for i, t := range srcSlice {
-				if reflect.ValueOf(t).Kind() == reflect.Float64{
+				jtype := reflect.ValueOf(t).Kind()
+				if jtype == reflect.Float64{
 					tempSlice = append(tempSlice, int(t.(float64)))
+				}else if jtype == reflect.String {
+					if v, err := strconv.Atoi(t.(string)); err != nil{
+						return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
+					}else{
+						tempSlice = append(tempSlice, v)
+					}
 				}else{
 					return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
 				}
@@ -243,8 +303,15 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 		case xsql.FLOAT:
 			var tempSlice []float64
 			for i, t := range srcSlice {
-				if reflect.ValueOf(t).Kind() == reflect.Float64{
+				jtype := reflect.ValueOf(t).Kind()
+				if jtype == reflect.Float64{
 					tempSlice = append(tempSlice, t.(float64))
+				}else if jtype == reflect.String {
+					if f, err := strconv.ParseFloat(t.(string), 64); err != nil{
+						return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
+					}else{
+						tempSlice = append(tempSlice, f)
+					}
 				}else{
 					return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
 				}
@@ -285,8 +352,15 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 		case xsql.BOOLEAN:
 			var tempSlice []bool
 			for i, t := range srcSlice {
-				if reflect.ValueOf(t).Kind() == reflect.Bool{
+				jtype := reflect.ValueOf(t).Kind()
+				if jtype == reflect.Bool{
 					tempSlice = append(tempSlice, t.(bool))
+				}else if jtype == reflect.String {
+					if v, err := strconv.ParseBool(t.(string)); err != nil{
+						return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
+					}else{
+						tempSlice = append(tempSlice, v)
+					}
 				}else{
 					return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
 				}

+ 65 - 5
xsql/plans/preprocessor_test.go

@@ -2,9 +2,10 @@ package plans
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xsql"
-	"fmt"
+	"github.com/emqx/kuiper/xstream/contexts"
 	"log"
 	"reflect"
 	"testing"
@@ -63,6 +64,21 @@ func TestPreprocessor_Apply(t *testing.T) {
 				Name: xsql.StreamName("demo"),
 				StreamFields: []xsql.StreamField{
 					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
+					{Name: "def", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
+				},
+			},
+			data: []byte(`{"abc": "34", "def" : "hello", "ghi": "50"}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"abc": float64(34),
+				"def": "hello",
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
 					{Name: "def", FieldType: &xsql.BasicType{Type: xsql.BOOLEAN}},
 				},
 			},
@@ -100,6 +116,26 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 		},
+		//Rec type
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.RecType{
+						StreamFields: []xsql.StreamField{
+							{Name: "b", FieldType: &xsql.BasicType{Type: xsql.FLOAT}},
+						},
+					}},
+				},
+			},
+			data: []byte(`{"a": "{\"b\" : \"32\"}"}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": map[string]interface{}{
+					"b": float64(32),
+				},
+			},
+			},
+		},
 		//Array of complex type
 		{
 			stmt: &xsql.StreamStmt{
@@ -124,6 +160,24 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 			},
 		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.ArrayType{
+						Type: xsql.FLOAT,
+					}},
+				},
+			},
+			data: []byte(`{"a": "[\"55\", \"77\"]"}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": []float64{
+					55,
+					77,
+				},
+			},
+			},
+		},
 		//Rec of complex type
 		{
 			stmt: &xsql.StreamStmt{
@@ -157,6 +211,8 @@ func TestPreprocessor_Apply(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 
 		pp := &Preprocessor{streamStmt: tt.stmt}
@@ -167,7 +223,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message:dm}
-			result := pp.Apply(nil, tuple)
+			result := pp.Apply(ctx, tuple)
 			if !reflect.DeepEqual(tt.result, result) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tuple, tt.result, result)
 			}
@@ -276,6 +332,8 @@ func TestPreprocessorTime_Apply(t *testing.T){
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 
 		pp := &Preprocessor{streamStmt: tt.stmt}
@@ -286,7 +344,7 @@ func TestPreprocessorTime_Apply(t *testing.T){
 			return
 		} else {
 			tuple := &xsql.Tuple{Message:dm}
-			result := pp.Apply(nil, tuple)
+			result := pp.Apply(ctx, tuple)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok{
 				if rtt, ok := rt.Message["abc"].(time.Time); ok{
@@ -424,9 +482,11 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
 
 	defer common.CloseLogger()
+	contextLogger := common.Log.WithField("rule", "TestPreprocessorEventtime_Apply")
+	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 
-		pp, err := NewPreprocessor(tt.stmt, true)
+		pp, err := NewPreprocessor(tt.stmt, nil,true)
 		if err != nil{
 			t.Error(err)
 		}
@@ -437,7 +497,7 @@ func TestPreprocessorEventtime_Apply(t *testing.T) {
 			return
 		} else {
 			tuple := &xsql.Tuple{Message:dm}
-			result := pp.Apply(nil, tuple)
+			result := pp.Apply(ctx, tuple)
 			//workaround make sure all the timezone are the same for time vars or the DeepEqual will be false.
 			if rt, ok := result.(*xsql.Tuple); ok{
 				if rtt, ok := rt.Message["abc"].(time.Time); ok{