Browse Source

Merge pull request #156 from emqx/edgex_sink

add edgex message bus sink
ngjaying 5 years atrás
parent
commit
b7ea1cde85

+ 11 - 0
fvt_scripts/README.md

@@ -183,3 +183,14 @@ For most of scripts, you can just start JMeter by default way, such as ``bin/jme
 
   - As same steps that required in the ``select_edgex_condition_rule.jmx``, EdgeX value descriptor service & message bus publish tool should be ready.
 
+- [EdgeX message bus sink](edgex_sink_rule.jmx)
+
+  The test script verifies EdgeX message bus sink.  Only one message meet the condition of created rule, and it will be sent to EdgeX message bus sink.
+
+  As with the previous 2 testcases, besides to prepare ``vdmocker`` & ``pub`` application, another ``sub`` application should also be prepared.
+
+  ```shell
+  # go build -o fvt_scripts/edgex/sub/sub fvt_scripts/edgex/sub/sub.go 
+  ```
+
+  

+ 54 - 0
fvt_scripts/edgex/sub/sub.go

@@ -0,0 +1,54 @@
+package main
+
+import (
+	"fmt"
+	"github.com/edgexfoundry/go-mod-messaging/messaging"
+	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
+	"github.com/emqx/kuiper/common"
+)
+
+func main() {
+	var msgConfig1 = types.MessageBusConfig{
+		SubscribeHost: types.HostInfo{
+			Host:     "localhost",
+			Port:     5571,
+			Protocol: "tcp",
+		},
+		Type:messaging.ZeroMQ,
+	}
+
+	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
+		common.Log.Fatal(err)
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			common.Log.Fatal(ec)
+		} else {
+			if err := msgClient.Connect(); err != nil {
+				common.Log.Fatal(err)
+			}
+			//log.Infof("The connection to edgex messagebus is established successfully.")
+			messages := make(chan types.MessageEnvelope)
+			topics := []types.TopicChannel{{Topic: "", Messages: messages}}
+			err := make(chan error)
+			if e := msgClient.Subscribe(topics, err); e != nil {
+				//log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
+				common.Log.Fatal(e)
+			} else {
+				var count int = 0
+				for {
+					select {
+					case e1 := <-err:
+						common.Log.Errorf("%s\n", e1)
+						return
+					case env := <-messages:
+						count ++
+						fmt.Printf("%s\n", env.Payload)
+						if count == 1 {
+							return
+						}
+					}
+				}
+			}
+		}
+	}
+}

+ 441 - 0
fvt_scripts/edgex_sink_rule.jmx

@@ -0,0 +1,441 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<jmeterTestPlan version="1.2" properties="4.0" jmeter="4.0 r1823414">
+  <hashTree>
+    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Test Plan" enabled="true">
+      <stringProp name="TestPlan.comments"></stringProp>
+      <boolProp name="TestPlan.functional_mode">false</boolProp>
+      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
+      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
+      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments"/>
+      </elementProp>
+      <stringProp name="TestPlan.user_define_classpath"></stringProp>
+    </TestPlan>
+    <hashTree>
+      <Arguments guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments">
+          <elementProp name="srv" elementType="Argument">
+            <stringProp name="Argument.name">srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="rest_port" elementType="Argument">
+            <stringProp name="Argument.name">rest_port</stringProp>
+            <stringProp name="Argument.value">9081</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="fvt" elementType="Argument">
+            <stringProp name="Argument.name">fvt</stringProp>
+            <stringProp name="Argument.value">${__property(fvt,,)}</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+        </collectionProp>
+      </Arguments>
+      <hashTree/>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Rules" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <TransactionController guiclass="TransactionControllerGui" testclass="TransactionController" testname="Rule" enabled="true">
+          <boolProp name="TransactionController.includeTimers">false</boolProp>
+          <boolProp name="TransactionController.parent">false</boolProp>
+        </TransactionController>
+        <hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateStream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+&quot;sql&quot; : &quot;create stream demo () WITH (FORMAT=\&quot;JSON\&quot;, TYPE=\&quot;edgex\&quot;)&quot;&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-1754954177">Stream demo is created.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+  &quot;id&quot;: &quot;rule1&quot;,&#xd;
+  &quot;sql&quot;: &quot;SELECT * FROM demo WHERE temperature = 72&quot;,&#xd;
+  &quot;actions&quot;: [&#xd;
+    {&#xd;
+      &quot;edgex&quot;: {&#xd;
+        &quot;protocol&quot;: &quot;tcp&quot;,&#xd;
+        &quot;host&quot;: &quot;*&quot;,&#xd;
+        &quot;port&quot;: 5571,&#xd;
+        &quot;topic&quot;: &quot;application&quot;,&#xd;
+        &quot;contentType&quot;: &quot;application/json&quot;&#xd;
+      }&#xd;
+    }&#xd;
+  ]&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-2022196798">Rule rule1 was created</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">0</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+          </hashTree>
+          <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="OS Process Sampler" enabled="true">
+            <boolProp name="SystemSampler.checkReturnCode">false</boolProp>
+            <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+            <stringProp name="SystemSampler.command">fvt_scripts/edgex/pub</stringProp>
+            <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <stringProp name="SystemSampler.directory">${__property(fvt,,)}</stringProp>
+          </SystemSampler>
+          <hashTree/>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">10</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="false">
+              <stringProp name="JSON_PATH">$.sink_sink_mqtt_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">6</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+              <stringProp name="ConstantTimer.delay">5000</stringProp>
+            </ConstantTimer>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_DropRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="717250485">Rule rule1 is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_Drop_Stream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams/demo</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="287881319">Stream demo is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+            <stringProp name="ConstantTimer.delay">500</stringProp>
+          </ConstantTimer>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Result" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="OS Process Sampler" enabled="true">
+          <boolProp name="SystemSampler.checkReturnCode">true</boolProp>
+          <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+          <stringProp name="SystemSampler.command">fvt_scripts/edgex/sub/sub</stringProp>
+          <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+            <collectionProp name="Arguments.arguments"/>
+          </elementProp>
+          <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+            <collectionProp name="Arguments.arguments"/>
+          </elementProp>
+          <stringProp name="SystemSampler.directory">${__property(fvt,,)}</stringProp>
+        </SystemSampler>
+        <hashTree>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$.[0].temperature</stringProp>
+            <stringProp name="EXPECTED_VALUE">72</stringProp>
+            <boolProp name="JSONVALIDATION">true</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="humidity Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$.[0].humidity</stringProp>
+            <stringProp name="EXPECTED_VALUE">81</stringProp>
+            <boolProp name="JSONVALIDATION">true</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+    </hashTree>
+  </hashTree>
+</jmeterTestPlan>

