Browse Source

feat(stream): support error sink

ngjaying 5 years atrás
parent
commit
61eb5e40f1

+ 28 - 28
xsql/plans/preprocessor.go

@@ -115,30 +115,30 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 					r[n] = int(t.(float64))
 				} else if jtype == reflect.String {
 					if i, err := strconv.Atoi(t.(string)); err != nil {
-						return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
+						return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
 					} else {
 						r[n] = i
 					}
 				} else {
-					return fmt.Errorf("invalid data type for %s, expect bigint but found %s", n, t)
+					return fmt.Errorf("invalid data type for %s, expect bigint but found %[2]T(%[2]v)", n, t)
 				}
 			case xsql.FLOAT:
 				if jtype == reflect.Float64 {
 					r[n] = t.(float64)
 				} else if jtype == reflect.String {
 					if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
-						return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
+						return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
 					} else {
 						r[n] = f
 					}
 				} else {
-					return fmt.Errorf("invalid data type for %s, expect float but found %s", n, t)
+					return fmt.Errorf("invalid data type for %s, expect float but found %[2]T(%[2]v)", n, t)
 				}
 			case xsql.STRINGS:
 				if jtype == reflect.String {
 					r[n] = t.(string)
 				} else {
-					return fmt.Errorf("invalid data type for %s, expect string but found %s", n, t)
+					return fmt.Errorf("invalid data type for %s, expect string but found %[2]T(%[2]v)", n, t)
 				}
 			case xsql.DATETIME:
 				switch jtype {
@@ -155,19 +155,19 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 						r[n] = t
 					}
 				default:
-					return fmt.Errorf("invalid data type for %s, expect datatime but find %v", n, t)
+					return fmt.Errorf("invalid data type for %s, expect datatime but find %[2]T(%[2]v)", n, t)
 				}
 			case xsql.BOOLEAN:
 				if jtype == reflect.Bool {
 					r[n] = t.(bool)
 				} else if jtype == reflect.String {
 					if i, err := strconv.ParseBool(t.(string)); err != nil {
-						return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
+						return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
 					} else {
 						r[n] = i
 					}
 				} else {
-					return fmt.Errorf("invalid data type for %s, expect boolean but found %s", n, t)
+					return fmt.Errorf("invalid data type for %s, expect boolean but found %[2]T(%[2]v)", n, t)
 				}
 			default:
 				return fmt.Errorf("invalid data type for %s, it is not supported yet", st)
@@ -179,10 +179,10 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 			} else if jtype == reflect.String {
 				err := json.Unmarshal([]byte(t.(string)), &s)
 				if err != nil {
-					return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
+					return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
 				}
 			} else {
-				return fmt.Errorf("invalid data type for %s, expect array but found %s", n, t)
+				return fmt.Errorf("invalid data type for %s, expect array but found %[2]T(%[2]v)", n, t)
 			}
 
 			if tempArr, err := p.addArrayField(st, s); err != nil {
@@ -195,15 +195,15 @@ func (p *Preprocessor) addRecField(ft xsql.FieldType, r map[string]interface{},
 			if jtype == reflect.Map {
 				nextJ, ok = t.(map[string]interface{})
 				if !ok {
-					return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
+					return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
 				}
 			} else if jtype == reflect.String {
 				err := json.Unmarshal([]byte(t.(string)), &nextJ)
 				if err != nil {
-					return fmt.Errorf("invalid data type for %s, expect map but found %s", n, t)
+					return fmt.Errorf("invalid data type for %s, expect map but found %[2]T(%[2]v)", n, t)
 				}
 			} else {
-				return fmt.Errorf("invalid data type for %s, expect struct but found %s", n, t)
+				return fmt.Errorf("invalid data type for %s, expect struct but found %[2]T(%[2]v)", n, t)
 			}
 			nextR := make(map[string]interface{})
 			for _, nextF := range st.StreamFields {
@@ -237,10 +237,10 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 				} else if jtype == reflect.String {
 					err := json.Unmarshal([]byte(t.(string)), &s)
 					if err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
+						return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
 					}
 				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect array but found %s", i, t)
+					return nil, fmt.Errorf("invalid data type for [%d], expect array but found %[2]T(%[2]v)", i, t)
 				}
 				if tempArr, err := p.addArrayField(st, s); err != nil {
 					return nil, err
@@ -258,16 +258,16 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 				if jtype == reflect.Map {
 					j, ok = t.(map[string]interface{})
 					if !ok {
-						return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
+						return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
 					}
 
 				} else if jtype == reflect.String {
 					err := json.Unmarshal([]byte(t.(string)), &j)
 					if err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
+						return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
 					}
 				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect map but found %s", i, t)
+					return nil, fmt.Errorf("invalid data type for [%d], expect map but found %[2]T(%[2]v)", i, t)
 				}
 				r := make(map[string]interface{})
 				for _, f := range st.StreamFields {
@@ -294,12 +294,12 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 					tempSlice = append(tempSlice, int(t.(float64)))
 				} else if jtype == reflect.String {
 					if v, err := strconv.Atoi(t.(string)); err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
+						return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
 					} else {
 						tempSlice = append(tempSlice, v)
 					}
 				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
+					return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
 				}
 			}
 			return tempSlice, nil
