浏览代码

merge code from edgex

RockyJin 5 年之前
父节点
当前提交
b14a418a14

+ 19 - 1
docs/en_US/sqls/data_types.md

@@ -18,7 +18,25 @@ Below is the list of data types supported.
 | 6    | array     | The array type, can be any types from simple data or struct type (#1 - #5, and #7). |
 | 7    | struct    | The complex type. Set of name/value pairs. Values must be of supported data type. |
 
-
+## Compatibility of comparison and calculation
+
+There may be binary operations in each sql clause. In this example, `Select temperature * 2 from demo where temperature > 20`, a calculation operation is used in select clause and a comparison operation is used in the where clause. In the binary operations, if incompatible data types are used, a runtime error will happen and send to the sinks.
+
+Array and struct are not supported in any binary operations. The compatibility of other data types are listed in below table. Whereas, the row header is the left operand data type and the column header is the right operand data. The value is the compatibility in which Y stands for yes and N stands for no.
+
+ #    | bigint | float | string | datetime | boolean |
+ ---- | ------ | ----  | ----   | ----     | ---     |
+ bigint|  Y    |  Y    |   N    |   N      |   N     |
+ float |  Y    |  Y    |   N    |   N      |   N     |
+ string|  N    |  N    |   Y    |   N      |   N     |
+ datetime| Y   |  Y    | Y if in the valid format | Y | N |
+ boolean| N    |  N    |   N    |   N      |   N     |
+ 
+ The default format for datetime string is ``"2006-01-02T15:04:05.000Z07:00"``
+ 
+ For `nil` value, we follow the rules:
+ 1. Compare with nil always return false
+ 2. Calculate with nil always return nil
 
 ## Type conversions
 

+ 10 - 0
docs/en_US/sqls/streams.md

@@ -83,5 +83,15 @@ array: zero length array
 struct: null value
 ```
 
+### Schema-less stream
+If the data type of the stream is unknown or varying, we can define it without the fields. This is called schema-less. It is defined by leaving the fields empty.
+```sql
+schemaless_stream 
+  ()
+WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id");
+```
+
+Schema-less stream field data type will be determined at runtime. If the field is used in an incompatible clause, a runtime error will be thrown and send to the sink. For example, ``where temperature > 30``. Once a temperature is not a number, an error will be sent to the sink.
+
 See [Query languange element](query_language_elements.md) for more inforamtion of SQL language.
 

+ 18 - 24
docs/en_US/sqls/windows.md

@@ -26,13 +26,6 @@ Tumbling window functions are used to segment a data stream into distinct time s
 
 ![Tumbling Window](resources/tumblingWindow.png)
 
-TODO: 
-
-- TIMESTAMP BY is required?
-- Count function is not supported.21
-
-
-
 ```sql
 SELECT count(*) FROM demo GROUP BY ID, TUMBLINGWINDOW(ss, 10);
 ```
@@ -43,12 +36,6 @@ Hopping window functions hop forward in time by a fixed period. It may be easy t
 
 ![Hopping Window](resources/hoppingWindow.png)
 
-TODO: 
-
-- TIMESTAMP BY is required?
-- Count function is not supported.
-
-
 
 ```sql
 SELECT count(*) FROM demo GROUP BY ID, HOPPINGWINDOW(ss, 10, 5);
@@ -62,10 +49,6 @@ Sliding window functions, unlike Tumbling or Hopping windows, produce an output
 
 ![Sliding Window](resources/slidingWindow.png)
 
-TODO: 
-
-- TIMESTAMP BY is required?
-- Count function is not supported.
 
 ```sql
 SELECT count(*) FROM demo GROUP BY ID, SLIDINGWINDOW(mm, 1);
@@ -79,12 +62,6 @@ Session window functions group events that arrive at similar times, filtering ou
 
 ![Session Window](resources/sessionWindow.png)
 
-TODO: 
-
-- TIMESTAMP BY is required?
-- Count function is not supported.
-
-
 
 ```sql
 SELECT count(*) FROM demo GROUP BY ID, SESSIONWINDOW(mm, 2, 1);
@@ -94,4 +71,21 @@ SELECT count(*) FROM demo GROUP BY ID, SESSIONWINDOW(mm, 2, 1);
 
 A session window begins when the first event occurs. If another event occurs within the specified timeout from the last ingested event, then the window extends to include the new event. Otherwise if no events occur within the timeout, then the window is closed at the timeout.
 
-If events keep occurring within the specified timeout, the session window will keep extending until maximum duration is reached. The maximum duration checking intervals are set to be the same size as the specified max duration. For example, if the max duration is 10, then the checks on if the window exceed maximum duration will happen at t = 0, 10, 20, 30, etc.
+If events keep occurring within the specified timeout, the session window will keep extending until maximum duration is reached. The maximum duration checking intervals are set to be the same size as the specified max duration. For example, if the max duration is 10, then the checks on if the window exceed maximum duration will happen at t = 0, 10, 20, 30, etc.
+
+## Timestamp Management
+
+Every event has a timestamp associated with it. The timestamp will be used to calculate the window. By default, a timestamp will be added when an event feed into the source which is called `processing time`. We also support to specify a field as the timestamp, which is called `event time`. The timestamp field is specified in the stream definition. In the below definition, the field `ts` is specified as the timestamp field.
+
+``
+CREATE STREAM demo (
+					color STRING,
+					size BIGINT,
+					ts BIGINT
+				) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts", TIMESTAMP="ts"
+``
+
+In event time mode, the watermark algorithm is used to calculate a window.
+
+## Runtime error in window
+If the window receive an error (for example, the data type does not comply to the stream definition) from upstream, the error event will be forwarded immediately to the sink. The current window calculation will ignore the error event.

+ 3 - 1
xsql/ast.go

@@ -933,7 +933,7 @@ func (a multiValuer) Call(name string, args []interface{}) (interface{}, bool) {
 			if v, ok := valuer.Call(name, args); ok {
 				return v, true
 			} else {
-				return fmt.Errorf("found error \"%s\" when call func %s", v, name), false
+				return fmt.Errorf("call func %s error: %v", name, v), false
 			}
 		}
 	}