+ 3 - 0
fvt_scripts/run_jmeter.sh

@@ -74,6 +74,9 @@ if test $with_edgex = true; then
   /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_edgex_another_bus_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_another_bus_rule.jtl -j jmeter_logs/select_edgex_another_bus_rule.log
   echo -e "---------------------------------------------\n"
 
+  /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/edgex_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_sink_rule.jtl -j jmeter_logs/edgex_sink_rule.log
+  echo -e "---------------------------------------------\n"
+  
   /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_edgex_meta_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_meta_rule.jtl -j jmeter_logs/select_edgex_meta_rule.log
   echo -e "---------------------------------------------\n"
 fi

+ 3 - 0
fvt_scripts/start_vdmock.sh

@@ -4,12 +4,15 @@ echo "starting edgex value descriptor mockup server."
 
 rm -rf fvt_scripts/edgex/valuedesc/vdmocker
 rm -rf fvt_scripts/edgex/pub
+rm -rf fvt_scripts/edgex/sub/sub
 
 go build -o fvt_scripts/edgex/valuedesc/vdmocker fvt_scripts/edgex/valuedesc/vd_server.go
 go build -o fvt_scripts/edgex/pub fvt_scripts/edgex/pub.go
+go build -o fvt_scripts/edgex/sub/sub fvt_scripts/edgex/sub/sub.go
 
 chmod +x fvt_scripts/edgex/valuedesc/vdmocker
 chmod +x fvt_scripts/edgex/pub
+chmod +x fvt_scripts/edgex/sub/sub
 
 export BUILD_ID=dontKillMe
 nohup fvt_scripts/edgex/valuedesc/vdmocker > vdmocker.out 2>&1 &

+ 1 - 1
xstream/nodes/sink_node.go

@@ -197,7 +197,7 @@ func doCollect(sink api.Sink, item *CacheTuple, stats StatManager, retryInterval
 	stats.ProcessTimeEnd()
 }
 
