Jelajahi Sumber

feat(watermark): add watermark node in graph API

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
Jiyong Huang 1 tahun lalu
induk
melakukan
af66a03510

+ 4 - 0
internal/topo/graph/io.go

@@ -147,6 +147,10 @@ var OpIO = map[string][]*IOType{
 		{Type: IOINPUT_TYPE_ANY, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY},
 		{Type: IOINPUT_TYPE_SAME},
 	},
+	"watermark": {
+		{Type: IOINPUT_TYPE_ROW, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY, AllowMulti: true},
+		{Type: IOINPUT_TYPE_SAME},
+	},
 	"window": {
 		{Type: IOINPUT_TYPE_ROW, RowType: IOROW_TYPE_ANY, CollectionType: IOCOLLECTION_TYPE_ANY, AllowMulti: true},
 		{Type: IOINPUT_TYPE_COLLECTION, CollectionType: IOCOLLECTION_TYPE_SINGLE, RowType: IOROW_TYPE_SINGLE},

+ 4 - 0
internal/topo/graph/node.go

@@ -26,6 +26,10 @@ type Select struct {
 	Fields []string `json:"fields"`
 }
 
+type Watermark struct {
+	Emitters []string `json:"emitters"`
+}
+
 type Window struct {
 	Type     string `json:"type"`
 	Unit     string `json:"unit"`

+ 3 - 3
internal/topo/node/window_op.go

@@ -32,9 +32,9 @@ import (
 )
 
 type WindowConfig struct {
-	Type     ast.WindowType
-	Length   int64
-	Interval int64 // If the interval is not set, it is equals to Length
+	Type        ast.WindowType
+	Length      int64
+	Interval    int64 // If the interval is not set, it is equals to Length
 	RawInterval int
 	TimeUnit    ast.Token
 }

+ 24 - 0
internal/topo/planner/planner_graph.go

@@ -110,6 +110,13 @@ func PlanByGraph(rule *api.Rule) (*topo.Topo, error) {
 			}
 			nt := strings.ToLower(gn.NodeType)
 			switch nt {
+			case "watermark":
+				emitters, err := parseWatermark(gn.Props, streamEmitters)
+				if err != nil {
+					return nil, fmt.Errorf("parse watermark %s with %v error: %w", nodeName, gn.Props, err)
+				}
+				op := node.NewWatermarkOp(nodeName, emitters, rule.Options)
+				nodeMap[nodeName] = op
 			case "function":
 				fop, err := parseFunc(gn.Props, sourceNames)
 				if err != nil {
@@ -561,6 +568,23 @@ func parseJoinAst(props map[string]interface{}, sourceNames []string) (*ast.Sele
 	return xsql.NewParserWithSources(strings.NewReader(stmt), sourceNames).Parse()
 }
 
+func parseWatermark(props map[string]interface{}, streamEmitters map[string]struct{}) ([]string, error) {
+	n := &graph.Watermark{}
+	err := cast.MapToStruct(props, n)
+	if err != nil {
+		return nil, err
+	}
+	if len(n.Emitters) == 0 {
+		return nil, fmt.Errorf("watermark must have at least one emitter")
+	}
+	for _, e := range n.Emitters {
+		if _, ok := streamEmitters[e]; !ok {
+			return nil, fmt.Errorf("emitter %s does not exist", e)
+		}
+	}
+	return n.Emitters, nil
+}
+
 func parseWindow(props map[string]interface{}) (*node.WindowConfig, error) {
 	n := &graph.Window{}
 	err := cast.MapToStruct(props, n)

+ 84 - 76
test/graph_group_order_rule.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2022 EMQ Technologies Co., Ltd.
+  ~ Copyright 2023 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -133,44 +133,51 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-  &quot;id&quot;: &quot;rule1&quot;,&#xd;
-  &quot;name&quot;: &quot;Group order规则&quot;,&#xd;
-  &quot;graph&quot;: {&#xd;
-    &quot;nodes&quot;: {&#xd;
-      &quot;device1&quot;: {&#xd;
-        &quot;type&quot;: &quot;source&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;datasource&quot;: &quot;devices/1/messages&quot;,&#xd;
-          &quot;timestamp&quot;: &quot;ts&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;device2&quot;: {&#xd;
-        &quot;type&quot;: &quot;source&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;datasource&quot;: &quot;devices/2/messages&quot;,&#xd;
-          &quot;timestamp&quot;: &quot;ts&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;window&quot;: {&#xd;
-        &quot;type&quot;: &quot;operator&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;window&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;type&quot;: &quot;hoppingwindow&quot;,&#xd;
-          &quot;unit&quot;: &quot;ms&quot;,&#xd;
-          &quot;size&quot;: 1500,&#xd;
-          &quot;interval&quot;: 300&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;groupop&quot;: {&#xd;
-        &quot;type&quot;: &quot;operator&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;groupby&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;dimensions&quot;: [&quot;device_id&quot;]&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;countop&quot;: {&#xd;
+                    &quot;id&quot;: &quot;rule1&quot;,&#xd;
+                    &quot;name&quot;: &quot;Group order规则&quot;,&#xd;
+                    &quot;graph&quot;: {&#xd;
+                    &quot;nodes&quot;: {&#xd;
+                    &quot;device1&quot;: {&#xd;
+                    &quot;type&quot;: &quot;source&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;datasource&quot;: &quot;devices/1/messages&quot;,&#xd;
+                    &quot;timestamp&quot;: &quot;ts&quot;&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;device2&quot;: {&#xd;
+                    &quot;type&quot;: &quot;source&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;datasource&quot;: &quot;devices/2/messages&quot;,&#xd;
+                    &quot;timestamp&quot;: &quot;ts&quot;&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;watermark&quot;: {&#xd;
+                    &quot;type&quot;: &quot;operator&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;watermark&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;emitters&quot;:[&quot;device1&quot;,&quot;device2&quot;]&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;window&quot;: {&#xd;
+                    &quot;type&quot;: &quot;operator&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;window&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;type&quot;: &quot;hoppingwindow&quot;,&#xd;
+                    &quot;unit&quot;: &quot;ms&quot;,&#xd;
+                    &quot;size&quot;: 1500,&#xd;
+                    &quot;interval&quot;: 300&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;groupop&quot;: {&#xd;
+                    &quot;type&quot;: &quot;operator&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;groupby&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;dimensions&quot;: [&quot;device_id&quot;]&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;countop&quot;: {&#xd;
         &quot;type&quot;: &quot;operator&quot;,&#xd;
         &quot;nodeType&quot;: &quot;aggfunc&quot;,&#xd;
         &quot;props&quot;: {&#xd;
@@ -198,42 +205,43 @@
         &quot;type&quot;: &quot;operator&quot;,&#xd;
         &quot;nodeType&quot;: &quot;pick&quot;,&#xd;
         &quot;props&quot;: {&#xd;
-          &quot;fields&quot;: [&quot;device_id&quot;, &quot;count&quot;]&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;mqtt&quot;: {&#xd;
-        &quot;type&quot;: &quot;sink&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
-          &quot;topic&quot;: &quot;devices/result&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;log&quot;: {&#xd;
-        &quot;type&quot;: &quot;sink&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;log&quot;,&#xd;
-        &quot;props&quot;: {}&#xd;
-      }&#xd;
-    },&#xd;
-    &quot;topo&quot;: {&#xd;
-      &quot;sources&quot;: [&quot;device1&quot;, &quot;device2&quot;],&#xd;
-      &quot;edges&quot;: {&#xd;
-        &quot;device1&quot;: [&quot;window&quot;],&#xd;
-        &quot;device2&quot;: [&quot;window&quot;],&#xd;
-        &quot;window&quot;: [&quot;groupop&quot;],&#xd;
-        &quot;groupop&quot;: [&quot;countop&quot;],&#xd;
-        &quot;countop&quot;: [&quot;filterop&quot;],&#xd;
-        &quot;filterop&quot;: [&quot;orderop&quot;],&#xd;
-        &quot;orderop&quot;: [&quot;pick&quot;],&#xd;
-        &quot;pick&quot;: [&quot;mqtt&quot;, &quot;log&quot;]&#xd;
-      }&#xd;
-    }&#xd;
-  },&#xd;
-  &quot;options&quot;: {&#xd;
-    &quot;isEventTime&quot;: true,&#xd;
-    &quot;lateTolerance&quot; : 0&#xd;
-  }&#xd;
-}</stringProp>
+                    &quot;fields&quot;: [&quot;device_id&quot;, &quot;count&quot;]&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;mqtt&quot;: {&#xd;
+                    &quot;type&quot;: &quot;sink&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
+                    &quot;topic&quot;: &quot;devices/result&quot;&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;log&quot;: {&#xd;
+                    &quot;type&quot;: &quot;sink&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;log&quot;,&#xd;
+                    &quot;props&quot;: {}&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;topo&quot;: {&#xd;
+                    &quot;sources&quot;: [&quot;device1&quot;, &quot;device2&quot;],&#xd;
+                    &quot;edges&quot;: {&#xd;
+                    &quot;device1&quot;: [&quot;watermark&quot;],&#xd;
+                    &quot;device2&quot;: [&quot;watermark&quot;],&#xd;
+                    &quot;watermark&quot;:[&quot;window&quot;],&#xd;
+                    &quot;window&quot;: [&quot;groupop&quot;],&#xd;
+                    &quot;groupop&quot;: [&quot;countop&quot;],&#xd;
+                    &quot;countop&quot;: [&quot;filterop&quot;],&#xd;
+                    &quot;filterop&quot;: [&quot;orderop&quot;],&#xd;
+                    &quot;orderop&quot;: [&quot;pick&quot;],&#xd;
+                    &quot;pick&quot;: [&quot;mqtt&quot;, &quot;log&quot;]&#xd;
+                    }&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;options&quot;: {&#xd;
+                    &quot;isEventTime&quot;: true,&#xd;
+                    &quot;lateTolerance&quot; : 0&#xd;
+                    }&#xd;
+                    }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>
               </collectionProp>
@@ -342,7 +350,7 @@
           <hashTree>
             <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
               <stringProp name="JSON_PATH">$.op_window_0_records_in_total</stringProp>
-              <stringProp name="EXPECTED_VALUE">15</stringProp>
+              <stringProp name="EXPECTED_VALUE">13</stringProp>
               <boolProp name="JSONVALIDATION">true</boolProp>
               <boolProp name="EXPECT_NULL">false</boolProp>
               <boolProp name="INVERT">false</boolProp>

+ 83 - 75
test/graph_group_rule.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2022 EMQ Technologies Co., Ltd.
+  ~ Copyright 2023 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -133,44 +133,51 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-  &quot;id&quot;: &quot;rule1&quot;,&#xd;
-  &quot;name&quot;: &quot;Group规则&quot;,&#xd;
-  &quot;graph&quot;: {&#xd;
-    &quot;nodes&quot;: {&#xd;
-      &quot;device1&quot;: {&#xd;
-        &quot;type&quot;: &quot;source&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;datasource&quot;: &quot;devices/1/messages&quot;,&#xd;
-          &quot;timestamp&quot;: &quot;ts&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;device2&quot;: {&#xd;
-        &quot;type&quot;: &quot;source&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;datasource&quot;: &quot;devices/2/messages&quot;,&#xd;
-          &quot;timestamp&quot;: &quot;ts&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;window&quot;: {&#xd;
-        &quot;type&quot;: &quot;operator&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;window&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;type&quot;: &quot;tumblingwindow&quot;,&#xd;
-          &quot;unit&quot;: &quot;ms&quot;,&#xd;
-          &quot;size&quot;: 600&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;joinop&quot;: {&#xd;
-        &quot;type&quot;: &quot;operator&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;join&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;from&quot;: &quot;device1&quot;,&#xd;
-          &quot;joins&quot;: [&#xd;
-            {&#xd;
-              &quot;name&quot;: &quot;device2&quot;,&#xd;
-              &quot;type&quot;: &quot;inner&quot;,&#xd;
+                    &quot;id&quot;: &quot;rule1&quot;,&#xd;
+                    &quot;name&quot;: &quot;Group规则&quot;,&#xd;
+                    &quot;graph&quot;: {&#xd;
+                    &quot;nodes&quot;: {&#xd;
+                    &quot;device1&quot;: {&#xd;
+                    &quot;type&quot;: &quot;source&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;datasource&quot;: &quot;devices/1/messages&quot;,&#xd;
+                    &quot;timestamp&quot;: &quot;ts&quot;&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;device2&quot;: {&#xd;
+                    &quot;type&quot;: &quot;source&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;datasource&quot;: &quot;devices/2/messages&quot;,&#xd;
+                    &quot;timestamp&quot;: &quot;ts&quot;&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;watermark&quot;: {&#xd;
+                    &quot;type&quot;: &quot;operator&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;watermark&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;emitters&quot;:[&quot;device1&quot;,&quot;device2&quot;]&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;window&quot;: {&#xd;
+                    &quot;type&quot;: &quot;operator&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;window&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;type&quot;: &quot;tumblingwindow&quot;,&#xd;
+                    &quot;unit&quot;: &quot;ms&quot;,&#xd;
+                    &quot;size&quot;: 600&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;joinop&quot;: {&#xd;
+                    &quot;type&quot;: &quot;operator&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;join&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;from&quot;: &quot;device1&quot;,&#xd;
+                    &quot;joins&quot;: [&#xd;
+                    {&#xd;
+                    &quot;name&quot;: &quot;device2&quot;,&#xd;
+                    &quot;type&quot;: &quot;inner&quot;,&#xd;
               &quot;on&quot;: &quot;abs(device1.ts - device2.ts) &lt; 200&quot;&#xd;
             }&#xd;
           ]&#xd;
@@ -194,41 +201,42 @@
         &quot;type&quot;: &quot;operator&quot;,&#xd;
         &quot;nodeType&quot;: &quot;pick&quot;,&#xd;
         &quot;props&quot;: {&#xd;
-          &quot;fields&quot;: [&quot;device1.humidity&quot;, &quot;count&quot;]&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;mqtt&quot;: {&#xd;
-        &quot;type&quot;: &quot;sink&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
-          &quot;topic&quot;: &quot;devices/result&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;log&quot;: {&#xd;
-        &quot;type&quot;: &quot;sink&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;log&quot;,&#xd;
-        &quot;props&quot;: {}&#xd;
-      }&#xd;
-    },&#xd;
-    &quot;topo&quot;: {&#xd;
-      &quot;sources&quot;: [&quot;device1&quot;, &quot;device2&quot;],&#xd;
-      &quot;edges&quot;: {&#xd;
-        &quot;device1&quot;: [&quot;window&quot;],&#xd;
-        &quot;device2&quot;: [&quot;window&quot;],&#xd;
-        &quot;window&quot;: [&quot;joinop&quot;],&#xd;
-        &quot;joinop&quot;: [&quot;groupop&quot;],&#xd;
-        &quot;groupop&quot;: [&quot;countop&quot;],&#xd;
-        &quot;countop&quot;: [&quot;pick&quot;],&#xd;
-        &quot;pick&quot;: [&quot;mqtt&quot;, &quot;log&quot;]&#xd;
-      }&#xd;
-    }&#xd;
-  },&#xd;
-  &quot;options&quot;: {&#xd;
-    &quot;isEventTime&quot;: true,&#xd;
-    &quot;lateTolerance&quot; : 0&#xd;
-  }&#xd;
-}</stringProp>
+                    &quot;fields&quot;: [&quot;device1.humidity&quot;, &quot;count&quot;]&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;mqtt&quot;: {&#xd;
+                    &quot;type&quot;: &quot;sink&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
+                    &quot;topic&quot;: &quot;devices/result&quot;&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;log&quot;: {&#xd;
+                    &quot;type&quot;: &quot;sink&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;log&quot;,&#xd;
+                    &quot;props&quot;: {}&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;topo&quot;: {&#xd;
+                    &quot;sources&quot;: [&quot;device1&quot;, &quot;device2&quot;],&#xd;
+                    &quot;edges&quot;: {&#xd;
+                    &quot;device1&quot;: [&quot;watermark&quot;],&#xd;
+                    &quot;device2&quot;: [&quot;watermark&quot;],&#xd;
+                    &quot;watermark&quot;:[&quot;window&quot;],&#xd;
+                    &quot;window&quot;: [&quot;joinop&quot;],&#xd;
+                    &quot;joinop&quot;: [&quot;groupop&quot;],&#xd;
+                    &quot;groupop&quot;: [&quot;countop&quot;],&#xd;
+                    &quot;countop&quot;: [&quot;pick&quot;],&#xd;
+                    &quot;pick&quot;: [&quot;mqtt&quot;, &quot;log&quot;]&#xd;
+                    }&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;options&quot;: {&#xd;
+                    &quot;isEventTime&quot;: true,&#xd;
+                    &quot;lateTolerance&quot; : 0&#xd;
+                    }&#xd;
+                    }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>
               </collectionProp>
@@ -337,7 +345,7 @@
           <hashTree>
             <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
               <stringProp name="JSON_PATH">$.op_window_0_records_in_total</stringProp>
-              <stringProp name="EXPECTED_VALUE">15</stringProp>
+              <stringProp name="EXPECTED_VALUE">13</stringProp>
               <boolProp name="JSONVALIDATION">true</boolProp>
               <boolProp name="EXPECT_NULL">false</boolProp>
               <boolProp name="INVERT">false</boolProp>

+ 83 - 74
test/graph_join_rule.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2022 EMQ Technologies Co., Ltd.
+  ~ Copyright 2023 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -133,78 +133,87 @@
                 <elementProp name="" elementType="HTTPArgument">
                   <boolProp name="HTTPArgument.always_encode">false</boolProp>
                   <stringProp name="Argument.value">{&#xd;
-  &quot;id&quot;: &quot;rule1&quot;,&#xd;
-  &quot;name&quot;: &quot;Join规则&quot;,&#xd;
-  &quot;graph&quot;: {&#xd;
-    &quot;nodes&quot;: {&#xd;
-      &quot;device1&quot;: {&#xd;
-        &quot;type&quot;: &quot;source&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;datasource&quot;: &quot;devices/1/messages&quot;,&#xd;
-          &quot;timestamp&quot;: &quot;ts&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;device2&quot;: {&#xd;
-        &quot;type&quot;: &quot;source&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;datasource&quot;: &quot;devices/2/messages&quot;,&#xd;
-          &quot;timestamp&quot;: &quot;ts&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;window&quot;: {&#xd;
-        &quot;type&quot;: &quot;operator&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;window&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;type&quot;: &quot;tumblingwindow&quot;,&#xd;
-          &quot;unit&quot;: &quot;ms&quot;,&#xd;
-          &quot;size&quot;: 600&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;joinop&quot;: {&#xd;
-        &quot;type&quot;: &quot;operator&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;join&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;from&quot;: &quot;device1&quot;,&#xd;
-          &quot;joins&quot;: [&#xd;
-            {&#xd;
-              &quot;name&quot;: &quot;device2&quot;,&#xd;
-              &quot;type&quot;: &quot;inner&quot;,&#xd;
-              &quot;on&quot;: &quot;abs(device1.ts - device2.ts) &lt; 200&quot;&#xd;
-            }&#xd;
-          ]&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;mqtt&quot;: {&#xd;
-        &quot;type&quot;: &quot;sink&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
-          &quot;topic&quot;: &quot;devices/result&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;log&quot;: {&#xd;
-        &quot;type&quot;: &quot;sink&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;log&quot;,&#xd;
-        &quot;props&quot;: {}&#xd;
-      }&#xd;
-    },&#xd;
-    &quot;topo&quot;: {&#xd;
-      &quot;sources&quot;: [&quot;device1&quot;, &quot;device2&quot;],&#xd;
-      &quot;edges&quot;: {&#xd;
-        &quot;device1&quot;: [&quot;window&quot;],&#xd;
-        &quot;device2&quot;: [&quot;window&quot;],&#xd;
-        &quot;window&quot;: [&quot;joinop&quot;],&#xd;
-        &quot;joinop&quot;: [&quot;mqtt&quot;, &quot;log&quot;]&#xd;
-      }&#xd;
-    }&#xd;
-  },&#xd;
-  &quot;options&quot;: {&#xd;
-    &quot;isEventTime&quot;: true,&#xd;
-    &quot;lateTolerance&quot; : 0&#xd;
-  }&#xd;
-}</stringProp>
+                    &quot;id&quot;: &quot;rule1&quot;,&#xd;
+                    &quot;name&quot;: &quot;Join规则&quot;,&#xd;
+                    &quot;graph&quot;: {&#xd;
+                    &quot;nodes&quot;: {&#xd;
+                    &quot;device1&quot;: {&#xd;
+                    &quot;type&quot;: &quot;source&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;datasource&quot;: &quot;devices/1/messages&quot;,&#xd;
+                    &quot;timestamp&quot;: &quot;ts&quot;&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;device2&quot;: {&#xd;
+                    &quot;type&quot;: &quot;source&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;datasource&quot;: &quot;devices/2/messages&quot;,&#xd;
+                    &quot;timestamp&quot;: &quot;ts&quot;&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;watermark&quot;: {&#xd;
+                    &quot;type&quot;: &quot;operator&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;watermark&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;emitters&quot;:[&quot;device1&quot;,&quot;device2&quot;]&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;window&quot;: {&#xd;
+                    &quot;type&quot;: &quot;operator&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;window&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;type&quot;: &quot;tumblingwindow&quot;,&#xd;
+                    &quot;unit&quot;: &quot;ms&quot;,&#xd;
+                    &quot;size&quot;: 600&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;joinop&quot;: {&#xd;
+                    &quot;type&quot;: &quot;operator&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;join&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;from&quot;: &quot;device1&quot;,&#xd;
+                    &quot;joins&quot;: [&#xd;
+                    {&#xd;
+                    &quot;name&quot;: &quot;device2&quot;,&#xd;
+                    &quot;type&quot;: &quot;inner&quot;,&#xd;
+                    &quot;on&quot;: &quot;abs(device1.ts - device2.ts) &lt; 200&quot;&#xd;
+                    }&#xd;
+                    ]&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;mqtt&quot;: {&#xd;
+                    &quot;type&quot;: &quot;sink&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                    &quot;props&quot;: {&#xd;
+                    &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
+                    &quot;topic&quot;: &quot;devices/result&quot;&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;log&quot;: {&#xd;
+                    &quot;type&quot;: &quot;sink&quot;,&#xd;
+                    &quot;nodeType&quot;: &quot;log&quot;,&#xd;
+                    &quot;props&quot;: {}&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;topo&quot;: {&#xd;
+                    &quot;sources&quot;: [&quot;device1&quot;, &quot;device2&quot;],&#xd;
+                    &quot;edges&quot;: {&#xd;
+                    &quot;device1&quot;: [&quot;watermark&quot;],&#xd;
+                    &quot;device2&quot;: [&quot;watermark&quot;],&#xd;
+                    &quot;watermark&quot;:[&quot;window&quot;],&#xd;
+                    &quot;window&quot;: [&quot;joinop&quot;],&#xd;
+                    &quot;joinop&quot;: [&quot;mqtt&quot;, &quot;log&quot;]&#xd;
+                    }&#xd;
+                    }&#xd;
+                    },&#xd;
+                    &quot;options&quot;: {&#xd;
+                    &quot;isEventTime&quot;: true,&#xd;
+                    &quot;lateTolerance&quot; : 0&#xd;
+                    }&#xd;
+                    }
+                  </stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>
                 </elementProp>
               </collectionProp>
@@ -317,7 +326,7 @@
           <hashTree>
             <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
               <stringProp name="JSON_PATH">$.op_window_0_records_in_total</stringProp>
-              <stringProp name="EXPECTED_VALUE">15</stringProp>
+              <stringProp name="EXPECTED_VALUE">13</stringProp>
               <boolProp name="JSONVALIDATION">true</boolProp>
               <boolProp name="EXPECT_NULL">false</boolProp>
               <boolProp name="INVERT">false</boolProp>

+ 50 - 42
test/graph_window_rule.jmx

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ Copyright 2022 EMQ Technologies Co., Ltd.
+  ~ Copyright 2023 EMQ Technologies Co., Ltd.
   ~
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -135,26 +135,33 @@
                   <stringProp name="Argument.value">{&#xd;
   &quot;id&quot;: &quot;rule1&quot;,&#xd;
   &quot;name&quot;: &quot;窗口规则&quot;,&#xd;
-  &quot;graph&quot;: {&#xd;
-    &quot;nodes&quot;: {&#xd;
-      &quot;demo&quot;: {&#xd;
-        &quot;type&quot;: &quot;source&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;datasource&quot;: &quot;devices/+/messages&quot;,&#xd;
-          &quot;timestamp&quot;: &quot;ts&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;roundfunc&quot;: {&#xd;
-        &quot;type&quot;: &quot;operator&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;function&quot;,&#xd;
-        &quot;props&quot;: {&#xd;
-          &quot;expr&quot;: &quot;round(temperature) as temp&quot;&#xd;
-        }&#xd;
-      },&#xd;
-      &quot;window&quot;: {&#xd;
-        &quot;type&quot;: &quot;operator&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;window&quot;,&#xd;
+                      &quot;graph&quot;: {&#xd;
+                      &quot;nodes&quot;: {&#xd;
+                      &quot;demo&quot;: {&#xd;
+                      &quot;type&quot;: &quot;source&quot;,&#xd;
+                      &quot;nodeType&quot;: &quot;mqtt&quot;,&#xd;
+                      &quot;props&quot;: {&#xd;
+                      &quot;datasource&quot;: &quot;devices/+/messages&quot;,&#xd;
+                      &quot;timestamp&quot;: &quot;ts&quot;&#xd;
+                      }&#xd;
+                      },&#xd;
+                      &quot;watermark&quot;: {&#xd;
+                      &quot;type&quot;: &quot;operator&quot;,&#xd;
+                      &quot;nodeType&quot;: &quot;watermark&quot;,&#xd;
+                      &quot;props&quot;: {&#xd;
+                      &quot;emitters&quot;:[&quot;demo&quot;]&#xd;
+                      }&#xd;
+                      },&#xd;
+                      &quot;roundfunc&quot;: {&#xd;
+                      &quot;type&quot;: &quot;operator&quot;,&#xd;
+                      &quot;nodeType&quot;: &quot;function&quot;,&#xd;
+                      &quot;props&quot;: {&#xd;
+                      &quot;expr&quot;: &quot;round(temperature) as temp&quot;&#xd;
+                      }&#xd;
+                      },&#xd;
+                      &quot;window&quot;: {&#xd;
+                      &quot;type&quot;: &quot;operator&quot;,&#xd;
+                      &quot;nodeType&quot;: &quot;window&quot;,&#xd;
         &quot;props&quot;: {&#xd;
           &quot;type&quot;: &quot;tumblingwindow&quot;,&#xd;
           &quot;unit&quot;: &quot;ms&quot;,&#xd;
@@ -183,27 +190,28 @@
           &quot;topic&quot;: &quot;devices/result&quot;,&#xd;
           &quot;sendSingle&quot;: true&#xd;
         }&#xd;
-      },&#xd;
-      &quot;log&quot;: {&#xd;
-        &quot;type&quot;: &quot;sink&quot;,&#xd;
-        &quot;nodeType&quot;: &quot;log&quot;,&#xd;
-        &quot;props&quot;: {}&#xd;
-      }&#xd;
-    },&#xd;
-    &quot;topo&quot;: {&#xd;
-      &quot;sources&quot;: [&quot;demo&quot;],&#xd;
-      &quot;edges&quot;: {&#xd;
-        &quot;demo&quot;: [&quot;roundfunc&quot;],&#xd;
-        &quot;roundfunc&quot;: [&quot;window&quot;],&#xd;
-        &quot;window&quot;: [&quot;avgfunc&quot;],&#xd;
-        &quot;avgfunc&quot;: [&quot;pick&quot;],&#xd;
-        &quot;pick&quot;: [&quot;mqtt&quot;, &quot;log&quot;]&#xd;
-      }&#xd;
-    }&#xd;
-  },&#xd;
-  &quot;options&quot;: {&#xd;
-    &quot;isEventTime&quot;: true,&#xd;
-    &quot;lateTolerance&quot; : 0&#xd;
+                      },&#xd;
+                      &quot;log&quot;: {&#xd;
+                      &quot;type&quot;: &quot;sink&quot;,&#xd;
+                      &quot;nodeType&quot;: &quot;log&quot;,&#xd;
+                      &quot;props&quot;: {}&#xd;
+                      }&#xd;
+                      },&#xd;
+                      &quot;topo&quot;: {&#xd;
+                      &quot;sources&quot;: [&quot;demo&quot;],&#xd;
+                      &quot;edges&quot;: {&#xd;
+                      &quot;demo&quot;: [&quot;watermark&quot;],&#xd;
+                      &quot;watermark&quot;: [&quot;roundfunc&quot;],&#xd;
+                      &quot;roundfunc&quot;: [&quot;window&quot;],&#xd;
+                      &quot;window&quot;: [&quot;avgfunc&quot;],&#xd;
+                      &quot;avgfunc&quot;: [&quot;pick&quot;],&#xd;
+                      &quot;pick&quot;: [&quot;mqtt&quot;, &quot;log&quot;]&#xd;
+                      }&#xd;
+                      }&#xd;
+                      },&#xd;
+                      &quot;options&quot;: {&#xd;
+                      &quot;isEventTime&quot;: true,&#xd;
+                      &quot;lateTolerance&quot; : 0&#xd;
   }&#xd;
 }</stringProp>
                   <stringProp name="Argument.metadata">=</stringProp>