Explorar el Código

Merge pull request #139 from emqx/edgex_src

add more unit testcase and doc
ngjaying hace 5 años
padre
commit
3ec115454e

+ 6 - 0
.github/workflows/fvt_tests.yaml

@@ -39,6 +39,10 @@ jobs:
             wget -O emqx.deb https://www.emqx.io/downloads/broker/v4.0.2/emqx-ubuntu18.04-${EMQX_VERSION}_amd64.deb
             sudo dpkg -i emqx.deb
         - uses: actions/checkout@v2
+        - name: cat user.properties
+          run:  cat /tmp/apache-jmeter-5.2.1/user.properties
+        - name: cat edgex.yaml
+          run: cat etc/sources/edgex.yaml
         - name: build kuiper
           run: |
             sudo apt update && sudo apt install pkg-config libczmq-dev -y
@@ -56,6 +60,7 @@ jobs:
             name: kuiper_logs_with_edgex
             path: ./kuiper_logs
         - uses: actions/upload-artifact@v1
+          if: always()
           with:
             name: jmeter_logs_with_edgex
             path: ./jmeter_logs
@@ -116,6 +121,7 @@ jobs:
             name: kuiper_logs_without_edgex
             path: ./kuiper_logs
         - uses: actions/upload-artifact@v1
+          if: always()
           with:
             name: jmeter_logs_without_edgex
             path: ./jmeter_logs

+ 4 - 4
etc/sources/edgex.yaml

@@ -10,8 +10,8 @@ default:
 #    Username: user1
 #    Password: password
 #Override the global configurations
-demo_conf: #Conf_key
+application_conf: #Conf_key
   protocol: tcp
-  server: 10.211.55.6
-  port: 5570
-  topic: events
+  server: localhost
+  port: 5571
+  topic: application

+ 23 - 0
fvt_scripts/README.md

