浏览代码

support mqtt message bus

RockyJin 5 年之前
父节点
当前提交
b84d46ea59

+ 24 - 4
etc/sources/edgex.yaml

@@ -5,13 +5,33 @@ default:
   port: 5563
   topic: events
   serviceServer: http://localhost:48080
+#  Below is optional configurations settings for MQTT
 #  optional:
-#    ClientId: client1
-#    Username: user1
-#    Password: password
+#    ClientId: "client1"
+#    Username: "user1"
+#    Password: "password"
+#    Qos: "1"
+#    KeepAlive: "5000"
+#    Retained: "true/false"
+#    ConnectionPayload: ""
+#    CertFile: ""
+#    KeyFile: ""
+#    CertPEMBlock: ""
+#    KeyPEMBlock: ""
+#    SkipCertVerify: "true/false"
+
 #Override the global configurations
 application_conf: #Conf_key
   protocol: tcp
   server: localhost
   port: 5571
-  topic: application
+  topic: application
+
+mqtt_conf: #Conf_key
+  protocol: tcp
+  server: 10.211.55.6
+  port: 1883
+  topic: events
+  type: mqtt
+  optional:
+    ClientId: "client1"

+ 46 - 0
fvt_scripts/edgex/pub.go

@@ -113,6 +113,50 @@ func pubToAnother() {
 	}
 }
 
+func pubToMQTT() {
+	var msgConfig2 = types.MessageBusConfig{
+		PublishHost: types.HostInfo{
+			Host:     "10.211.55.6",
+			Port:     1883,
+			Protocol: "tcp",
+		},
+		Optional:map[string]string{
+			"ClientId": "0001_client_id",
+		},
+		Type:messaging.MQTT,
+	}
+	if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
+		log.Fatal(err)
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			log.Fatal(ec)
+		}
+		client := coredata.NewEventClient(local.New("test1"))
+		var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
+		var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
+		var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}
+
+		testEvent.Readings = append(testEvent.Readings, r1, r2)
+
+		data, err := client.MarshalEvent(testEvent)
+		if err != nil {
+			fmt.Errorf("unexpected error MarshalEvent %v", err)
+		} else {
+			fmt.Println(string(data))
+		}
+
+		env := types.NewMessageEnvelope([]byte(data), context.Background())
+		env.ContentType = "application/json"
+
+		if e := msgClient.Publish(env, "events"); e != nil {
+			log.Fatal(e)
+		} else {
+			fmt.Printf("pubToAnother successful: %s\n", data)
+		}
+		time.Sleep(1500 * time.Millisecond)
+	}
+}
+
 func pubMetaSource() {
 	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
 		log.Fatal(err)
@@ -160,6 +204,8 @@ func main() {
 			pubToAnother()
 		} else if v == "meta" {
 			pubMetaSource()
+		} else if v == "mqtt" {
+			pubToMQTT()
 		}
 	}
 }

+ 57 - 4
fvt_scripts/edgex/sub/sub.go

@@ -5,9 +5,10 @@ import (
 	"github.com/edgexfoundry/go-mod-messaging/messaging"
 	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
 	"github.com/emqx/kuiper/common"
+	"os"
 )
 