@@ -968,6 +968,8 @@ func (a *multiAggregateValuer) Call(name string, args []interface{}) (interface{
 		if a, ok := valuer.(AggregateCallValuer); ok {
 			if v, ok := a.Call(name, args); ok {
 				return v, true
+			} else {
+				return fmt.Errorf("call func %s error: %v", name, v), false
 			}
 		} else if c, ok := valuer.(CallValuer); ok {
 			if singleArgs == nil {

+ 122 - 62
xsql/funcs_aggregate.go

@@ -24,14 +24,20 @@ func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interfac
 		if len(arg0) > 0 {
 			v := getFirstValidArg(arg0)
 			switch v.(type) {
-			case int:
-				return sliceIntTotal(arg0) / len(arg0), true
-			case int64:
-				return sliceIntTotal(arg0) / len(arg0), true
+			case int, int64:
+				if r, err := sliceIntTotal(arg0); err != nil {
+					return err, false
+				} else {
+					return r / len(arg0), true
+				}
 			case float64:
-				return sliceFloatTotal(arg0) / float64(len(arg0)), true
+				if r, err := sliceFloatTotal(arg0); err != nil {
+					return err, false
+				} else {
+					return r / float64(len(arg0)), true
+				}
 			default:
-				return fmt.Errorf("invalid data type for avg function"), false
+				return fmt.Errorf("run avg function error: found invalid arg %[1]T(%[1]v)", v), false
 			}
 		}
 		return 0, true
@@ -44,49 +50,87 @@ func (v AggregateFunctionValuer) Call(name string, args []interface{}) (interfac
 			v := getFirstValidArg(arg0)
 			switch t := v.(type) {
 			case int:
-				return sliceIntMax(arg0, t), true
+				if r, err := sliceIntMax(arg0, t); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			case int64:
-				return sliceIntMax(arg0, int(t)), true
+				if r, err := sliceIntMax(arg0, int(t)); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			case float64:
-				return sliceFloatMax(arg0, t), true
+				if r, err := sliceFloatMax(arg0, t); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			case string:
-				return sliceStringMax(arg0, t), true
+				if r, err := sliceStringMax(arg0, t); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			default:
-				return fmt.Errorf("unsupported data type for avg function"), false
+				return fmt.Errorf("run max function error: found invalid arg %[1]T(%[1]v)", v), false
 			}
 		}
-		return fmt.Errorf("empty data for max function"), false
+		return fmt.Errorf("run max function error: empty data"), false
 	case "min":
 		arg0 := args[0].([]interface{})
 		if len(arg0) > 0 {
 			v := getFirstValidArg(arg0)
 			switch t := v.(type) {
 			case int:
-				return sliceIntMin(arg0, t), true
+				if r, err := sliceIntMin(arg0, t); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			case int64:
-				return sliceIntMin(arg0, int(t)), true
+				if r, err := sliceIntMin(arg0, int(t)); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			case float64:
-				return sliceFloatMin(arg0, t), true
+				if r, err := sliceFloatMin(arg0, t); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			case string:
-				return sliceStringMin(arg0, t), true
+				if r, err := sliceStringMin(arg0, t); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			default:
-				return fmt.Errorf("unsupported data type for avg function"), false
+				return fmt.Errorf("run min function error: found invalid arg %[1]T(%[1]v)", v), false
 			}
 		}
-		return fmt.Errorf("empty data for max function"), false
+		return fmt.Errorf("run min function error: empty data"), false
 	case "sum":
 		arg0 := args[0].([]interface{})
 		if len(arg0) > 0 {
 			v := getFirstValidArg(arg0)
 			switch v.(type) {
-			case int:
-				return sliceIntTotal(arg0), true
-			case int64:
-				return sliceIntTotal(arg0), true
+			case int, int64:
+				if r, err := sliceIntTotal(arg0); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			case float64:
-				return sliceFloatTotal(arg0), true
+				if r, err := sliceFloatTotal(arg0); err != nil {
+					return err, false
+				} else {
+					return r, true
+				}
 			default:
-				return fmt.Errorf("invalid data type for sum function"), false
+				return fmt.Errorf("run sum function error: found invalid arg %[1]T(%[1]v)", v), false
 			}
 		}
 		return 0, true
@@ -122,84 +166,100 @@ func getFirstValidArg(s []interface{}) interface{} {
 	return nil
 }
 
-func sliceIntTotal(s []interface{}) int {
+func sliceIntTotal(s []interface{}) (int, error) {
 	var total int
 	for _, v := range s {
-		if v, ok := v.(int); ok {
-			total += v
+		if vi, ok := v.(int); ok {
+			total += vi
+		} else {
+			return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
 		}
 	}
-	return total
+	return total, nil
 }
 
-func sliceFloatTotal(s []interface{}) float64 {
+func sliceFloatTotal(s []interface{}) (float64, error) {
 	var total float64
 	for _, v := range s {
-		if v, ok := v.(float64); ok {
-			total += v
+		if vf, ok := v.(float64); ok {
+			total += vf
+		} else {
+			return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
 		}
 	}
-	return total
+	return total, nil
 }
-func sliceIntMax(s []interface{}, max int) int {
+func sliceIntMax(s []interface{}, max int) (int, error) {
 	for _, v := range s {
-		if v, ok := v.(int); ok {
-			if max < v {
-				max = v
+		if vi, ok := v.(int); ok {
+			if max < vi {
+				max = vi
 			}
+		} else {
+			return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
 		}
 	}
-	return max
+	return max, nil
 }
-func sliceFloatMax(s []interface{}, max float64) float64 {
+func sliceFloatMax(s []interface{}, max float64) (float64, error) {
 	for _, v := range s {
-		if v, ok := v.(float64); ok {
-			if max < v {
-				max = v
+		if vf, ok := v.(float64); ok {
+			if max < vf {
+				max = vf
 			}
+		} else {
+			return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
 		}
 	}
-	return max
+	return max, nil
 }
 
-func sliceStringMax(s []interface{}, max string) string {
+func sliceStringMax(s []interface{}, max string) (string, error) {
 	for _, v := range s {
-		if v, ok := v.(string); ok {
-			if max < v {
-				max = v
+		if vs, ok := v.(string); ok {
+			if max < vs {
+				max = vs
 			}
+		} else {
+			return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
 		}
 	}
-	return max
+	return max, nil
 }
-func sliceIntMin(s []interface{}, min int) int {
+func sliceIntMin(s []interface{}, min int) (int, error) {
 	for _, v := range s {
-		if v, ok := v.(int); ok {
-			if min > v {
-				min = v
+		if vi, ok := v.(int); ok {
+			if min > vi {
+				min = vi
 			}
+		} else {
+			return 0, fmt.Errorf("requires int but found %[1]T(%[1]v)", v)
 		}
 	}
-	return min
+	return min, nil
 }
-func sliceFloatMin(s []interface{}, min float64) float64 {
+func sliceFloatMin(s []interface{}, min float64) (float64, error) {
 	for _, v := range s {
-		if v, ok := v.(float64); ok {
-			if min > v {
-				min = v
+		if vf, ok := v.(float64); ok {
+			if min > vf {
+				min = vf
 			}
+		} else {
+			return 0, fmt.Errorf("requires float64 but found %[1]T(%[1]v)", v)
 		}
 	}
-	return min
+	return min, nil
 }
 
-func sliceStringMin(s []interface{}, min string) string {
+func sliceStringMin(s []interface{}, min string) (string, error) {
 	for _, v := range s {
-		if v, ok := v.(string); ok {
-			if min < v {
-				min = v
+		if vs, ok := v.(string); ok {
+			if min < vs {
+				min = vs
 			}
+		} else {
+			return "", fmt.Errorf("requires string but found %[1]T(%[1]v)", v)
 		}
 	}
-	return min
+	return min, nil
 }

+ 1 - 3
xsql/plans/having_operator.go

@@ -19,7 +19,7 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 	case xsql.GroupedTuplesSet:
 		r := xsql.GroupedTuplesSet{}
 		for _, v := range input {
-			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: v})}
+			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(v, v[0], &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: v}, &xsql.WildcardValuer{Data: v[0]})}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
 			case error:
@@ -42,7 +42,6 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 		ms := input[0].Tuples
 		r := ms[:0]
 		for _, v := range ms {
-			//ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {
@@ -64,7 +63,6 @@ func (p *HavingPlan) Apply(ctx api.StreamContext, data interface{}) interface{}
 		ms := input
 		r := ms[:0]
 		for _, v := range ms {
-			//ve := &xsql.ValuerEval{Valuer: xsql.MultiValuer(&v, &xsql.FunctionValuer{})}
 			ve := &xsql.ValuerEval{Valuer: xsql.MultiAggregateValuer(input, &v, &xsql.FunctionValuer{}, &xsql.AggregateFunctionValuer{Data: input}, &xsql.WildcardValuer{Data: &v})}
 			result := ve.Eval(p.Condition)
 			switch val := result.(type) {

+ 134 - 1
xsql/plans/having_test.go

@@ -112,7 +112,6 @@ func TestHavingPlan_Apply(t *testing.T) {
 			},
 			result: nil,
 		},
-
 		{
 			sql: `SELECT id1 FROM src1 HAVING max(id1) = 1`,
 			data: xsql.WindowTuplesSet{
@@ -137,6 +136,119 @@ func TestHavingPlan_Apply(t *testing.T) {
 					},
 				},
 			},
+		}, {
+			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": "v1"},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": "v1"},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2"},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2"},
+					},
+				},
+			},
+		}, {
+			sql: "SELECT count(*) as c, round(a) as r FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color having a > 100",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+							{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+						},
+					},
+				},
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 2, "a": 89.03}},
+							{Emitter: "src2", Message: xsql.Message{"id": 2, "color": "w1"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 4, "a": 14.6}},
+							{Emitter: "src2", Message: xsql.Message{"id": 4, "color": "w1"}},
+						},
+					},
+				},
+			},
+			result: xsql.GroupedTuplesSet{
+				{
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
+							{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+						},
+					},
+					&xsql.JoinTuple{
+						Tuples: []xsql.Tuple{
+							{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+							{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+						},
+					},
+				},
+			},
+		}, {
+			sql: "SELECT * FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10) having a > 100",
+			data: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 68.55}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+						{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+					},
+				},
+			},
+
+			result: xsql.JoinTupleSets{
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 1, "a": 122.33}},
+						{Emitter: "src2", Message: xsql.Message{"id": 1, "color": "w2"}},
+					},
+				},
+				xsql.JoinTuple{
+					Tuples: []xsql.Tuple{
+						{Emitter: "test", Message: xsql.Message{"id": 5, "a": 177.51}},
+						{Emitter: "src2", Message: xsql.Message{"id": 5, "color": "w2"}},
+					},
+				},
+			},
 		},
 	}
 
@@ -188,6 +300,27 @@ func TestHavingPlanError(t *testing.T) {
 			sql:    `SELECT id1 FROM src1 HAVING avg(id1) > "str"`,
 			data:   errors.New("an error from upstream"),
 			result: errors.New("an error from upstream"),
+		}, {
+			sql: "SELECT id1 FROM src1 GROUP BY TUMBLINGWINDOW(ss, 10), f1 having f1 = \"v2\"",
+			data: xsql.GroupedTuplesSet{
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 1, "f1": 3},
+					},
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 3, "f1": 3},
+					},
+				},
+				{
+					&xsql.Tuple{
+						Emitter: "src1",
+						Message: xsql.Message{"id1": 2, "f1": "v2"},
+					},
+				},
+			},
+			result: errors.New("run Having error: invalid operation int64(3) = string(v2)"),
 		},
 	}
 