@@ -158,5 +158,28 @@ For most of scripts, you can just start JMeter by default way, such as ``bin/jme
     ```
 
   - The processing SQL is ``SELECT * FROM demo WHERE temperature > 30``, so all of the data that with temperature less than 30 will be fitered. 
+  
   - Another JMeter mock-up user subscribes MQTT result topic, and assert message number and contents.
+  
+- [Multiple EdgeX source configurations](fvt_scripts/select_edgex_another_bus_rule.jmx)
+
+  The test script is used for testing specifying another EdgeX source configurations in Kuiper.
+
+  - In the ``edgex.yaml`` configuration file, below additional configurations are specified.
+
+  ```yaml
+  application_conf: #Conf_key
+    protocol: tcp
+    server: localhost
+    port: 5571
+    topic: application
+  ```
+
+  - In the create stream statement, test script uses ``CONF_KEY`` keyword to use overrided configuration value that specified in ``edgex.yaml``.
+
+  ```sql
+  CREATE STREAM application () WITH (FORMAT="JSON", TYPE="edgex", CONF_KEY = "application_conf")
+  ```
+
+  - As same steps that required in the ``select_edgex_condition_rule.jmx``, EdgeX value descriptor service & message bus publish tool should be ready.
 

+ 45 - 21
fvt_scripts/edgex/pub.go

@@ -18,10 +18,19 @@ var msgConfig1 = types.MessageBusConfig{
 		Port:     5570,
 		Protocol: "tcp",
 	},
+	Type:messaging.ZeroMQ,
+}
+
+var msgConfig2 = types.MessageBusConfig{
+	PublishHost: types.HostInfo{
+		Host:     "*",
+		Port:     5571,
+		Protocol: "tcp",
+	},
+	Type:messaging.ZeroMQ,
 }
 
 func pubEventClientZeroMq() {
-	msgConfig1.Type = messaging.ZeroMQ
 	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
 		log.Fatal(err)
 	} else {
@@ -71,31 +80,46 @@ func pubEventClientZeroMq() {
 	}
 }
 
-//func pubErrValue() {
-//	msgConfig1.Type = messaging.ZeroMQ
-//	if msgClient, err := messaging.NewMessageClient(msgConfig1); err != nil {
-//		log.Fatal(err)
-//	} else {
-//		if ec := msgClient.Connect(); ec != nil {
-//			log.Fatal(ec)
-//		}
-//		client := coredata.NewEventClient(local.New("test"))
-//		var testEvent = models.Event{Device: "test"}
-//		r1 := models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name",
-//			Name: "Temperature", Value: fmt.Sprintf("%d", i*8)}
-//		r2 := models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name",
-//			Name: "Humidity", Value: fmt.Sprintf("%d", i*9)}
-//
-//	}
-//}
+func pubToAnother() {
+	msgConfig1.Type = messaging.ZeroMQ
+	if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
+		log.Fatal(err)
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			log.Fatal(ec)
+		}
+		client := coredata.NewEventClient(local.New("test1"))
+		var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
+		var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Temperature", Value: "20"}
+		var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "test device name", Name: "Humidity", Value: "30"}
+
+		testEvent.Readings = append(testEvent.Readings, r1, r2)
+
+		data, err := client.MarshalEvent(testEvent)
+		if err != nil {
+			fmt.Errorf("unexpected error MarshalEvent %v", err)
+		} else {
+			fmt.Println(string(data))
+		}
+
+		env := types.NewMessageEnvelope([]byte(data), context.Background())
+		env.ContentType = "application/json"
+
+		if e := msgClient.Publish(env, "application"); e != nil {
+			log.Fatal(e)
+		} else {
+			fmt.Printf("Pub successful: %s\n", data)
+		}
+	}
+}
 
 func main() {
 	if len(os.Args) == 1 {
 		pubEventClientZeroMq()
 	} else if len(os.Args) == 2 {
-		//if v := os.Args[0]; v == "err_value" {
-		//
-		//}
+		if v := os.Args[1]; v == "another" {
+			pubToAnother()
+		}
 	}
 }
 

+ 12 - 9
fvt_scripts/run_jmeter.sh

@@ -43,31 +43,34 @@ fvt_dir=`pwd`
 
 rm -rf jmeter_logs
 
-/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/streams_test.jmx -Dbase="$base_dir" -l jmeter_logs/stream_test.jtl
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/streams_test.jmx -Dbase="$base_dir" -l jmeter_logs/stream_test.jtl -j jmeter_logs/stream_test.log
 echo -e "---------------------------------------------\n"
 
-/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/rule_test.jmx -Dbase="$base_dir" -Dfvt="$fvt_dir" -l jmeter_logs/rule_test.jtl
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/rule_test.jmx -Dbase="$base_dir" -Dfvt="$fvt_dir" -l jmeter_logs/rule_test.jtl -j jmeter_logs/rule_test.log
 echo -e "---------------------------------------------\n"
 
-/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_all_rule.jmx -l jmeter_logs/select_all_rule.jtl
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_all_rule.jmx -l jmeter_logs/select_all_rule.jtl -j jmeter_logs/select_all_rule.log
 echo -e "---------------------------------------------\n"
 
-/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_condition_rule.jmx -l jmeter_logs/select_condition_rule.jtl
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_condition_rule.jmx -l jmeter_logs/select_condition_rule.jtl -j jmeter_logs/select_condition_rule.log
 echo -e "---------------------------------------------\n"
 
-/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_aggr_rule.jmx -l jmeter_logs/select_aggr_rule.jtl
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_aggr_rule.jmx -l jmeter_logs/select_aggr_rule.jtl -j jmeter_logs/select_aggr_rule.log
 echo -e "---------------------------------------------\n"
 
-/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/change_rule_status.jmx -l jmeter_logs/change_rule_status.jtl
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/change_rule_status.jmx -l jmeter_logs/change_rule_status.jtl -j jmeter_logs/change_rule_status.log
 echo -e "---------------------------------------------\n"
 
-/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/change_stream_rule.jmx -l jmeter_logs/change_stream_rule.jtl
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/change_stream_rule.jmx -l jmeter_logs/change_stream_rule.jtl -j jmeter_logs/change_stream_rule.log
 echo -e "---------------------------------------------\n"
 
-/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_aggr_rule_order.jmx -l jmeter_logs/select_aggr_rule_order.jtl
+/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_aggr_rule_order.jmx -l jmeter_logs/select_aggr_rule_order.jtl -j jmeter_logs/select_aggr_rule_order.log
 echo -e "---------------------------------------------\n"
 
 if test $with_edgex = true; then
-  /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_edgex_condition_rule.jmx -Dbase="$base_dir" -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_condition_rule.jtl
+  /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_edgex_condition_rule.jmx -Dbase="$base_dir" -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_condition_rule.jtl -j jmeter_logs/select_edgex_condition_rule.log
+  echo -e "---------------------------------------------\n"
+
+  /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/select_edgex_another_bus_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/select_edgex_another_bus_rule.jtl -j jmeter_logs/select_edgex_another_bus_rule.log
   echo -e "---------------------------------------------\n"
 fi

+ 483 - 0
fvt_scripts/select_edgex_another_bus_rule.jmx

@@ -0,0 +1,483 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<jmeterTestPlan version="1.2" properties="4.0" jmeter="4.0 r1823414">
+  <hashTree>
+    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Test Plan" enabled="true">
+      <stringProp name="TestPlan.comments"></stringProp>
+      <boolProp name="TestPlan.functional_mode">false</boolProp>
+      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
+      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
+      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments"/>
+      </elementProp>
+      <stringProp name="TestPlan.user_define_classpath"></stringProp>
+    </TestPlan>
+    <hashTree>
+      <Arguments guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments">
+          <elementProp name="srv" elementType="Argument">
+            <stringProp name="Argument.name">srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="rest_port" elementType="Argument">
+            <stringProp name="Argument.name">rest_port</stringProp>
+            <stringProp name="Argument.value">9081</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="fvt" elementType="Argument">
+            <stringProp name="Argument.name">fvt</stringProp>
+            <stringProp name="Argument.value">${__property(fvt,,)}</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="mqtt_srv" elementType="Argument">
+            <stringProp name="Argument.name">mqtt_srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+        </collectionProp>
+      </Arguments>
+      <hashTree/>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Rules" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <TransactionController guiclass="TransactionControllerGui" testclass="TransactionController" testname="API" enabled="true">
+          <boolProp name="TransactionController.includeTimers">false</boolProp>
+          <boolProp name="TransactionController.parent">false</boolProp>
+        </TransactionController>
+        <hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateStream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+&quot;sql&quot; : &quot;create stream application () WITH (FORMAT=\&quot;JSON\&quot;, TYPE=\&quot;edgex\&quot;, CONF_KEY=\&quot;application_conf\&quot;)&quot;&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-1625239252">Stream application is created.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+  &quot;id&quot;: &quot;rule1&quot;,&#xd;
+  &quot;sql&quot;: &quot;SELECT * FROM application&quot;,&#xd;
+  &quot;actions&quot;: [&#xd;
+    {&#xd;
+      &quot;mqtt&quot;: {&#xd;
+        &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
+        &quot;topic&quot;: &quot;devices/result&quot;,&#xd;
+        &quot;qos&quot;: 1,&#xd;
+        &quot;clientId&quot;: &quot;demo_001&quot;&#xd;
+      }&#xd;
+    }&#xd;
+  ]&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-2022196798">Rule rule1 was created</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_application_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">0</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+          </hashTree>
+          <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="OS Process Sampler" enabled="true">
+            <boolProp name="SystemSampler.checkReturnCode">false</boolProp>
+            <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+            <stringProp name="SystemSampler.command">fvt_scripts/edgex/pub</stringProp>
+            <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="Argument">
+                  <stringProp name="Argument.name"></stringProp>
+                  <stringProp name="Argument.value">another</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <stringProp name="SystemSampler.directory">${__property(fvt,,)}</stringProp>
+          </SystemSampler>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="响应断言" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-99726403">Pub successful</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_application_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">1</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="false">
+              <stringProp name="JSON_PATH">$.sink_sink_mqtt_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">6</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+              <stringProp name="ConstantTimer.delay">5000</stringProp>
+            </ConstantTimer>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_DropRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="717250485">Rule rule1 is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_Drop_Stream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams/application</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="417596244">Stream application is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+            <stringProp name="ConstantTimer.delay">500</stringProp>
+          </ConstantTimer>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Result" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <OnceOnlyController guiclass="OnceOnlyControllerGui" testclass="OnceOnlyController" testname="Once Only Controller" enabled="true"/>
+        <hashTree>
+          <net.xmeter.samplers.ConnectSampler guiclass="net.xmeter.gui.ConnectSamplerUI" testclass="net.xmeter.samplers.ConnectSampler" testname="MQTT Connect" enabled="true">
+            <stringProp name="mqtt.server">${mqtt_srv}</stringProp>
+            <stringProp name="mqtt.port">1883</stringProp>
+            <stringProp name="mqtt.version">3.1</stringProp>
+            <stringProp name="mqtt.conn_timeout">10</stringProp>
+            <boolProp name="mqtt.private_protocol">false</boolProp>
+            <stringProp name="mqtt.listener_timeout">10</stringProp>
+            <stringProp name="mqtt.protocol">TCP</stringProp>
+            <boolProp name="mqtt.dual_ssl_authentication">false</boolProp>
+            <stringProp name="mqtt.keystore_file_path"></stringProp>
+            <stringProp name="mqtt.keystore_password"></stringProp>
+            <stringProp name="mqtt.clientcert_file_path"></stringProp>
+            <stringProp name="mqtt.clientcert_password"></stringProp>
+            <stringProp name="mqtt.user_name"></stringProp>
+            <stringProp name="mqtt.password"></stringProp>
+            <stringProp name="mqtt.client_id_prefix">conn_</stringProp>
+            <boolProp name="mqtt.client_id_suffix">true</boolProp>
+            <stringProp name="mqtt.conn_keep_alive">300</stringProp>
+            <stringProp name="mqtt.conn_attampt_max">0</stringProp>
+            <stringProp name="mqtt.reconn_attampt_max">0</stringProp>
+          </net.xmeter.samplers.ConnectSampler>
+          <hashTree/>
+        </hashTree>
+        <net.xmeter.samplers.SubSampler guiclass="net.xmeter.gui.SubSamplerUI" testclass="net.xmeter.samplers.SubSampler" testname="AnalysisResult" enabled="true">
+          <stringProp name="mqtt.topic_name">devices/result</stringProp>
+          <stringProp name="mqtt.qos_level">0</stringProp>
+          <boolProp name="mqtt.add_timestamp">false</boolProp>
+          <boolProp name="mqtt.debug_response">true</boolProp>
+          <stringProp name="mqtt.sample_condition">number of received messages</stringProp>
+          <stringProp name="mqtt.sample_condition_value">1</stringProp>
+        </net.xmeter.samplers.SubSampler>
+        <hashTree>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="temperature Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$[0].temperature</stringProp>
+            <stringProp name="EXPECTED_VALUE"></stringProp>
+            <boolProp name="JSONVALIDATION">false</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+          <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="humidity Assertion" enabled="true">
+            <stringProp name="JSON_PATH">$[0].humidity</stringProp>
+            <stringProp name="EXPECTED_VALUE"></stringProp>
+            <boolProp name="JSONVALIDATION">false</boolProp>
+            <boolProp name="EXPECT_NULL">false</boolProp>
+            <boolProp name="INVERT">false</boolProp>
+            <boolProp name="ISREGEX">false</boolProp>
+          </JSONPathAssertion>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+    </hashTree>
+  </hashTree>
+</jmeterTestPlan>

+ 22 - 9
xstream/extensions/edgex_source.go

@@ -11,14 +11,15 @@ import (
 	"github.com/edgexfoundry/go-mod-core-contracts/models"
 	"github.com/edgexfoundry/go-mod-messaging/messaging"
 	"github.com/edgexfoundry/go-mod-messaging/pkg/types"
+	"github.com/emqx/kuiper/common"
 	"github.com/emqx/kuiper/xstream/api"
-	"github.com/prometheus/common/log"
 	"strconv"
 	"strings"
 )
 
 type EdgexSource struct {
 	client     messaging.MessageClient
+	subscribed bool
 	vdc        coredata.ValueDescriptorClient
 	topic      string
 	valueDescs map[string]string
@@ -48,18 +49,21 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 	}
 
 	if messaging.ZeroMQ != strings.ToLower(mbusType) {
-		log.Info("Using MQTT message bus.")
 		mbusType = messaging.MQTT
 	}
 
 	if serviceServer, ok := props["serviceServer"]; ok {
-		es.vdc = coredata.NewValueDescriptorClient(local.New(serviceServer.(string) + clients.ApiValueDescriptorRoute))
+		svr := serviceServer.(string) + clients.ApiValueDescriptorRoute
+		common.Log.Infof("Connect to value descriptor service at: %s \n", svr)
+		es.vdc = coredata.NewValueDescriptorClient(local.New(svr))
 		es.valueDescs = make(map[string]string)
 	} else {
 		return fmt.Errorf("The service server cannot be empty.")
 	}
 
 	mbconf := types.MessageBusConfig{SubscribeHost: types.HostInfo{Protocol: protocol, Host: server, Port: port}, Type: messaging.ZeroMQ}
+	common.Log.Infof("Use configuration for edgex messagebus %v\n", mbconf)
+
 	var optional = make(map[string]string)
 	if ops, ok := props["optional"]; ok {
 		if ops1, ok1 := ops.(map[interface{}]interface{}); ok1 {
@@ -84,15 +88,18 @@ func (es *EdgexSource) Configure(device string, props map[string]interface{}) er
 func (es *EdgexSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error) {
 	log := ctx.GetLogger()
 	if err := es.client.Connect(); err != nil {
-		errCh <- fmt.Errorf("Failed to connect to message bus: " + err.Error())
+		errCh <- fmt.Errorf("Failed to connect to edgex message bus: " + err.Error())
 	}
+	log.Infof("The connection to edgex messagebus is established successfully.")
 	messages := make(chan types.MessageEnvelope)
 	topics := []types.TopicChannel{{Topic: es.topic, Messages: messages}}
 	err := make(chan error)
 	if e := es.client.Subscribe(topics, err); e != nil {
-		log.Errorf("Failed to subscribe to topic %s.\n", e)
+		log.Errorf("Failed to subscribe to edgex messagebus topic %s.\n", e)
 		errCh <- e
 	} else {
+		es.subscribed = true
+		log.Infof("Successfully subscribed to edgex messagebus topic %s.", es.topic)
 		for {
 			select {
 			case e1 := <-err:
@@ -196,6 +203,11 @@ func (es *EdgexSource) fetchAllDataDescriptors() error {
 		for _, vd := range vdArr {
 			es.valueDescs[vd.Id] = vd.Type
 		}
+		if len(vdArr) == 0 {
+			common.Log.Infof("Cannot find any value descriptors from value descriptor services.")
+		} else {
+			common.Log.Infof("Get %d of value descriptors from service.", len(vdArr))
+		}
 	}
 	return nil
 }
@@ -216,9 +228,10 @@ func (es *EdgexSource) getType(id string, logger api.Logger) (string, error) {
 }
 
 func (es *EdgexSource) Close(ctx api.StreamContext) error {
-	if e := es.client.Disconnect(); e != nil {
-		return e
-	} else {
-		return nil
+	if es.subscribed {
+		if e := es.client.Disconnect(); e != nil {
+			return e
+		}
 	}
+	return nil
 }

+ 158 - 0
xstream/extensions/edgex_source_test.go

@@ -0,0 +1,158 @@
+// +build edgex
+
+package extensions
+
+import (
+	"fmt"
+	"github.com/edgexfoundry/go-mod-core-contracts/models"
+	"github.com/emqx/kuiper/common"
+	"testing"
+)
+
+var es = EdgexSource{valueDescs: map[string]string{
+	"b1" : "bool",
+	"i1" : "int8",
+	"i2" : "INT16",
+	"i3" : "INT32",
+	"i4" : "INT64",
+	"i5" : "UINT8",
+	"i6" : "UINT16",
+	"i7" : "UINT32",
+	"i8" : "UINT64",
+	"f1" : "FLOAT32",
+	"f2" : "FLOAT64",
+	"s1" : "String",
+	},
+}
+
+func TestGetValue_Int(t *testing.T) {
+	var testEvent = models.Event{Device: "test"}
+	for i := 1; i < 9; i++{
+		r1 := models.Reading{Name: fmt.Sprintf("i%d", i), Value: "1"}
+		testEvent.Readings = append(testEvent.Readings, r1)
+	}
+
+	for _, r := range testEvent.Readings {
+		if v, e := es.getValue(r, common.Log); e != nil {
+			t.Errorf("%s", e)
+		} else {
+			expectOne(t, v)
+		}
+	}
+}
+
+func expectOne(t *testing.T, expected interface{}) {
+	if v1, ok := expected.(int); ok {
+		if v1 != 1 {
+			t.Errorf("expected 1, but it's %d.", v1)
+		}
+	} else {
+		t.Errorf("expected int type, but it's %t.", expected)
+	}
+}
+
+func TestGetValue_Float(t *testing.T) {
+	var testEvent = models.Event{Device: "test"}
+	for i := 1; i < 3; i++{
+		r1 := models.Reading{Name: fmt.Sprintf("f%d", i), Value: "3.14"}
+		testEvent.Readings = append(testEvent.Readings, r1)
+	}
+
+	for _, r := range testEvent.Readings {
+		if v, e := es.getValue(r, common.Log); e != nil {
+			t.Errorf("%s", e)
+		} else {
+			expectPi(t, v)
+		}
+	}
+}
+
+func expectPi(t *testing.T, expected interface{}) {
+	if v1, ok := expected.(float64); ok {
+		if v1 != 3.14 {
+			t.Errorf("expected 3.14, but it's %f.", v1)
+		}
+	} else {
+		t.Errorf("expected float type, but it's %t.", expected)
+	}
+}
+
+
+func TestGetValue_Bool(t *testing.T) {
+	///////////True
+	trues := []string{"1", "t", "T", "true", "TRUE", "True"}
+	for _, v := range trues {
+		r1 := models.Reading{Name: "b1", Value: v}
+		if v, e := es.getValue(r1, common.Log); e != nil {
+			t.Errorf("%s", e)
+		} else {
+			expectTrue(t, v)
+		}
+	}
+
+	r1 := models.Reading{Name: "b1", Value: "TRue"}
+	if _, e := es.getValue(r1, common.Log); e == nil {
+		t.Errorf("%s", e)
+	}
+
+	///////////False
+	falses := []string{"0", "f", "F", "false", "FALSE", "False"}
+	for _, v := range falses {
+		r1 := models.Reading{Name: "b1", Value: v}
+		if v, e := es.getValue(r1, common.Log); e != nil {
+			t.Errorf("%s", e)
+		} else {
+			expectFalse(t, v)
+		}
+	}
+
+	r1 = models.Reading{Name: "b1", Value: "FAlse"}
+	if _, e := es.getValue(r1, common.Log); e == nil {
+		t.Errorf("%s", e)
+	}
+}
+
+func expectTrue(t *testing.T, expected interface{}) {
+	if v1, ok := expected.(bool); ok {
+		if !v1 {
+			t.Errorf("expected true, but it's false.")
+		}
+	} else {
+		t.Errorf("expected boolean type, but it's %t.", expected)
+	}
+}
+
+func expectFalse(t *testing.T, expected interface{}) {
+	if v1, ok := expected.(bool); ok {
+		if v1 {
+			t.Errorf("expected false, but it's true.")
+		}
+	} else {
+		t.Errorf("expected boolean type, but it's %t.", expected)
+	}
+}
+
+func TestWrongType(t *testing.T) {
+	es1 := EdgexSource{valueDescs: map[string]string{
+		"f": "FLOAT", //A not exsited type
+		},
+	}
+	r1 := models.Reading{Name: "f", Value: "100"}
+	if v, _ := es1.getValue(r1, common.Log); v != "100" {
+		t.Errorf("Expected 100, but it's %s!", v)
+	}
+}
+
+func TestWrongValue(t *testing.T) {
+	var testEvent = models.Event{Device: "test"}
+	r1 := models.Reading{Name: "b1", Value: "100"} //100 cannot be converted to a boolean value
+	r2 := models.Reading{Name: "i1", Value: "int"} //'int' string cannot be converted to int value
+	r3 := models.Reading{Name: "f1", Value: "float"} //'float' string cannot be converted to int value
+	testEvent.Readings = append(testEvent.Readings, r1, r2, r3)
+
+	for _, v := range testEvent.Readings {
+		if _, e := es.getValue(v, common.Log); e == nil {
+			t.Errorf("Expected an error!")
+		}
+	}
+}