-func main() {
+func subEventsFromZMQ() {
 	var msgConfig1 = types.MessageBusConfig{
 		SubscribeHost: types.HostInfo{
 			Host:     "localhost",
@@ -23,9 +24,6 @@ func main() {
 		if ec := msgClient.Connect(); ec != nil {
 			common.Log.Fatal(ec)
 		} else {
-			if err := msgClient.Connect(); err != nil {
-				common.Log.Fatal(err)
-			}
 			//log.Infof("The connection to edgex messagebus is established successfully.")
 			messages := make(chan types.MessageEnvelope)
 			topics := []types.TopicChannel{{Topic: "", Messages: messages}}
@@ -52,3 +50,58 @@ func main() {
 		}
 	}
 }
+
+func subEventsFromMQTT() {
+	var msgConfig1 = types.MessageBusConfig{
+		SubscribeHost: types.HostInfo{
+			Host:     "10.211.55.6",
+			Port:     1883,
+			Protocol: "tcp",
+		},
+		Type:messaging.MQTT,
+	}
+
+	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
+		common.Log.Fatal(err)
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			common.Log.Fatal(ec)
+		} else {
+			//log.Infof("The connection to edgex messagebus is established successfully.")
+			messages := make(chan types.MessageEnvelope)
+			topics := []types.TopicChannel{{Topic: "result", Messages: messages}}
+			err := make(chan error)
+			if e := msgClient.Subscribe(topics, err); e != nil {
+				//log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
+				common.Log.Fatal(e)
+			} else {
+				var count int = 0
+				for {
+					select {
+					case e1 := <-err:
+						common.Log.Errorf("%s\n", e1)
+						return
+					case env := <-messages:
+						count ++
+						fmt.Printf("%s\n", env.Payload)
+						if count == 1 {
+							return
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+func main() {
+
+	if len(os.Args) == 1 {
+		subEventsFromZMQ()
+	} else if len(os.Args) == 2 {
+		if v := os.Args[1]; v == "mqtt" {
+			subEventsFromMQTT()
+		}
+	}
+
+}

+ 461 - 0
fvt_scripts/edgex_mqtt_sink_rule.jmx

@@ -0,0 +1,461 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<jmeterTestPlan version="1.2" properties="4.0" jmeter="4.0 r1823414">
+  <hashTree>
+    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Test Plan" enabled="true">
+      <stringProp name="TestPlan.comments"></stringProp>
+      <boolProp name="TestPlan.functional_mode">false</boolProp>
+      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
+      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
+      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments"/>
+      </elementProp>
+      <stringProp name="TestPlan.user_define_classpath"></stringProp>
+    </TestPlan>
+    <hashTree>
+      <Arguments guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments">
+          <elementProp name="srv" elementType="Argument">
+            <stringProp name="Argument.name">srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="rest_port" elementType="Argument">
+            <stringProp name="Argument.name">rest_port</stringProp>
+            <stringProp name="Argument.value">9081</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="fvt" elementType="Argument">
+            <stringProp name="Argument.name">fvt</stringProp>
+            <stringProp name="Argument.value">${__property(fvt,,)}</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="mqtt_srv" elementType="Argument">
+            <stringProp name="Argument.name">mqtt_srv</stringProp>
+            <stringProp name="Argument.value">10.211.55.6</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+        </collectionProp>
+      </Arguments>
+      <hashTree/>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Rules" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <TransactionController guiclass="TransactionControllerGui" testclass="TransactionController" testname="Rule" enabled="true">
+          <boolProp name="TransactionController.includeTimers">false</boolProp>
+          <boolProp name="TransactionController.parent">false</boolProp>
+        </TransactionController>
+        <hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateStream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+&quot;sql&quot; : &quot;create stream demo () WITH (FORMAT=\&quot;JSON\&quot;, TYPE=\&quot;edgex\&quot; Conf_key=\&quot;mqtt_conf\&quot;)&quot;&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-1754954177">Stream demo is created.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+  &quot;id&quot;: &quot;rule1&quot;,&#xd;
+  &quot;sql&quot;: &quot;SELECT * FROM demo WHERE temperature = 72&quot;,&#xd;
+  &quot;actions&quot;: [&#xd;
+    {&#xd;
+    	 &quot;log&quot;: {}&#xd;
+    }&#xd;
+  ]&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-2022196798">Rule rule1 was created</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">0</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+          </hashTree>
+          <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="OS Process Sampler" enabled="true">
+            <boolProp name="SystemSampler.checkReturnCode">false</boolProp>
+            <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+            <stringProp name="SystemSampler.command">fvt_scripts/edgex/pub</stringProp>
+            <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="Argument">
+                  <stringProp name="Argument.name"></stringProp>
+                  <stringProp name="Argument.value">mqtt</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <stringProp name="SystemSampler.directory">${__property(fvt,,)}</stringProp>
+          </SystemSampler>
+          <hashTree/>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">10</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="false">
+              <stringProp name="JSON_PATH">$.sink_sink_mqtt_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">6</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+              <stringProp name="ConstantTimer.delay">5000</stringProp>
+            </ConstantTimer>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_DropRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="717250485">Rule rule1 is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_Drop_Stream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams/demo</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="287881319">Stream demo is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+            <stringProp name="ConstantTimer.delay">500</stringProp>
+          </ConstantTimer>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Result" enabled="false">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="OS Process Sampler" enabled="true">
+          <boolProp name="SystemSampler.checkReturnCode">true</boolProp>
+          <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+          <stringProp name="SystemSampler.command">fvt_scripts/edgex/sub/sub</stringProp>
+          <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+            <collectionProp name="Arguments.arguments">
+              <elementProp name="" elementType="Argument">
+                <stringProp name="Argument.name"></stringProp>
+                <stringProp name="Argument.value">mqtt</stringProp>
+                <stringProp name="Argument.metadata">=</stringProp>
+              </elementProp>
+            </collectionProp>
+          </elementProp>
+          <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+            <collectionProp name="Arguments.arguments"/>
+          </elementProp>
+          <stringProp name="SystemSampler.directory">${__property(fvt,,)}</stringProp>
+        </SystemSampler>
+        <hashTree>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="device Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$.device</stringProp>
+            <stringProp name="EXPECTED_VALUE">demo</stringProp>
+            <boolProp name="JSONVALIDATION">true</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="humidity Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$.readings[0].value</stringProp>
+            <stringProp name="EXPECTED_VALUE">81</stringProp>
+            <boolProp name="JSONVALIDATION">true</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$.readings[1].value</stringProp>
+            <stringProp name="EXPECTED_VALUE">72</stringProp>
+            <boolProp name="JSONVALIDATION">true</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+    </hashTree>
+  </hashTree>
+</jmeterTestPlan>

+ 11 - 7
xstream/extensions/edgex_source.go

@@ -1,4 +1,3 @@
-// +build edgex
 
 package extensions
 
@@ -46,10 +45,9 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 	var mbusType = messaging.ZeroMQ
 	if t, ok := props["type"]; ok {
 		mbusType = t.(string)
-	}
-
-	if messaging.ZeroMQ != strings.ToLower(mbusType) {
-		mbusType = messaging.MQTT
+		if mbusType != messaging.ZeroMQ && mbusType != messaging.MQTT {
+			return fmt.Errorf("Specified wrong message type value %s, will use zeromq messagebus.\n", mbusType)
+		}
 	}
 
 	if serviceServer, ok := props["serviceServer"]; ok {
@@ -61,7 +59,7 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 		return fmt.Errorf("The service server cannot be empty.")
 	}
 
-	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: messaging.ZeroMQ}
+	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: mbusType}
 	common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
 
 	var optional = make(map[string]string)
@@ -69,6 +67,9 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 		if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
 			for k, v := range ops1 {
 				k1 := k.(string)
+				//if !xstream.IsAllowedEdgeOptionalKeys(k1) {
+				//	return fmt.Errorf("The optional key %s is not allowed. ", k1)
+				//}
 				v1 := v.(string)
 				optional[k1] = v1
 			}
@@ -88,7 +89,10 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 	log := ctx.GetLogger()
 	if err := es.client.Connect(); err != nil {
-		errCh <- fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
+		info := fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
+		log.Errorf(info.Error())
+		errCh <- info
+		return
 	}
 	log.Infof("The connection to edgex messagebus is established successfully.")
 	messages := make(chan types.MessageEnvelope)

+ 23 - 18
xstream/sinks/edgex_sink.go

@@ -1,4 +1,3 @@
-// +build edgex
 
 package sinks
 
@@ -26,16 +25,10 @@ type EdgexMsgBusSink struct {
 	deviceName string
 	metadata   string
 
-	optional *OptionalConf
+	optional map[string]string
 	client   messaging.MessageClient
 }
 
-type OptionalConf struct {
-	clientid string
-	username string
-	password string
-}
-
 func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 	ems.host = "*"
 	ems.protocol = "tcp"
@@ -81,6 +74,14 @@ func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 		common.Log.Infof("Not find contentType conf, will use default value 'application/json'.")
 	}
 
+	if ptype, ok := ps["type"]; ok {
+		ems.ptype = ptype.(string)
+		if ems.ptype != messaging.ZeroMQ && ems.ptype != messaging.MQTT {
+			common.Log.Infof("Specified wrong message type value %s, will use zeromq messagebus.\n", ems.ptype)
+			ems.ptype = messaging.ZeroMQ
+		}
+	}
+
 	if dname, ok := ps["deviceName"]; ok {
 		ems.deviceName = dname.(string)
 	}
@@ -91,17 +92,20 @@ func (ems *EdgexMsgBusSink) Configure(ps map[string]interface{}) error {
 
 	if optIntf, ok := ps["optional"]; ok {
 		if opt, ok1 := optIntf.(map[string]interface{}); ok1 {
-			optional := &OptionalConf{}
-			ems.optional = optional
-			if cid, ok2 := opt["clientid"]; ok2 {
-				optional.clientid = cid.(string)
-			}
-			if uname, ok2 := opt["username"]; ok2 {
-				optional.username = uname.(string)
-			}
-			if password, ok2 := opt["password"]; ok2 {
-				optional.password = password.(string)
+			optional := make(map[string]string)
+			for k, v := range opt {
+				//if !xstream.IsAllowedEdgeOptionalKeys(k) {
+				//	return fmt.Errorf("The optional key %s is not allowed. ", k)
+				//}
+				if sv, ok2 := v.(string); ok2 {
+					optional[k] = sv
+				} else {
+					info := fmt.Sprintf("Only string value is allowed for optional value, the value for key %s is not a string.", k)
+					common.Log.Infof(info)
+					return fmt.Errorf(info)
+				}
 			}
+			ems.optional = optional
 		}
 	}
 	return nil
@@ -116,6 +120,7 @@ func (ems *EdgexMsgBusSink) Open(ctx api.StreamContext) error {
 			Protocol: ems.protocol,
 		},
 		Type: ems.ptype,
+		Optional: ems.optional,
 	}
 	log.Infof("Using configuration for EdgeX message bus sink: %+v", conf)
 	if msgClient, err := messaging.NewMessageClient(conf); err != nil {