浏览代码

Merge pull request #188 from emqx/nullable_support

Nullable support
jinfahua 5 年之前
父节点
当前提交
c3d1632437
共有 6 个文件被更改,包括 330 次插入21 次删除
  1. 13 1
      xsql/funcs_misc.go
  2. 1 1
      xsql/functions.go
  3. 42 3
      xsql/plans/misc_func_test.go
  4. 51 14
      xsql/plans/preprocessor.go
  5. 204 2
      xsql/plans/preprocessor_test.go
  6. 19 0
      xsql/plans/project_test.go

+ 13 - 1
xsql/funcs_misc.go

@@ -12,6 +12,7 @@ import (
 	"hash"
 	"io"
 	"math"
+	"reflect"
 	"strconv"
 	"strings"
 	"time"
@@ -192,7 +193,18 @@ func hashCall(name string, args []interface{}) (interface{}, bool) {
 func otherCall(name string, args []interface{}) (interface{}, bool) {
 	switch name {
 	case "isnull":
-		return args[0] == nil, true
+		if args[0] == nil {
+			return true, true
+		} else {
+			v := reflect.ValueOf(args[0])
+			switch v.Kind() {
+			case reflect.Slice, reflect.Map:
+				return v.IsNil(), true
+			default:
+				return false, true
+			}
+		}
+		return false, true
 	case "newuuid":
 		if uuid, err := uuid.NewUUID(); err != nil {
 			return err, false

+ 1 - 1
xsql/functions.go

@@ -56,7 +56,7 @@ var hashFuncMap = map[string]string{"md5": "",
 	"sha1": "", "sha256": "", "sha384": "", "sha512": "",
 }
 
-var otherFuncMap = map[string]string{"isNull": "",
+var otherFuncMap = map[string]string{"isnull": "",
 	"newuuid": "", "timestamp": "", "mqtt": "", "meta": "",
 }
 

+ 42 - 3
xsql/plans/misc_func_test.go

@@ -11,7 +11,7 @@ import (
 	"testing"
 )
 
-func TestHashFunc_Apply1(t *testing.T) {
+func TestMiscFunc_Apply1(t *testing.T) {
 	var tests = []struct {
 		sql    string
 		data   *xsql.Tuple
@@ -132,12 +132,51 @@ func TestHashFunc_Apply1(t *testing.T) {
 				"a":     "devices/device_001/message",
 			}},
 		},
+		{
+			sql: "SELECT isNull(arr) as r FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"temperature": 43.2,
+					"arr":         []int{},
+				},
+			},
+			result: []map[string]interface{}{{
+				"r": false,
+			}},
+		},
+		{
+			sql: "SELECT isNull(arr) as r FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"temperature": 43.2,
+					"arr":         []float64(nil),
+				},
+			},
+			result: []map[string]interface{}{{
+				"r": true,
+			}},
+		}, {
+			sql: "SELECT isNull(rec) as r FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"temperature": 43.2,
+					"rec":         map[string]interface{}(nil),
+				},
+			},
+			result: []map[string]interface{}{{
+				"r": true,
+			}},
+		},
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
-	contextLogger := common.Log.WithField("rule", "TestHashFunc_Apply1")
+	contextLogger := common.Log.WithField("rule", "TestMiscFunc_Apply1")
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
+
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 		if err != nil || stmt == nil {
 			t.Errorf("parse sql %s error %v", tt.sql, err)
@@ -260,7 +299,7 @@ func TestMetaFunc_Apply1(t *testing.T) {
 	}
 
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
-	contextLogger := common.Log.WithField("rule", "TestHashFunc_Apply1")
+	contextLogger := common.Log.WithField("rule", "TestMetaFunc_Apply1")
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
 		stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse()

+ 51 - 14
xsql/plans/preprocessor.go

@@ -174,7 +174,9 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 			}
 		case *xsql.ArrayType:
 			var s []interface{}
-			if jtype == reflect.Slice {
+			if t == nil {
+				s = nil
+			} else if jtype == reflect.Slice {
 				s = t.([]interface{})
 			} else if jtype == reflect.String {
 				err := json.Unmarshal([]byte(t.(string)), &s)
@@ -186,13 +188,17 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 			}
 
 			if tempArr, err := p.addArrayField(st, s); err != nil {
-				return err
+				return fmt.Errorf("fail to parse field %s: %s", n, err)
 			} else {
 				r[n] = tempArr
 			}
 		case *xsql.RecType:
 			nextJ := make(map[string]interface{})
-			if jtype == reflect.Map {
+			if t == nil {
+				nextJ = nil
+				r[n] = nextJ
+				return nil
+			} else if jtype == reflect.Map {
 				nextJ, ok = t.(map[string]interface{})
 				if !ok {
 					return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
@@ -228,11 +234,16 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 	if ft.FieldType != nil { //complex type array or struct
 		switch st := ft.FieldType.(type) { //Only two complex types supported here
 		case *xsql.ArrayType: //TODO handle array of array. Now the type is treated as interface{}
-			var tempSlice [][]interface{}
+			if srcSlice == nil {
+				return [][]interface{}(nil), nil
+			}
 			var s []interface{}
+			var tempSlice reflect.Value
 			for i, t := range srcSlice {
 				jtype := reflect.ValueOf(t).Kind()
-				if jtype == reflect.Slice || jtype == reflect.Array {
+				if t == nil {
+					s = nil
+				} else if jtype == reflect.Slice || jtype == reflect.Array {
 					s = t.([]interface{})
 				} else if jtype == reflect.String {
 					err := json.Unmarshal([]byte(t.(string)), &s)
@@ -245,17 +256,28 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 				if tempArr, err := p.addArrayField(st, s); err != nil {
 					return nil, err
 				} else {
-					tempSlice = append(tempSlice, tempArr.([]interface{}))
+					if !tempSlice.IsValid() {
+						s := reflect.TypeOf(tempArr)
+						tempSlice = reflect.MakeSlice(reflect.SliceOf(s), 0, 0)
+					}
+					tempSlice = reflect.Append(tempSlice, reflect.ValueOf(tempArr))
 				}
 			}
-			return tempSlice, nil
+			return tempSlice.Interface(), nil
 		case *xsql.RecType:
-			var tempSlice []map[string]interface{}
+			if srcSlice == nil {
+				return []map[string]interface{}(nil), nil
+			}
+			tempSlice := make([]map[string]interface{}, 0)
 			for i, t := range srcSlice {
 				jtype := reflect.ValueOf(t).Kind()
 				j := make(map[string]interface{})
 				var ok bool
-				if jtype == reflect.Map {
+				if t == nil {
+					j = nil
+					tempSlice = append(tempSlice, j)
+					continue
+				} else if jtype == reflect.Map {
 					j, ok = t.(map[string]interface{})
 					if !ok {
 						return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
@@ -287,7 +309,10 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 		case xsql.UNKNOWN:
 			return nil, fmt.Errorf("invalid data type unknown defined for %s, please checke the stream definition", srcSlice)
 		case xsql.BIGINT:
-			var tempSlice []int
+			if srcSlice == nil {
+				return []int(nil), nil
+			}
+			tempSlice := make([]int, 0)
 			for i, t := range srcSlice {
 				jtype := reflect.ValueOf(t).Kind()
 				if jtype == reflect.Float64 {
@@ -304,7 +329,10 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 			}
 			return tempSlice, nil
 		case xsql.FLOAT:
-			var tempSlice []float64
+			if srcSlice == nil {
+				return []float64(nil), nil
+			}
+			tempSlice := make([]float64, 0)
 			for i, t := range srcSlice {
 				jtype := reflect.ValueOf(t).Kind()
 				if jtype == reflect.Float64 {
@@ -321,7 +349,10 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 			}
 			return tempSlice, nil
 		case xsql.STRINGS:
-			var tempSlice []string
+			if srcSlice == nil {
+				return []string(nil), nil
+			}
+			tempSlice := make([]string, 0)
 			for i, t := range srcSlice {
 				if reflect.ValueOf(t).Kind() == reflect.String {
 					tempSlice = append(tempSlice, t.(string))
@@ -331,7 +362,10 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 			}
 			return tempSlice, nil
 		case xsql.DATETIME:
-			var tempSlice []time.Time
+			if srcSlice == nil {
+				return []time.Time(nil), nil
+			}
+			tempSlice := make([]time.Time, 0)
 			for i, t := range srcSlice {
 				jtype := reflect.ValueOf(t).Kind()
 				switch jtype {
@@ -353,7 +387,10 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 			}
 			return tempSlice, nil
 		case xsql.BOOLEAN:
-			var tempSlice []bool
+			if srcSlice == nil {
+				return []bool(nil), nil
+			}
+			tempSlice := make([]bool, 0)
 			for i, t := range srcSlice {
 				jtype := reflect.ValueOf(t).Kind()
 				if jtype == reflect.Bool {

+ 204 - 2
xsql/plans/preprocessor_test.go

@@ -33,6 +33,16 @@ func TestPreprocessor_Apply(t *testing.T) {
 		},
 		{
 			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "abc", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
+				},
+			},
+			data:   []byte(`{"abc": null}`),
+			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found <nil>(<nil>)"),
+		},
+		{
+			stmt: &xsql.StreamStmt{
 				Name:         xsql.StreamName("demo"),
 				StreamFields: nil,
 			},
@@ -164,7 +174,6 @@ func TestPreprocessor_Apply(t *testing.T) {
 			},
 			},
 		},
-		//Rec type
 		{
 			stmt: &xsql.StreamStmt{
 				Name: xsql.StreamName("demo"),
@@ -223,6 +232,113 @@ func TestPreprocessor_Apply(t *testing.T) {
 		},
 		{
 			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.ArrayType{
+						Type: xsql.STRUCT,
+						FieldType: &xsql.RecType{
+							StreamFields: []xsql.StreamField{
+								{Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
+							},
+						},
+					}},
+				},
+			},
+			data: []byte(`{"a": []}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": make([]map[string]interface{}, 0),
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.ArrayType{
+						Type: xsql.STRUCT,
+						FieldType: &xsql.RecType{
+							StreamFields: []xsql.StreamField{
+								{Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
+							},
+						},
+					}},
+				},
+			},
+			data: []byte(`{"a": null}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": []map[string]interface{}(nil),
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.ArrayType{
+						Type: xsql.STRUCT,
+						FieldType: &xsql.RecType{
+							StreamFields: []xsql.StreamField{
+								{Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
+							},
+						},
+					}},
+				},
+			},
+			data: []byte(`{"a": [null, {"b" : "hello2"}]}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": []map[string]interface{}{
+					nil,
+					{"b": "hello2"},
+				},
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.ArrayType{
+						Type: xsql.ARRAY,
+						FieldType: &xsql.ArrayType{
+							Type: xsql.BIGINT,
+						},
+					}},
+				},
+			},
+			data: []byte(`{"a": [[50, 60, 70],[66], [77]]}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": [][]int{
+					{50, 60, 70},
+					{66},
+					{77},
+				},
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.ArrayType{
+						Type: xsql.ARRAY,
+						FieldType: &xsql.ArrayType{
+							Type: xsql.BIGINT,
+						},
+					}},
+				},
+			},
+			data: []byte(`{"a": [null, [66], [77]]}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": [][]int{
+					[]int(nil),
+					{66},
+					{77},
+				},
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
 				Name:         xsql.StreamName("demo"),
 				StreamFields: nil,
 			},
@@ -297,6 +413,93 @@ func TestPreprocessor_Apply(t *testing.T) {
 		},
 		{
 			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.RecType{
+						StreamFields: []xsql.StreamField{
+							{Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
+							{Name: "c", FieldType: &xsql.RecType{
+								StreamFields: []xsql.StreamField{
+									{Name: "d", FieldType: &xsql.BasicType{Type: xsql.BIGINT}},
+								},
+							}},
+						},
+					}},
+				},
+			},
+			data: []byte(`{"a": null}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": map[string]interface{}(nil),
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.RecType{
+						StreamFields: []xsql.StreamField{
+							{Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
+							{Name: "c", FieldType: &xsql.ArrayType{
+								Type: xsql.FLOAT,
+							}},
+						},
+					}},
+				},
+			},
+			data: []byte(`{"a": {"b" : "hello", "c": [35.2, 38.2]}}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": map[string]interface{}{
+					"b": "hello",
+					"c": []float64{
+						35.2, 38.2,
+					},
+				},
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.RecType{
+						StreamFields: []xsql.StreamField{
+							{Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
+							{Name: "c", FieldType: &xsql.ArrayType{
+								Type: xsql.FLOAT,
+							}},
+						},
+					}},
+				},
+			},
+			data: []byte(`{"a": {"b" : "hello", "c": null}}`),
+			result: &xsql.Tuple{Message: xsql.Message{
+				"a": map[string]interface{}{
+					"b": "hello",
+					"c": []float64(nil),
+				},
+			},
+			},
+		},
+		{
+			stmt: &xsql.StreamStmt{
+				Name: xsql.StreamName("demo"),
+				StreamFields: []xsql.StreamField{
+					{Name: "a", FieldType: &xsql.RecType{
+						StreamFields: []xsql.StreamField{
+							{Name: "b", FieldType: &xsql.BasicType{Type: xsql.STRINGS}},
+							{Name: "c", FieldType: &xsql.ArrayType{
+								Type: xsql.FLOAT,
+							}},
+						},
+					}},
+				},
+			},
+			data:   []byte(`{"a": {"b" : "hello", "c": [null, 35.4]}}`),
+			result: errors.New("error in preprocessor: fail to parse field c: invalid data type for [0], expect float but found <nil>(<nil>)"),
+		},
+		{
+			stmt: &xsql.StreamStmt{
 				Name:         xsql.StreamName("demo"),
 				StreamFields: nil,
 			},
@@ -319,7 +522,6 @@ func TestPreprocessor_Apply(t *testing.T) {
 	contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply")
 	ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger)
 	for i, tt := range tests {
-
 		pp := &Preprocessor{streamStmt: tt.stmt}
 
 		dm := make(map[string]interface{})

+ 19 - 0
xsql/plans/project_test.go

@@ -158,6 +158,16 @@ func TestProjectPlan_Apply1(t *testing.T) {
 			data: &xsql.Tuple{
 				Emitter: "test",
 				Message: xsql.Message{
+					"a": map[string]interface{}(nil),
+				},
+			},
+			result: []map[string]interface{}{{}},
+		},
+		{
+			sql: `SELECT a->b AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
 					"name": "name",
 				},
 			},
@@ -1543,6 +1553,15 @@ func TestProjectPlanError(t *testing.T) {
 				},
 			},
 			result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
+		}, {
+			sql: `SELECT a[0]->b AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": []map[string]interface{}(nil),
+				},
+			},
+			result: errors.New("run Select error: out of index: 0 of 0"),
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))