+ 39 - 28
xsql/plans/project_test.go

@@ -1040,6 +1040,17 @@ func TestProjectPlan_Funcs(t *testing.T) {
 			}, {
 				"concat": "388.886",
 			}},
+		}, {
+			sql: "SELECT count(a) as r FROM test",
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": 47.5,
+				},
+			},
+			result: []map[string]interface{}{{
+				"r": float64(1),
+			}},
 		},
 	}
 
@@ -1051,7 +1062,7 @@ func TestProjectPlan_Funcs(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
-		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
 		result := pp.Apply(ctx, tt.data)
 		var mapRes []map[string]interface{}
@@ -1269,7 +1280,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 					Tuples: []xsql.Tuple{
 						{
 							Emitter: "src1",
-							Message: xsql.Message{"b": 53},
+							Message: xsql.Message{"a": 53},
 						}, {
 							Emitter: "src1",
 							Message: xsql.Message{"a": 27},
@@ -1281,28 +1292,8 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				},
 			},
 			result: []map[string]interface{}{{
-				"sum": float64(123150),
+				"sum": float64(123203),
 			}},
-		}, {
-			sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
-			data: xsql.WindowTuplesSet{
-				xsql.WindowTuples{
-					Emitter: "test",
-					Tuples: []xsql.Tuple{
-						{
-							Emitter: "src1",
-							Message: xsql.Message{"a": "nan"},
-						}, {
-							Emitter: "src1",
-							Message: xsql.Message{"a": 27},
-						}, {
-							Emitter: "src1",
-							Message: xsql.Message{"a": 123123},
-						},
-					},
-				},
-			},
-			result: []map[string]interface{}{{}},
 		},
 	}
 