-func getSink(name string, action map[string]interface{}) (api.Sink, error) {
+func doGetSink(name string, action map[string]interface{}) (api.Sink, error) {
 	var s api.Sink
 	switch name {
 	case "log":

+ 14 - 0
xstream/nodes/with_edgex.go

@@ -5,6 +5,7 @@ package nodes
 import (
 	"github.com/emqx/kuiper/xstream/api"
 	"github.com/emqx/kuiper/xstream/extensions"
+	"github.com/emqx/kuiper/xstream/sinks"
 )
 
 func getSource(t string) (api.Source, error) {
@@ -13,3 +14,16 @@ func getSource(t string) (api.Source, error) {
 	}
 	return doGetSource(t)
 }
+
+
+func getSink(name string, action map[string]interface{}) (api.Sink, error) {
+	if name == "edgex" {
+		s := &sinks.EdgexMsgBusSink{}
+		if err := s.Configure(action); err != nil {
+			return nil, err
+		} else {
+			return s, nil
+		}
+	}
+	return doGetSink(name, action)
+}

+ 4 - 0
xstream/nodes/without_edgex.go

@@ -7,3 +7,7 @@ import "github.com/emqx/kuiper/xstream/api"
 func getSource(t string) (api.Source, error) {
 	return doGetSource(t)
 }
+
+func getSink(name string, action map[string]interface{}) (api.Sink, error) {
+	return doGetSink(name, action)
+}

+ 142 - 0
xstream/sinks/edgex_sink.go

@@ -0,0 +1,142 @@
+// +build edgex
+
+package sinks
+
+import (
+	"fmt"
+	"github.com/edgexfoundry/go-mod-messaging/messaging"
+	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
+	"github.com/emqx/kuiper/common"
+	"github.com/emqx/kuiper/xstream/api"
+)
+
+type EdgexMsgBusSink struct {
+	protocol string
+	host     string
+	port     int
+	ptype    string
+
+	topic       string
+	contentType string
+
+	optional *OptionalConf
+	client   messaging.MessageClient
+}
+
+type OptionalConf struct {
+	clientid string
+	username string
+	password string
+}
+
+func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
+	ems.host = "*"
+	ems.protocol = "tcp"
+	ems.port = 5570
+	ems.contentType = "application/json"
+	ems.ptype = messaging.ZeroMQ
+
+	if host, ok := ps["host"]; ok {
+		ems.host = host.(string)
+	} else {
+		common.Log.Infof("Not find host conf, will use default value '*'.")
+	}
+
+	if pro, ok := ps["protocol"]; ok {
+		ems.protocol = pro.(string)
+	} else {
+		common.Log.Infof("Not find protocol conf, will use default value 'tcp'.")
+	}
+
+	if port, ok := ps["port"]; ok {
+		if pv, ok := port.(float64); ok {
+			ems.port = int(pv)
+		} else if pv, ok := port.(float32); ok {
+			ems.port = int(pv)
+		} else {
+			common.Log.Infof("Not valid port value, will use default value '5570'.")
+		}
+
+	} else {
+		common.Log.Infof("Not find port conf, will use default value '5570'.")
+	}
+
+	if topic, ok := ps["topic"]; ok {
+		ems.topic = topic.(string)
+	} else {
+		return fmt.Errorf("Topic must be specified.")
+	}
+
+	if contentType, ok := ps["contentType"]; ok {
+		ems.contentType = contentType.(string)
+	} else {
+		common.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
+	}
+
+	if optIntf, ok := ps["optional"]; ok {
+		if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
+			optional := &OptionalConf{}
+			ems.optional = optional
+			if cid, ok2 := opt["clientid"]; ok2 {
+				optional.clientid = cid.(string)
+			}
+			if uname, ok2 := opt["username"]; ok2 {
+				optional.username = uname.(string)
+			}
+			if password, ok2 := opt["password"]; ok2 {
+				optional.password = password.(string)
+			}
+		}
+	}
+	return nil
+}
+
+func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
+	log := ctx.GetLogger()
+	conf := types.MessageBusConfig{
+		PublishHost: types.HostInfo{
+			Host:     ems.host,
+			Port:     ems.port,
+			Protocol: ems.protocol,
+		},
+		Type: ems.ptype,
+	}
+	log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
+	if msgClient, err := messaging.NewMessageClient(conf); err != nil {
+		return err
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			return ec
+		} else {
+			ems.client = msgClient
+		}
+	}
+	return nil
+}
+
+func (ems *EdgexMsgBusSink) Collect(ctx api.StreamContext, item interface{}) error {
+	logger := ctx.GetLogger()
+	if payload, ok := item.([]byte); ok {
+		logger.Debugf("EdgeX message bus sink: %s\n", payload)
+		env := types.NewMessageEnvelope(payload, ctx)
+		env.ContentType = ems.contentType
+		if e := ems.client.Publish(env, ems.topic); e != nil {
+			logger.Errorf("Found error %s when publish to EdgeX message bus.\n", e)
+			return e
+		}
+	} else {
+		return fmt.Errorf("Unkown type %t, the message cannot be published.\n", item)
+	}
+	return nil
+}
+
+func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error {
+	logger := ctx.GetLogger()
+	logger.Infof("Closing edgex sink")
+	if ems.client != nil {
+		if e := ems.client.Disconnect(); e != nil {
+			return e
+		}
+	}
+	return nil
+}