@@ -311,12 +311,12 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 					tempSlice = append(tempSlice, t.(float64))
 				} else if jtype == reflect.String {
 					if f, err := strconv.ParseFloat(t.(string), 64); err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
+						return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
 					} else {
 						tempSlice = append(tempSlice, f)
 					}
 				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect float but found %s", i, t)
+					return nil, fmt.Errorf("invalid data type for [%d], expect float but found %[2]T(%[2]v)", i, t)
 				}
 			}
 			return tempSlice, nil
@@ -326,7 +326,7 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 				if reflect.ValueOf(t).Kind() == reflect.String {
 					tempSlice = append(tempSlice, t.(string))
 				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect string but found %s", i, t)
+					return nil, fmt.Errorf("invalid data type for [%d], expect string but found %[2]T(%[2]v)", i, t)
 				}
 			}
 			return tempSlice, nil
@@ -343,12 +343,12 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 					tempSlice = append(tempSlice, common.TimeFromUnixMilli(ai))
 				case reflect.String:
 					if ai, err := p.parseTime(t.(string)); err != nil {
-						return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %s", t, err)
+						return nil, fmt.Errorf("invalid data type for %s, cannot convert to datetime: %[2]T(%[2]v)", t, err)
 					} else {
 						tempSlice = append(tempSlice, ai)
 					}
 				default:
-					return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %v", i, t)
+					return nil, fmt.Errorf("invalid data type for [%d], expect datetime but found %[2]T(%[2]v)", i, t)
 				}
 			}
 			return tempSlice, nil
@@ -360,17 +360,17 @@ func (p *Preprocessor) addArrayField(ft *xsql.ArrayType, srcSlice []interface{})
 					tempSlice = append(tempSlice, t.(bool))
 				} else if jtype == reflect.String {
 					if v, err := strconv.ParseBool(t.(string)); err != nil {
-						return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
+						return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
 					} else {
 						tempSlice = append(tempSlice, v)
 					}
 				} else {
-					return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %s", i, t)
+					return nil, fmt.Errorf("invalid data type for [%d], expect boolean but found %[2]T(%[2]v)", i, t)
 				}
 			}
 			return tempSlice, nil
 		default:
-			return nil, fmt.Errorf("invalid data type for %T, datetime type is not supported yet", ft.Type)
+			return nil, fmt.Errorf("invalid data type for %T", ft.Type)
 		}
 	}
 }

+ 3 - 3
xsql/plans/preprocessor_test.go

@@ -118,7 +118,7 @@ func TestPreprocessor_Apply(t *testing.T) {
 				},
 			},
 			data:   []byte(`{"abc": 77, "def" : "hello"}`),