@@ -1331,7 +1322,7 @@ func TestProjectPlan_AggFuncs(t *testing.T) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
 			}
 		} else {
-			t.Errorf("The returned result is not type of []byte\n")
+			t.Errorf("%d. %q\n\nThe returned result is not type of []byte: %#v\n", i, tt.sql, result)
 		}
 	}
 }
@@ -1372,7 +1363,7 @@ func TestProjectPlanError(t *testing.T) {
 					"a": "common string",
 				},
 			},
-			result: errors.New("run Select error: found error \"only float64 & int type are supported\" when call func round"),
+			result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
 		}, {
 			sql: `SELECT round(a) as r FROM test`,
 			data: &xsql.Tuple{
@@ -1381,7 +1372,7 @@ func TestProjectPlanError(t *testing.T) {
 					"abc": "common string",
 				},
 			},
-			result: errors.New("run Select error: found error \"only float64 & int type are supported\" when call func round"),
+			result: errors.New("run Select error: call func round error: only float64 & int type are supported"),
 		}, {
 			sql: "SELECT avg(a) as avg FROM test Inner Join test1 on test.id = test1.id GROUP BY TumblingWindow(ss, 10), test1.color",
 			data: xsql.GroupedTuplesSet{
@@ -1426,7 +1417,27 @@ func TestProjectPlanError(t *testing.T) {
 					},
 				},
 			},
-			result: errors.New("run Select error: found error \"%!s(<nil>)\" when call func avg"),
+			result: errors.New("run Select error: call func avg error: requires float64 but found string(dde)"),
+		}, {
+			sql: "SELECT sum(a) as sum FROM test GROUP BY TumblingWindow(ss, 10)",
+			data: xsql.WindowTuplesSet{
+				xsql.WindowTuples{
+					Emitter: "test",
+					Tuples: []xsql.Tuple{
+						{
+							Emitter: "src1",
+							Message: xsql.Message{"a": 53},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": "ddd"},
+						}, {
+							Emitter: "src1",
+							Message: xsql.Message{"a": 123123},
+						},
+					},
+				},
+			},
+			result: errors.New("run Select error: call func sum error: requires int but found string(ddd)"),
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -1435,7 +1446,7 @@ func TestProjectPlanError(t *testing.T) {
 	for i, tt := range tests {
 		stmt, _ := xsql.NewParser(strings.NewReader(tt.sql)).Parse()
 
-		pp := &ProjectPlan{Fields: stmt.Fields}
+		pp := &ProjectPlan{Fields: stmt.Fields, IsAggregate: xsql.IsAggStatement(stmt)}
 		pp.isTest = true
 		result := pp.Apply(ctx, tt.data)
 		if !reflect.DeepEqual(tt.result, result) {

+ 54 - 64
xsql/processors/xsql_processor_test.go

@@ -1762,67 +1762,57 @@ func TestWindowError(t *testing.T) {
 				"op_join_0_records_in_total":   int64(10),
 				"op_join_0_records_out_total":  int64(5),
 			},
-			//}, {
-			//	name: `rule4`,
-			//	sql:  `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having color > 5`,
-			//	size: 5,
-			//	r: [][]map[string]interface{}{
-			//		{{
-			//			"color": "red",
-			//		}}, {{
-			//			"color": "blue",
-			//		}, {
-			//			"color": "red",
-			//		}}, {{
-			//			"color": "blue",
-			//		}, {
-			//			"color": "red",
-			//		}}, {{
-			//			"color": "blue",
-			//		}, {
-			//			"color": "yellow",
-			//		}}, {{
-			//			"color": "blue",
-			//		}, {
-			//			"color": "red",
-			//		}, {
-			//			"color": "yellow",
-			//		}},
-			//	},
-			//	m: map[string]interface{}{
-			//		"op_preprocessor_demo_0_exceptions_total":   int64(0),
-			//		"op_preprocessor_demo_0_process_latency_ms": int64(0),
-			//		"op_preprocessor_demo_0_records_in_total":   int64(5),
-			//		"op_preprocessor_demo_0_records_out_total":  int64(5),
-			//
-			//		"op_project_0_exceptions_total":   int64(0),
-			//		"op_project_0_process_latency_ms": int64(0),
-			//		"op_project_0_records_in_total":   int64(5),
-			//		"op_project_0_records_out_total":  int64(5),
-			//
-			//		"sink_mockSink_0_exceptions_total":  int64(0),
-			//		"sink_mockSink_0_records_in_total":  int64(5),
-			//		"sink_mockSink_0_records_out_total": int64(5),
-			//
-			//		"source_demo_0_exceptions_total":  int64(0),
-			//		"source_demo_0_records_in_total":  int64(5),
-			//		"source_demo_0_records_out_total": int64(5),
-			//
-			//		"op_window_0_exceptions_total":   int64(0),
-			//		"op_window_0_process_latency_ms": int64(0),
-			//		"op_window_0_records_in_total":   int64(5),
-			//		"op_window_0_records_out_total":  int64(5),
-			//
-			//		"op_aggregate_0_exceptions_total":   int64(0),
-			//		"op_aggregate_0_process_latency_ms": int64(0),
-			//		"op_aggregate_0_records_in_total":   int64(5),
-			//		"op_aggregate_0_records_out_total":  int64(5),
-			//
-			//		"op_order_0_exceptions_total":   int64(0),
-			//		"op_order_0_process_latency_ms": int64(0),
-			//		"op_order_0_records_in_total":   int64(5),
-			//		"op_order_0_records_out_total":  int64(5),
-			//	},
+		}, {
+			name: `rule4`,
+			sql:  `SELECT color FROM ldemo GROUP BY SlidingWindow(ss, 2), color having size >= 2 order by color`,
+			size: 5,
+			r: [][]map[string]interface{}{
+				{{
+					"color": "red",
+				}}, {{
+					"error": "run Having error: invalid operation string(string) >= int64(2)",
+				}}, {{
+					"error": "run Having error: invalid operation string(string) >= int64(2)",
+				}}, {{
+					"error": "run Having error: invalid operation string(string) >= int64(2)",
+				}}, {{
+					"color": float64(49),
+				}, {}},
+			},
+			m: map[string]interface{}{
+				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
+				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
+				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(3),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(5),
+				"op_project_0_records_out_total":  int64(2),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(5),
+				"sink_mockSink_0_records_out_total": int64(5),
+
+				"source_ldemo_0_exceptions_total":  int64(0),
+				"source_ldemo_0_records_in_total":  int64(5),
+				"source_ldemo_0_records_out_total": int64(5),
+
+				"op_window_0_exceptions_total":   int64(0),
+				"op_window_0_process_latency_ms": int64(0),
+				"op_window_0_records_in_total":   int64(5),
+				"op_window_0_records_out_total":  int64(5),
+
+				"op_aggregate_0_exceptions_total":   int64(0),
+				"op_aggregate_0_process_latency_ms": int64(0),
+				"op_aggregate_0_records_in_total":   int64(5),
+				"op_aggregate_0_records_out_total":  int64(5),
+
+				"op_having_0_exceptions_total":   int64(3),
+				"op_having_0_process_latency_ms": int64(0),
+				"op_having_0_records_in_total":   int64(5),
+				"op_having_0_records_out_total":  int64(2),
+			},
 		}, {
 			name: `rule5`,
 			sql:  `SELECT color, size FROM ldemo GROUP BY tumblingwindow(ss, 1) ORDER BY size`,
@@ -2832,9 +2822,9 @@ func getMetric(tp *xstream.TopologyNew, name string) int {
 
 func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}, sql string) (err error) {
 	keys, values := tp.GetMetrics()
-	//for i, k := range keys{
-	//	log.Printf("%s:%v", k, values[i])
-	//}
+	for i, k := range keys {
+		log.Printf("%s:%v", k, values[i])
+	}
 	for k, v := range m {
 		var (
 			index   int

+ 1 - 3
xstream/nodes/source_node.go

@@ -146,14 +146,12 @@ func (m *SourceNode) reset() {
 	m.statManagers = nil
 }
 
-func getSource(t string) (api.Source, error) {
+func doGetSource(t string) (api.Source, error) {
 	var s api.Source
 	var ok bool
 	switch t {
 	case "mqtt":
 		s = &extensions.MQTTSource{}
-	case "edgex":
-		s = &extensions.EdgexSource{}
 	default:
 		nf, err := plugin_manager.GetPlugin(t, "sources")
 		if err != nil {

+ 15 - 0
xstream/nodes/with_edgex.go

@@ -0,0 +1,15 @@
+// +build linux
+
+package nodes
+
+import (
+	"github.com/emqx/kuiper/xstream/api"
+	"github.com/emqx/kuiper/xstream/extensions"
+)
+
+func getSource(t string) (api.Source, error) {
+	if t == "edgex" {
+		return &extensions.EdgexSource{}, nil
+	}
+	return doGetSource(t)
+}

+ 9 - 0
xstream/nodes/without_edgex.go

@@ -0,0 +1,9 @@
+// +build !linux
+
+package nodes
+
+import "github.com/emqx/kuiper/xstream/api"
+
+func getSource(t string) (api.Source, error) {
+	return doGetSource(t)
+}