-			result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found hello"),
+			result: errors.New("error in preprocessor: invalid data type for def, expect boolean but found string(hello)"),
 		},
 		{
 			stmt: &xsql.StreamStmt{
@@ -663,7 +663,7 @@ func TestPreprocessorError(t *testing.T) {
 				},
 			},
 			data:   []byte(`{"abc": "dafsad"}`),
-			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found dafsad"),
+			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(dafsad)"),
 		}, {
 			stmt: &xsql.StreamStmt{
 				Name: xsql.StreamName("demo"),
@@ -694,7 +694,7 @@ func TestPreprocessorError(t *testing.T) {
 				},
 			},
 			data:   []byte(`{"abc": "not a time"}`),
-			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found not a time"),
+			result: errors.New("error in preprocessor: invalid data type for abc, expect bigint but found string(not a time)"),
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))

+ 512 - 0
xsql/processors/xsql_processor_test.go

@@ -112,6 +112,15 @@ func createStreams(t *testing.T) {
 	if err != nil {
 		t.Log(err)
 	}
+	demoE := `CREATE STREAM demoE (
+					color STRING,
+					size BIGINT,
+					ts BIGINT
+				) WITH (DATASOURCE="demoE", FORMAT="json", KEY="ts");`
+	_, err = p.ExecStmt(demoE)
+	if err != nil {
+		t.Log(err)
+	}
 	demo1 := `CREATE STREAM demo1 (
 					temp FLOAT,
 					hum BIGINT,
@@ -139,6 +148,11 @@ func dropStreams(t *testing.T) {
 	if err != nil {
 		t.Log(err)
 	}
+	demoE := `DROP STREAM demoE`
+	_, err = p.ExecStmt(demoE)
+	if err != nil {
+		t.Log(err)
+	}
 	demo1 := `DROP STREAM demo1`
 	_, err = p.ExecStmt(demo1)
 	if err != nil {
@@ -151,6 +165,47 @@ func dropStreams(t *testing.T) {
 	}
 }
 
+func createSchemalessStreams(t *testing.T) {
+	p := NewStreamProcessor(path.Join(DbDir, "stream"))
+	demo := `CREATE STREAM ldemo (					
+				) WITH (DATASOURCE="ldemo", FORMAT="json");`
+	_, err := p.ExecStmt(demo)
+	if err != nil {
+		t.Log(err)
+	}
+	demo1 := `CREATE STREAM ldemo1 (
+				) WITH (DATASOURCE="ldemo1", FORMAT="json");`
+	_, err = p.ExecStmt(demo1)
+	if err != nil {
+		t.Log(err)
+	}
+	sessionDemo := `CREATE STREAM lsessionDemo (
+				) WITH (DATASOURCE="lsessionDemo", FORMAT="json");`
+	_, err = p.ExecStmt(sessionDemo)
+	if err != nil {
+		t.Log(err)
+	}
+}
+
+func dropSchemalessStreams(t *testing.T) {
+	p := NewStreamProcessor(path.Join(DbDir, "stream"))
+	demo := `DROP STREAM ldemo`
+	_, err := p.ExecStmt(demo)
+	if err != nil {
+		t.Log(err)
+	}
+	demo1 := `DROP STREAM ldemo1`
+	_, err = p.ExecStmt(demo1)
+	if err != nil {
+		t.Log(err)
+	}
+	sessionDemo := `DROP STREAM lsessionDemo`
+	_, err = p.ExecStmt(sessionDemo)
+	if err != nil {
+		t.Log(err)
+	}
+}
+
 func getMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
 	var data []*xsql.Tuple
 	switch name {
@@ -202,6 +257,54 @@ func getMockSource(name string, done <-chan int, size int) *nodes.SourceNode {
 				Timestamp: 1541152489252,
 			},
 		}
+	case "demoE":
+		data = []*xsql.Tuple{
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": 3,
+					"size":  "red",
+					"ts":    1541152486013,
+				},
+				Timestamp: 1541152486013,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "blue",
+					"size":  6,
+					"ts":    "1541152486822",
+				},
+				Timestamp: 1541152486822,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "blue",
+					"size":  2,
+					"ts":    1541152487632,
+				},
+				Timestamp: 1541152487632,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": 7,
+					"size":  4,
+					"ts":    1541152488442,
+				},
+				Timestamp: 1541152488442,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "red",
+					"size":  "blue",
+					"ts":    1541152489252,
+				},
+				Timestamp: 1541152489252,
+			},
+		}
 	case "demo1":
 		data = []*xsql.Tuple{
 			{
@@ -492,6 +595,49 @@ func TestSingleSQL(t *testing.T) {
 				"op_filter_0_records_in_total":   int64(5),
 				"op_filter_0_records_out_total":  int64(2),
 			},
+		}, {
+			name: `rule4`,
+			sql:  `SELECT size as Int8, ts FROM demoE where size > 3`,
+			r: [][]map[string]interface{}{
+				{{
+					"error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
+				}},
+				{{
+					"Int8": float64(6),
+					"ts":   float64(1541152486822),
+				}},
+				{{
+					"error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
+				}},
+				{{
+					"error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
+				}},
+			},
+			s: "op_filter_0_records_in_total",
+			m: map[string]interface{}{
+				"op_preprocessor_demoE_0_exceptions_total":   int64(3),
+				"op_preprocessor_demoE_0_process_latency_ms": int64(0),
+				"op_preprocessor_demoE_0_records_in_total":   int64(5),
+				"op_preprocessor_demoE_0_records_out_total":  int64(2),
+
+				"op_project_0_exceptions_total":   int64(3),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(4),
+				"op_project_0_records_out_total":  int64(1),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(4),
+				"sink_mockSink_0_records_out_total": int64(4),
+
+				"source_demoE_0_exceptions_total":  int64(0),
+				"source_demoE_0_records_in_total":  int64(5),
+				"source_demoE_0_records_out_total": int64(5),
+
+				"op_filter_0_exceptions_total":   int64(3),
+				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_records_in_total":   int64(5),
+				"op_filter_0_records_out_total":  int64(1),
+			},
 		},
 	}
 	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
@@ -571,6 +717,372 @@ func TestSingleSQL(t *testing.T) {
 	}
 }
 
+func getMockSourceL(name string, done <-chan int, size int) *nodes.SourceNode {
+	var data []*xsql.Tuple
+	switch name {
+	case "ldemo":
+		data = []*xsql.Tuple{
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "red",
+					"size":  3,
+					"ts":    1541152486013,
+				},
+				Timestamp: 1541152486013,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "blue",
+					"size":  "string",
+					"ts":    1541152486822,
+				},
+				Timestamp: 1541152486822,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"size": 2,
+					"ts":   1541152487632,
+				},
+				Timestamp: 1541152487632,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": 49,
+					"size":  2,
+					"ts":    1541152488442,
+				},
+				Timestamp: 1541152488442,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"color": "red",
+					"ts":    1541152489252,
+				},
+				Timestamp: 1541152489252,
+			},
+		}
+	case "demo1":
+		data = []*xsql.Tuple{
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 25.5,
+					"hum":  65,
+					"ts":   1541152486013,
+				},
+				Timestamp: 1541152486013,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 27.5,
+					"hum":  59,
+					"ts":   1541152486823,
+				},
+				Timestamp: 1541152486823,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 28.1,
+					"hum":  75,
+					"ts":   1541152487632,
+				},
+				Timestamp: 1541152487632,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 27.4,
+					"hum":  80,
+					"ts":   1541152488442,
+				},
+				Timestamp: 1541152488442,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 25.5,
+					"hum":  62,
+					"ts":   1541152489252,
+				},
+				Timestamp: 1541152489252,
+			},
+		}
+	case "sessionDemo":
+		data = []*xsql.Tuple{
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 25.5,
+					"hum":  65,
+					"ts":   1541152486013,
+				},
+				Timestamp: 1541152486013,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 27.5,
+					"hum":  59,
+					"ts":   1541152486823,
+				},
+				Timestamp: 1541152486823,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 28.1,
+					"hum":  75,
+					"ts":   1541152487932,
+				},
+				Timestamp: 1541152487932,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 27.4,
+					"hum":  80,
+					"ts":   1541152488442,
+				},
+				Timestamp: 1541152488442,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 25.5,
+					"hum":  62,
+					"ts":   1541152489252,
+				},
+				Timestamp: 1541152489252,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 26.2,
+					"hum":  63,
+					"ts":   1541152490062,
+				},
+				Timestamp: 1541152490062,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 26.8,
+					"hum":  71,
+					"ts":   1541152490872,
+				},
+				Timestamp: 1541152490872,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 28.9,
+					"hum":  85,
+					"ts":   1541152491682,
+				},
+				Timestamp: 1541152491682,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 29.1,
+					"hum":  92,
+					"ts":   1541152492492,
+				},
+				Timestamp: 1541152492492,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 2.2,
+					"hum":  99,
+					"ts":   1541152493202,
+				},
+				Timestamp: 1541152493202,
+			},
+			{
+				Emitter: name,
+				Message: map[string]interface{}{
+					"temp": 30.9,
+					"hum":  87,
+					"ts":   1541152494112,
+				},
+				Timestamp: 1541152494112,
+			},
+		}
+	}
+	return nodes.NewSourceNodeWithSource(name, test.NewMockSource(data[:size], done, false), map[string]string{
+		"DATASOURCE": name,
+	})
+}
+func TestSingleSQLError(t *testing.T) {
+	var tests = []struct {
+		name string
+		sql  string
+		r    [][]map[string]interface{}
+		s    string
+		m    map[string]interface{}
+	}{
+		{
+			name: `rule1`,
+			sql:  `SELECT color, ts FROM ldemo where size >= 3`,
+			r: [][]map[string]interface{}{
+				{{
+					"color": "red",
+					"ts":    float64(1541152486013),
+				}},
+				{{
+					"error": "invalid operation string >= int64",
+				}},
+			},
+			s: "op_filter_0_records_in_total",
+			m: map[string]interface{}{
+				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
+				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
+				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(1),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(2),
+				"op_project_0_records_out_total":  int64(1),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(2),
+				"sink_mockSink_0_records_out_total": int64(2),
+
+				"source_ldemo_0_exceptions_total":  int64(0),
+				"source_ldemo_0_records_in_total":  int64(5),
+				"source_ldemo_0_records_out_total": int64(5),
+
+				"op_filter_0_exceptions_total":   int64(1),
+				"op_filter_0_process_latency_ms": int64(0),
+				"op_filter_0_records_in_total":   int64(5),
+				"op_filter_0_records_out_total":  int64(1),
+			},
+		}, {
+			name: `rule2`,
+			sql:  `SELECT size * 5 FROM ldemo`,
+			r: [][]map[string]interface{}{
+				{{
+					"rengine_field_0": float64(15),
+				}},
+				{{
+					"error": "invalid operation string * int64",
+				}},
+				{{
+					"rengine_field_0": float64(10),
+				}},
+				{{
+					"rengine_field_0": float64(10),
+				}},
+				{{}},
+			},
+			s: "op_filter_0_records_in_total",
+			m: map[string]interface{}{
+				"op_preprocessor_ldemo_0_exceptions_total":   int64(0),
+				"op_preprocessor_ldemo_0_process_latency_ms": int64(0),
+				"op_preprocessor_ldemo_0_records_in_total":   int64(5),
+				"op_preprocessor_ldemo_0_records_out_total":  int64(5),
+
+				"op_project_0_exceptions_total":   int64(1),
+				"op_project_0_process_latency_ms": int64(0),
+				"op_project_0_records_in_total":   int64(5),
+				"op_project_0_records_out_total":  int64(4),
+
+				"sink_mockSink_0_exceptions_total":  int64(0),
+				"sink_mockSink_0_records_in_total":  int64(5),
+				"sink_mockSink_0_records_out_total": int64(5),
+
+				"source_ldemo_0_exceptions_total":  int64(0),
+				"source_ldemo_0_records_in_total":  int64(5),
+				"source_ldemo_0_records_out_total": int64(5),
+			},
+		},
+	}
+	fmt.Printf("The test bucket size is %d.\n\n", len(tests))
+	createSchemalessStreams(t)
+	defer dropSchemalessStreams(t)
+	//defer close(done)
+	for i, tt := range tests {
+		test.ResetClock(1541152486000)
+		p := NewRuleProcessor(DbDir)
+		parser := xsql.NewParser(strings.NewReader(tt.sql))
+		var (
+			sources []*nodes.SourceNode
+			syncs   []chan int
+		)
+		if stmt, err := xsql.Language.Parse(parser); err != nil {
+			t.Errorf("parse sql %s error: %s", tt.sql, err)
+		} else {
+			if selectStmt, ok := stmt.(*xsql.SelectStatement); !ok {
+				t.Errorf("sql %s is not a select statement", tt.sql)
+			} else {
+				streams := xsql.GetStreams(selectStmt)
+				for _, stream := range streams {
+					next := make(chan int)
+					syncs = append(syncs, next)
+					source := getMockSourceL(stream, next, 5)
+					sources = append(sources, source)
+				}
+			}
+		}
+		tp, inputs, err := p.createTopoWithSources(&api.Rule{Id: tt.name, Sql: tt.sql, Options: map[string]interface{}{
+			"bufferLength": float64(100),
+		}}, sources)
+		if err != nil {
+			t.Error(err)
+		}
+		mockSink := test.NewMockSink()
+		sink := nodes.NewSinkNodeWithSink("mockSink", mockSink)
+		tp.AddSink(inputs, sink)
+		errCh := tp.Open()
+		func() {
+			for i := 0; i < 5; i++ {
+				syncs[i%len(syncs)] <- i
+				select {
+				case err = <-errCh:
+					t.Log(err)
+					tp.Cancel()
+					return
+				default:
+				}
+			}
+			for retry := 100; retry > 0; retry-- {
+				if err := compareMetrics(tp, tt.m, tt.sql); err == nil {
+					break
+				}
+				time.Sleep(time.Duration(retry) * time.Millisecond)
+			}
+		}()
+		results := mockSink.GetResults()
+		var maps [][]map[string]interface{}
+		for _, v := range results {
+			var mapRes []map[string]interface{}
+			err := json.Unmarshal(v, &mapRes)
+			if err != nil {
+				t.Errorf("Failed to parse the input into map")
+				continue
+			}
+			maps = append(maps, mapRes)
+		}
+		if !reflect.DeepEqual(tt.r, maps) {
+			t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.r, maps)
+			continue
+		}
+		if err := compareMetrics(tp, tt.m, tt.sql); err != nil {
+			t.Errorf("%d. %q\n\n%v", i, tt.sql, err)
+		}
+		tp.Cancel()
+	}
+}
+
 func TestWindow(t *testing.T) {
 	var tests = []struct {
 		name string

+ 11 - 2
xstream/nodes/sink_node.go

@@ -168,10 +168,19 @@ func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval
 	stats.IncTotalRecordsIn()
 	stats.ProcessTimeStart()
 	logger := ctx.GetLogger()
+	var outdata []byte
+	switch val := item.data.(type) {
+	case []byte:
+		outdata = val
+	case error:
+		outdata = []byte(fmt.Sprintf(`[{"error":"%s"}]`, val.Error()))
+	default:
+		outdata = []byte(fmt.Sprintf(`[{"error":"result is not a string but found %#v"}]`, val))
+	}
 	for {
-		if err := sink.Collect(ctx, item.data); err != nil {
+		if err := sink.Collect(ctx, outdata); err != nil {
 			stats.IncTotalExceptions()
-			logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), item.data, err)
+			logger.Warnf("sink node %s instance %d publish %s error: %v", ctx.GetOpId(), ctx.GetInstanceId(), outdata, err)
 			if retryInterval > 0 {
 				time.Sleep(time.Duration(retryInterval) * time.Millisecond)
 				logger.Debugf("try again")

+ 2 - 2
xstream/operators/operations.go

@@ -138,8 +138,8 @@ func (o *UnaryOperator) doOp(ctx api.StreamContext, errCh chan<- error) {
 			case nil:
 				continue
 			case error: //TODO error handling
-				logger.Infoln(val)
-				logger.Infoln(val.Error())
+				logger.Errorln(val)
+				nodes.Broadcast(o.outputs, val, ctx)
 				stats.IncTotalExceptions()
 				continue
 			default: