Quellcode durchsuchen

Feat/edgex array (#339)

* feat(edgex): add edgex array support

* feat(edgex): add testcase for edgex array

* feat(edgex): add testcase for edgex array

* feat(edgex): add edgex array support

* feat(edgex): add testcase for edgex array

* feat(edgex): add testcase for edgex array

* docs(file): update description of README

* docs(file): update data types support docs in EdgeX

* feat(edgex): refactor json expr runtime

* feat(edgex): refactor json expr runtime
jinfahua vor 4 Jahren
Ursprung
Commit
ab0d571928

+ 1 - 1
.github/workflows/run_fvt_tests.yaml

@@ -48,7 +48,7 @@ jobs:
             sudo ./fvt_scripts/setup_env.sh
             ln -s _build/kuiper-$(git describe --tags --always)-$(uname -s | tr "[A-Z]" "[a-z]")-x86_64/log kuiper_logs
         - name: run fvt tests
-          timeout-minutes: 5
+          timeout-minutes: 8
           run: ./fvt_scripts/run_jmeter.sh with_edgex=true
         - uses: actions/upload-artifact@v1
           if: always()

+ 3 - 3
README-CN.md

@@ -29,7 +29,7 @@ Kuiper 可以运行在各类物联网的边缘使用场景中,比如工业物
   - 通过 SQL 支持数据抽取、转换和过滤
   - 数据排序、分组、聚合、连接
   - 60+ 各类函数,覆盖数学运算、字符串处理、聚合运算和哈希运算等
-  - 4 类时间窗口
+  - 4 类时间窗口,以及计数窗口
 
 - 高可扩展性
 
@@ -41,8 +41,8 @@ Kuiper 可以运行在各类物联网的边缘使用场景中,比如工业物
 
 - 管理能力
 
-  - 命令行对流、规则进行管理
-  - 通过 REST API 也可以对流、规则进行管理(规划中)
+  - 通过命令行对流、规则和插件进行管理
+  - 通过 REST API 也可以对流、规则和插件进行管理
   - 与 [KubeEdge](https://github.com/kubeedge/kubeedge)、[K3s](https://github.com/rancher/k3s) 等基于边缘 Kubernetes 框架的集成能力
 
 - 与 EMQ X Edge 集成

+ 2 - 2
README.md

@@ -29,7 +29,7 @@ It can be run at various IoT edge use scenarios, such as real-time processing of
   - Support data extract, transform and filter through SQL 
   - Data order, group, aggregation and join
   - 60+ functions, includes mathematical, string, aggregate and hash etc
-  - 4 time windows
+  - 4 time windows & count window
 
 - Highly extensibile 
 
@@ -42,7 +42,7 @@ It can be run at various IoT edge use scenarios, such as real-time processing of
 - Management
 
   - Stream and rule management through CLI
-  - Stream and rule management through REST API (In planning)
+  - Stream and rule management through REST API
   - Easily be integrate with [KubeEdge](https://github.com/kubeedge/kubeedge) and [K3s](https://github.com/rancher/k3s), which bases Kubernetes
 
 - Integration with EMQ X Edge

+ 1 - 0
docs/en_US/edgex/edgex_rule_engine_tutorial.md

@@ -282,6 +282,7 @@ Current rule does not filter any data that are sent to Kuiper, so how to filter
 - Read [EdgeX source](../rules/sources/edgex.md) for more detailed information of configurations and data type conversion.
 - [How to use meta function to extract additional data from EdgeX message bus?](edgex_meta.md) There are some other information are sent along with device service, such as event created time, event id etc. If you want to use such metadata information in your SQL statements, please refer to this doc.
 - [EdgeX message bus sink doc](../rules/sinks/edgex.md). The document describes how to use EdgeX message bus sink. If you'd like to send the analysis result into message bus, you are probably interested in this article. 
+- [Kuiper plugin development tutorial](../plugins/plugins_tutorial.md): Kuiper plugin is based on the plugin mechanism of Golang, users can build loosely-coupled plugin applications,  dynamic loading and binding when it is running. You can refer to this article if you're interested in Kuiper plugin development.
 
  If you want to explore more features of EMQ X Kuiper, please refer to below resources.
 

+ 13 - 1
docs/en_US/rules/sources/edgex.md

@@ -35,12 +35,24 @@ If  ``Type`` value of ``ValueDescriptor`` is ``INT8`` , ``INT16``, ``INT32``,  `
 
 ### Float
 
-If  ``Type`` value of ``ValueDescriptor`` is ``FLOAT16`` , ``FLOAT32``, ``FLOAT64``then Kuiper tries to convert to ``Float`` type. 
+If  ``Type`` value of ``ValueDescriptor`` is ``FLOAT32``, ``FLOAT64``, then Kuiper tries to convert to ``Float`` type. 
 
 ### String
 
 If  ``Type`` value of ``ValueDescriptor`` is ``String``, then Kuiper tries to convert to ``String`` type. 
 
+### Boolean array
+
+`Bool` array type in EdgeX will be converted to `boolean` array.
+
+### Bigint array
+
+All of ``INT8`` , ``INT16``, ``INT32``,  ``INT64``,``UINT`` , ``UINT8`` , ``UINT16`` ,  ``UINT32`` , ``UINT64``  array types in EdgeX will be converted to `Bigint` array.
+
+### Float array
+
+All of ``FLOAT32``, ``FLOAT64``  array types in EdgeX will be converted to `Float` array.
+
 # Global configurations
 
 The configuration file of EdgeX source is at ``$kuiper/etc/sources/edgex.yaml``. Below is the file format.

+ 1 - 0
docs/zh_CN/edgex/edgex_rule_engine_tutorial.md

@@ -275,6 +275,7 @@ Connecting to 127.0.0.1:20498...
 - 阅读 [EdgeX 源](../rules/sources/edgex.md) 获取更多详细信息,以及类型转换等。
 - [如何使用 meta 函数抽取在 EdgeX 消息总线中发送的更多信息?](edgex_meta.md) 设备服务往总线上发送数据的时候,一些额外的信息也随之发送,比如时间创建时间,id 等。如果你想在 SQL 语句中使用这些信息,请参考这篇文章。
 - [EdgeX 消息总线目标](../rules/sinks/edgex.md). 该文档描述了如何使用 EdgeX 消息总线目标。如果你想把分析结果发送到消息总线中,你可能对此文章感兴趣。 
+- [Kuiper 插件开发教程](../plugins/plugins_tutorial.md): Kuiper 插件机制基于 Go 语言的插件机制,使用户可以构建松散耦合的插件程序,在运行时动态加载和绑定,如果您对开发插件有兴趣,请参考该文章。
 
 如想了解更多的 EMQ X Kuiper 的信息,请参考以下资源。
 

+ 15 - 3
docs/zh_CN/rules/sources/edgex.md

@@ -41,7 +41,19 @@ EdgeX 源会试图取得某个字段的类型,
 
 ### String
 
-如果 ``ValueDescriptor`` 中  ``Type`` 的值为 ``String``,那么 Kuiper 会试着将其转换为 ``String`` 类型。 
+如果 ``ValueDescriptor`` 中  ``Type`` 的值为 ``String``,那么 Kuiper 会试着将其转换为 ``String`` 类型。
+
+### Boolean 数组
+
+EdgeX 中的 `Bool` 数组类型会被转换为 `boolean` 数组。
+
+### Bigint 数组
+
+EdgeX 中所有的 ``INT8`` , ``INT16``, ``INT32``,  ``INT64``,``UINT`` , ``UINT8`` , ``UINT16`` ,  ``UINT32`` , ``UINT64``  数组类型会被转换为 `Bigint` 数组。
+
+### Float 数组
+
+EdgeX 中所有的 ``FLOAT32``, ``FLOAT64``  数组类型会被转换为 `Float` 数组。 
 
 # 全局配置
 
@@ -73,11 +85,11 @@ EdgeX 消息总线的地址,缺省为 ``localhost``
 
 ## port
 
-EdgeX 消息总线的端口,缺省为 ``5573``.
+EdgeX 消息总线的端口,缺省为 ``5573``
 
 ## topic
 
-EdgeX 消息总线上监听的主题名称,缺省为 ``events``.
+EdgeX 消息总线上监听的主题名称,缺省为 ``events``
 
 ## serviceServer
 

+ 6 - 0
fvt_scripts/README.md

@@ -193,6 +193,12 @@ For most of scripts, you can just start JMeter by default way, such as ``bin/jme
   # go build -o fvt_scripts/edgex/sub/sub fvt_scripts/edgex/sub/sub.go 
   ```
 
+- [EdgeX array data type](edgex_array_rule.jmx)
+
+  The test script verifies EdgeX array data type support. The rule uses JSON expression in both `SELECT` and `WHERE` clause. The sink result is sent to MQTT broker, and it verifies the project result in sampler assertions.
+
+  This test script, you need to prepare ``vdmocker`` & ``pub`` application.
+
 - [An end to end plugin test](plugin_end_2_end.jmx)
   The script is an end-2-end plugin test. It requires a mock http server, and also a plugin.
     ```shell

+ 42 - 0
fvt_scripts/edgex/pub.go

@@ -114,6 +114,46 @@ func pubToAnother() {
 	}
 }
 
+func pubArrayMessage() {
+	var msgConfig2 = types.MessageBusConfig{
+		PublishHost: types.HostInfo{
+			Host:     "*",
+			Port:     5563,
+			Protocol: "tcp",
+		},
+		Type: messaging.ZeroMQ,
+	}
+	if msgClient, err := messaging.NewMessageClient(msgConfig2); err != nil {
+		log.Fatal(err)
+	} else {
+		if ec := msgClient.Connect(); ec != nil {
+			log.Fatal(ec)
+		}
+		client := coredata.NewEventClient(local.New("test1"))
+		var testEvent = models.Event{Device: "demo1", Created: 123, Modified: 123, Origin: 123}
+		var r1 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "bool array", Name: "ba", Value: "[true, true, false]"}
+		var r2 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "int32 array", Name: "ia", Value: "[30, 40, 50]"}
+		var r3 = models.Reading{Pushed: 123, Created: 123, Origin: 123, Modified: 123, Device: "float64 array", Name: "fa", Value: "[3.14, 3.1415, 3.1415926]"}
+
+		testEvent.Readings = append(testEvent.Readings, r1, r2, r3)
+
+		data, err := client.MarshalEvent(testEvent)
+		if err != nil {
+			fmt.Errorf("unexpected error MarshalEvent %v", err)
+		} else {
+			fmt.Println(string(data))
+		}
+
+		env := types.NewMessageEnvelope([]byte(data), context.Background())
+		env.ContentType = "application/json"
+
+		if e := msgClient.Publish(env, "events"); e != nil {
+			log.Fatal(e)
+		}
+		time.Sleep(1500 * time.Millisecond)
+	}
+}
+
 func pubToMQTT(host string) {
 	var msgConfig2 = types.MessageBusConfig{
 		PublishHost: types.HostInfo{
@@ -205,6 +245,8 @@ func main() {
 			pubToAnother()
 		} else if v == "meta" {
 			pubMetaSource()
+		} else if v == "array" {
+			pubArrayMessage()
 		}
 	} else if len(os.Args) == 3 {
 		if v := os.Args[1]; v == "mqtt" {

+ 23 - 8
fvt_scripts/edgex/valuedesc/vd_server.go

@@ -10,13 +10,16 @@ import (
 )
 
 const (
-	desc1 = "Temperature descriptor1"
-	desc2 = "Humidity descriptor2"
-	desc3 = "Boolean descriptor"
-	desc4 = "Int descriptor"
-	desc5 = "Float descriptor"
-	desc6 = "String descriptor"
-	desc7 = "UInt64 descriptor"
+	desc1  = "Temperature descriptor1"
+	desc2  = "Humidity descriptor2"
+	desc3  = "Boolean descriptor"
+	desc4  = "Int descriptor"
+	desc5  = "Float descriptor"
+	desc6  = "String descriptor"
+	desc7  = "UInt64 descriptor"
+	desc8  = "Bool array descriptor"
+	desc9  = "Int array descriptor"
+	desc10 = "Float array descriptor"
 )
 
 var vd1 = models.ValueDescriptor{Id: "Temperature", Created: 123, Modified: 123, Origin: 123, Name: "Temperature",
@@ -32,6 +35,9 @@ var vd4 = models.ValueDescriptor{Id: "i1", Name: "i1", Formatting: "%d", Type: "
 var vd5 = models.ValueDescriptor{Id: "f1", Name: "f1", Formatting: "%f", Type: "FLOAT64", MediaType: clients.ContentTypeJSON}
 var vd6 = models.ValueDescriptor{Id: "s1", Name: "s1", Formatting: "%s", Type: "String", MediaType: clients.ContentTypeJSON}
 var vd7 = models.ValueDescriptor{Id: "ui64", Name: "ui64", Formatting: "%d", Type: "UINT64", MediaType: clients.ContentTypeJSON}
+var vd8 = models.ValueDescriptor{Id: "ba", Name: "ba", Formatting: "%s", Type: "BOOLARRAY", MediaType: clients.ContentTypeJSON}
+var vd9 = models.ValueDescriptor{Id: "ia", Name: "ia", Formatting: "%s", Type: "INT32ARRAY", MediaType: clients.ContentTypeJSON}
+var vd10 = models.ValueDescriptor{Id: "fa", Name: "fa", Formatting: "%s", Type: "FLOAT64ARRAY", MediaType: clients.ContentTypeJSON}
 
 func main() {
 	http.HandleFunc(clients.ApiValueDescriptorRoute, Hello)
@@ -62,7 +68,16 @@ func Hello(w http.ResponseWriter, req *http.Request) {
 	descriptor7 := vd7
 	descriptor7.Description = desc7
 
-	descriptors := []models.ValueDescriptor{descriptor1, descriptor2, descriptor3, descriptor4, descriptor5, descriptor6, descriptor7}
+	descriptor8 := vd8
+	descriptor8.Description = desc8
+
+	descriptor9 := vd9
+	descriptor9.Description = desc9
+
+	descriptor10 := vd10
+	descriptor10.Description = desc10
+
+	descriptors := []models.ValueDescriptor{descriptor1, descriptor2, descriptor3, descriptor4, descriptor5, descriptor6, descriptor7, descriptor8, descriptor9, descriptor10}
 
 	data, err := json.Marshal(descriptors)
 	if err != nil {

+ 477 - 0
fvt_scripts/edgex_array_rule.jmx

@@ -0,0 +1,477 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<jmeterTestPlan version="1.2" properties="4.0" jmeter="4.0 r1823414">
+  <hashTree>
+    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Test Plan" enabled="true">
+      <stringProp name="TestPlan.comments"></stringProp>
+      <boolProp name="TestPlan.functional_mode">false</boolProp>
+      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
+      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
+      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments"/>
+      </elementProp>
+      <stringProp name="TestPlan.user_define_classpath"></stringProp>
+    </TestPlan>
+    <hashTree>
+      <Arguments guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+        <collectionProp name="Arguments.arguments">
+          <elementProp name="srv" elementType="Argument">
+            <stringProp name="Argument.name">srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="rest_port" elementType="Argument">
+            <stringProp name="Argument.name">rest_port</stringProp>
+            <stringProp name="Argument.value">9081</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="fvt" elementType="Argument">
+            <stringProp name="Argument.name">fvt</stringProp>
+            <stringProp name="Argument.value">${__property(fvt,,)}</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+          <elementProp name="mqtt_srv" elementType="Argument">
+            <stringProp name="Argument.name">mqtt_srv</stringProp>
+            <stringProp name="Argument.value">127.0.0.1</stringProp>
+            <stringProp name="Argument.metadata">=</stringProp>
+          </elementProp>
+        </collectionProp>
+      </Arguments>
+      <hashTree/>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Rules" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <TransactionController guiclass="TransactionControllerGui" testclass="TransactionController" testname="API" enabled="true">
+          <boolProp name="TransactionController.includeTimers">false</boolProp>
+          <boolProp name="TransactionController.parent">false</boolProp>
+        </TransactionController>
+        <hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateStream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+&quot;sql&quot; : &quot;create stream demo () WITH (FORMAT=\&quot;JSON\&quot;, TYPE=\&quot;edgex\&quot;)&quot;&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-1754954177">Stream demo is created.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_CreateRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value">{&#xd;
+  &quot;id&quot;: &quot;rule1&quot;,&#xd;
+  &quot;sql&quot;: &quot;SELECT fa[:] as floatarr FROM demo WHERE ba[0]&quot;,&#xd;
+  &quot;actions&quot;: [&#xd;
+    {&#xd;
+      &quot;mqtt&quot;: {&#xd;
+        &quot;server&quot;: &quot;tcp://${mqtt_srv}:1883&quot;,&#xd;
+        &quot;topic&quot;: &quot;devices/result&quot;,&#xd;
+        &quot;qos&quot;: 1,&#xd;
+        &quot;clientId&quot;: &quot;demo_001&quot;&#xd;
+      }&#xd;
+    },&#xd;
+    {&#xd;
+    	&quot;log&quot;: {}&#xd;
+    	}&#xd;
+  ]&#xd;
+}</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules</stringProp>
+            <stringProp name="HTTPSampler.method">POST</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="-2022196798">Rule rule1 was created</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">true</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">0</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+          </hashTree>
+          <SystemSampler guiclass="SystemSamplerGui" testclass="SystemSampler" testname="OS Process Sampler" enabled="true">
+            <boolProp name="SystemSampler.checkReturnCode">false</boolProp>
+            <stringProp name="SystemSampler.expectedReturnCode">0</stringProp>
+            <stringProp name="SystemSampler.command">fvt_scripts/edgex/pub</stringProp>
+            <elementProp name="SystemSampler.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="Argument">
+                  <stringProp name="Argument.name"></stringProp>
+                  <stringProp name="Argument.value">array</stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <elementProp name="SystemSampler.environment" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
+              <collectionProp name="Arguments.arguments"/>
+            </elementProp>
+            <stringProp name="SystemSampler.directory">${__property(fvt,,)}</stringProp>
+          </SystemSampler>
+          <hashTree/>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_GetRuleStatus" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1/status</stringProp>
+            <stringProp name="HTTPSampler.method">GET</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="true">
+              <stringProp name="JSON_PATH">$.source_demo_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">1</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <JSONPathAssertion guiclass="JSONPathAssertionGui" testclass="JSONPathAssertion" testname="JSON Assertion" enabled="false">
+              <stringProp name="JSON_PATH">$.sink_mqtt_0_0_records_in_total</stringProp>
+              <stringProp name="EXPECTED_VALUE">6</stringProp>
+              <boolProp name="JSONVALIDATION">true</boolProp>
+              <boolProp name="EXPECT_NULL">false</boolProp>
+              <boolProp name="INVERT">false</boolProp>
+              <boolProp name="ISREGEX">false</boolProp>
+            </JSONPathAssertion>
+            <hashTree/>
+            <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+              <stringProp name="ConstantTimer.delay">5000</stringProp>
+            </ConstantTimer>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_DropRule" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/rules/rule1</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="717250485">Rule rule1 is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="API_Drop_Stream" enabled="true">
+            <boolProp name="HTTPSampler.postBodyRaw">true</boolProp>
+            <elementProp name="HTTPsampler.Arguments" elementType="Arguments">
+              <collectionProp name="Arguments.arguments">
+                <elementProp name="" elementType="HTTPArgument">
+                  <boolProp name="HTTPArgument.always_encode">false</boolProp>
+                  <stringProp name="Argument.value"></stringProp>
+                  <stringProp name="Argument.metadata">=</stringProp>
+                </elementProp>
+              </collectionProp>
+            </elementProp>
+            <stringProp name="HTTPSampler.domain">${srv}</stringProp>
+            <stringProp name="HTTPSampler.port">${rest_port}</stringProp>
+            <stringProp name="HTTPSampler.protocol"></stringProp>
+            <stringProp name="HTTPSampler.contentEncoding"></stringProp>
+            <stringProp name="HTTPSampler.path">/streams/demo</stringProp>
+            <stringProp name="HTTPSampler.method">DELETE</stringProp>
+            <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
+            <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
+            <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
+            <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
+            <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
+            <stringProp name="HTTPSampler.connect_timeout"></stringProp>
+            <stringProp name="HTTPSampler.response_timeout"></stringProp>
+          </HTTPSamplerProxy>
+          <hashTree>
+            <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="Response Assertion" enabled="true">
+              <collectionProp name="Asserion.test_strings">
+                <stringProp name="287881319">Stream demo is dropped.</stringProp>
+              </collectionProp>
+              <stringProp name="Assertion.custom_message"></stringProp>
+              <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+              <boolProp name="Assertion.assume_success">false</boolProp>
+              <intProp name="Assertion.test_type">16</intProp>
+            </ResponseAssertion>
+            <hashTree/>
+          </hashTree>
+          <ConstantTimer guiclass="ConstantTimerGui" testclass="ConstantTimer" testname="Constant Timer" enabled="true">
+            <stringProp name="ConstantTimer.delay">500</stringProp>
+          </ConstantTimer>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Result" enabled="true">
+        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
+        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
+          <boolProp name="LoopController.continue_forever">false</boolProp>
+          <stringProp name="LoopController.loops">1</stringProp>
+        </elementProp>
+        <stringProp name="ThreadGroup.num_threads">1</stringProp>
+        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
+        <boolProp name="ThreadGroup.scheduler">false</boolProp>
+        <stringProp name="ThreadGroup.duration"></stringProp>
+        <stringProp name="ThreadGroup.delay"></stringProp>
+      </ThreadGroup>
+      <hashTree>
+        <ResultCollector guiclass="ViewResultsFullVisualizer" testclass="ResultCollector" testname="View Results Tree" enabled="true">
+          <boolProp name="ResultCollector.error_logging">false</boolProp>
+          <objProp>
+            <name>saveConfig</name>
+            <value class="SampleSaveConfiguration">
+              <time>true</time>
+              <latency>true</latency>
+              <timestamp>true</timestamp>
+              <success>true</success>
+              <label>true</label>
+              <code>true</code>
+              <message>true</message>
+              <threadName>true</threadName>
+              <dataType>true</dataType>
+              <encoding>false</encoding>
+              <assertions>true</assertions>
+              <subresults>true</subresults>
+              <responseData>false</responseData>
+              <samplerData>false</samplerData>
+              <xml>false</xml>
+              <fieldNames>true</fieldNames>
+              <responseHeaders>false</responseHeaders>
+              <requestHeaders>false</requestHeaders>
+              <responseDataOnError>false</responseDataOnError>
+              <saveAssertionResultsFailureMessage>true</saveAssertionResultsFailureMessage>
+              <assertionsResultsToSave>0</assertionsResultsToSave>
+              <bytes>true</bytes>
+              <sentBytes>true</sentBytes>
+              <threadCounts>true</threadCounts>
+              <idleTime>true</idleTime>
+              <connectTime>true</connectTime>
+            </value>
+          </objProp>
+          <stringProp name="filename"></stringProp>
+        </ResultCollector>
+        <hashTree/>
+        <OnceOnlyController guiclass="OnceOnlyControllerGui" testclass="OnceOnlyController" testname="Once Only Controller" enabled="true"/>
+        <hashTree>
+          <net.xmeter.samplers.ConnectSampler guiclass="net.xmeter.gui.ConnectSamplerUI" testclass="net.xmeter.samplers.ConnectSampler" testname="MQTT Connect" enabled="true">
+            <stringProp name="mqtt.server">${mqtt_srv}</stringProp>
+            <stringProp name="mqtt.port">1883</stringProp>
+            <stringProp name="mqtt.version">3.1</stringProp>
+            <stringProp name="mqtt.conn_timeout">10</stringProp>
+            <boolProp name="mqtt.private_protocol">false</boolProp>
+            <stringProp name="mqtt.listener_timeout">10</stringProp>
+            <stringProp name="mqtt.protocol">TCP</stringProp>
+            <boolProp name="mqtt.dual_ssl_authentication">false</boolProp>
+            <stringProp name="mqtt.keystore_file_path"></stringProp>
+            <stringProp name="mqtt.keystore_password"></stringProp>
+            <stringProp name="mqtt.clientcert_file_path"></stringProp>
+            <stringProp name="mqtt.clientcert_password"></stringProp>
+            <stringProp name="mqtt.user_name"></stringProp>
+            <stringProp name="mqtt.password"></stringProp>
+            <stringProp name="mqtt.client_id_prefix">conn_</stringProp>
+            <boolProp name="mqtt.client_id_suffix">true</boolProp>
+            <stringProp name="mqtt.conn_keep_alive">300</stringProp>
+            <stringProp name="mqtt.conn_attampt_max">0</stringProp>
+            <stringProp name="mqtt.reconn_attampt_max">0</stringProp>
+          </net.xmeter.samplers.ConnectSampler>
+          <hashTree/>
+        </hashTree>
+        <net.xmeter.samplers.SubSampler guiclass="net.xmeter.gui.SubSamplerUI" testclass="net.xmeter.samplers.SubSampler" testname="AnalysisResult" enabled="true">
+          <stringProp name="mqtt.topic_name">devices/result</stringProp>
+          <stringProp name="mqtt.qos_level">0</stringProp>
+          <boolProp name="mqtt.add_timestamp">false</boolProp>
+          <boolProp name="mqtt.debug_response">true</boolProp>
+          <stringProp name="mqtt.sample_condition">number of received messages</stringProp>
+          <stringProp name="mqtt.sample_condition_value">1</stringProp>
+        </net.xmeter.samplers.SubSampler>
+        <hashTree>
+          <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="floatarr" enabled="true">
+            <collectionProp name="Asserion.test_strings">
+              <stringProp name="2010114693">floatarr</stringProp>
+            </collectionProp>
+            <stringProp name="Assertion.custom_message"></stringProp>
+            <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+            <boolProp name="Assertion.assume_success">false</boolProp>
+            <intProp name="Assertion.test_type">16</intProp>
+          </ResponseAssertion>
+          <hashTree/>
+          <ResponseAssertion guiclass="AssertionGui" testclass="ResponseAssertion" testname="floatarr-val" enabled="true">
+            <collectionProp name="Asserion.test_strings">
+              <stringProp name="104169983">3.14,3.1415,3.1415926</stringProp>
+            </collectionProp>
+            <stringProp name="Assertion.custom_message"></stringProp>
+            <stringProp name="Assertion.test_field">Assertion.response_data</stringProp>
+            <boolProp name="Assertion.assume_success">false</boolProp>
+            <intProp name="Assertion.test_type">16</intProp>
+          </ResponseAssertion>
+          <hashTree/>
+        </hashTree>
+      </hashTree>
+    </hashTree>
+  </hashTree>
+</jmeterTestPlan>

+ 2 - 0
fvt_scripts/run_jmeter.sh

@@ -83,6 +83,8 @@ if test $with_edgex = true; then
   /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/edgex_mqtt_sink_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_mqtt_sink_rule.jtl -j jmeter_logs/edgex_mqtt_sink_rule.log
   echo -e "---------------------------------------------\n"
 
+  /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/edgex_array_rule.jmx  -Dfvt="$fvt_dir" -l jmeter_logs/edgex_array_rule.jtl -j jmeter_logs/edgex_array_rule.log
+  echo -e "---------------------------------------------\n"
 fi
 
 /opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t fvt_scripts/plugin_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/plugin_end_2_end.jtl -j jmeter_logs/plugin_end_2_end.log

+ 2 - 2
go.mod

@@ -6,8 +6,8 @@ require (
 	github.com/benbjohnson/clock v1.0.0
 	github.com/buger/jsonparser v0.0.0-20191004114745-ee4c978eae7e
 	github.com/eclipse/paho.mqtt.golang v1.2.0
-	github.com/edgexfoundry/go-mod-core-contracts v0.1.57
-	github.com/edgexfoundry/go-mod-messaging v0.1.18
+	github.com/edgexfoundry/go-mod-core-contracts v0.1.59
+	github.com/edgexfoundry/go-mod-messaging v0.1.21
 	github.com/go-yaml/yaml v2.1.0+incompatible
 	github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
 	github.com/google/uuid v1.1.1

+ 46 - 34
xsql/ast.go

@@ -1159,12 +1159,12 @@ func (v *ValuerEval) evalBinaryExpr(expr *BinaryExpr) interface{} {
 	switch val := lhs.(type) {
 	case map[string]interface{}:
 		return v.evalJsonExpr(val, expr.OP, expr.RHS)
-	case []interface{}, []map[string]interface{}:
-		return v.evalJsonExpr(val, expr.OP, expr.RHS)
 	case error:
 		return val
 	}
-
+	if isSliceOrArray(lhs) {
+		return v.evalJsonExpr(lhs, expr.OP, expr.RHS)
+	}
 	rhs := v.Eval(expr.RHS)
 	if _, ok := rhs.(error); ok {
 		return rhs
@@ -1172,11 +1172,15 @@ func (v *ValuerEval) evalBinaryExpr(expr *BinaryExpr) interface{} {
 	return v.simpleDataEval(lhs, rhs, expr.OP)
 }
 
+func isSliceOrArray(v interface{}) bool {
+	kind := reflect.ValueOf(v).Kind()
+	return kind == reflect.Array || kind == reflect.Slice
+}
+
 func (v *ValuerEval) evalJsonExpr(result interface{}, op Token, expr Expr) interface{} {
-	switch val := result.(type) {
-	case map[string]interface{}:
-		switch op {
-		case ARROW:
+	switch op {
+	case ARROW:
+		if val, ok := result.(map[string]interface{}); ok {
 			switch e := expr.(type) {
 			case *FieldRef, *MetaRef:
 				ve := &ValuerEval{Valuer: Message(val)}
@@ -1184,38 +1188,46 @@ func (v *ValuerEval) evalJsonExpr(result interface{}, op Token, expr Expr) inter
 			default:
 				return fmt.Errorf("the right expression is not a field reference node")
 			}
-		default:
-			return fmt.Errorf("%v is an invalid operation for %T", op, val)
+		} else {
+			return fmt.Errorf("the result %v is not a type of map[string]interface{}", result)
 		}
-	case []interface{}, []map[string]interface{}:
-		switch op {
-		case SUBSET:
-			val := reflect.ValueOf(result)
-			ber := v.Eval(expr)
-			if berVal, ok1 := ber.(*BracketEvalResult); ok1 {
-				if berVal.isIndex() {
-					if berVal.Start >= val.Len() {
-						return fmt.Errorf("out of index: %d of %d", berVal.Start, val.Len())
-					}
-					return val.Index(berVal.Start).Interface()
-				} else {
-					if berVal.Start >= val.Len() {
-						return fmt.Errorf("start value is out of index: %d of %d", berVal.Start, val.Len())
-					}
+	case SUBSET:
+		if isSliceOrArray(result) {
+			return v.subset(result, expr)
+		} else {
+			return fmt.Errorf("%v is an invalid operation for %T", op, result)
+		}
+	default:
+		return fmt.Errorf("%v is an invalid operation for %T", op, result)
+	}
+}
 
-					if berVal.End >= val.Len() {
-						return fmt.Errorf("end value is out of index: %d of %d", berVal.End, val.Len())
-					}
-					return val.Slice(berVal.Start, berVal.End).Interface()
-				}
-			} else {
-				return fmt.Errorf("invalid evaluation result - %v", berVal)
+func (v *ValuerEval) subset(result interface{}, expr Expr) interface{} {
+	val := reflect.ValueOf(result)
+	ber := v.Eval(expr)
+	if berVal, ok1 := ber.(*BracketEvalResult); ok1 {
+		if berVal.isIndex() {
+			if berVal.Start >= val.Len() {
+				return fmt.Errorf("out of index: %d of %d", berVal.Start, val.Len())
 			}
-		default:
-			return fmt.Errorf("%v is an invalid operation for %T", op, val)
+			return val.Index(berVal.Start).Interface()
+		} else {
+			if berVal.Start >= val.Len() {
+				return fmt.Errorf("start value is out of index: %d of %d", berVal.Start, val.Len())
+			}
+
+			if berVal.End >= val.Len() {
+				return fmt.Errorf("end value is out of index: %d of %d", berVal.End, val.Len())
+			}
+			end := berVal.End
+			if end == -1 {
+				end = val.Len()
+			}
+			return val.Slice(berVal.Start, end).Interface()
 		}
+	} else {
+		return fmt.Errorf("invalid evaluation result - %v", berVal)
 	}
-	return nil
 }
 
 //lhs and rhs are non-nil

+ 110 - 1
xsql/plans/project_test.go

@@ -242,6 +242,115 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				},
 			}},
 		},
+
+		{
+			sql: `SELECT a[2:] AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": []map[string]interface{}{
+						{"b": "hello1"},
+						{"b": "hello2"},
+						{"b": "hello3"},
+						{"b": "hello4"},
+						{"b": "hello5"},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"ab": []interface{}{
+					map[string]interface{}{"b": "hello3"},
+					map[string]interface{}{"b": "hello4"},
+					map[string]interface{}{"b": "hello5"},
+				},
+			}},
+		},
+
+		{
+			sql: `SELECT a[2:] AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": []interface{}{
+						true, false, true, false, true, true,
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"ab": []interface{}{
+					true, false, true, true,
+				},
+			}},
+		},
+
+		{
+			sql: `SELECT a[:4] AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": []interface{}{
+						true, false, true, false, true, true,
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"ab": []interface{}{
+					true, false, true, false,
+				},
+			}},
+		},
+
+		{
+			sql: `SELECT a[:4] AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": []interface{}{
+						3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926,
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"ab": []interface{}{
+					3.14, 3.141, 3.1415, 3.14159,
+				},
+			}},
+		},
+
+		{
+			sql: `SELECT a->b[:4] AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": map[string]interface{}{
+						"b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"ab": []interface{}{
+					3.14, 3.141, 3.1415, 3.14159,
+				},
+			}},
+		},
+
+		{
+			sql: `SELECT a->b[0:1] AS ab FROM test`,
+			data: &xsql.Tuple{
+				Emitter: "test",
+				Message: xsql.Message{
+					"a": map[string]interface{}{
+						"b": []float64{3.14, 3.141, 3.1415, 3.14159, 3.141592, 3.1415926},
+					},
+				},
+			},
+			result: []map[string]interface{}{{
+				"ab": []interface{}{
+					3.14,
+				},
+			}},
+		},
+
 		{
 			sql: `SELECT a->c->d AS f1 FROM test`,
 			data: &xsql.Tuple{
@@ -424,7 +533,7 @@ func TestProjectPlan_Apply1(t *testing.T) {
 				t.Errorf("%d. %q\n\nresult mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, tt.result, mapRes)
 			}
 		} else {
-			t.Errorf("%d. The returned result is not type of []byte\n", i)
+			t.Errorf("%d. The returned result %#v is not type of []byte\n", result, i)
 		}
 	}
 }

+ 73 - 20
xstream/extensions/edgex_source.go

@@ -7,6 +7,7 @@ import (
 	"context"
 	"encoding/base64"
 	"encoding/binary"
+	"encoding/json"
 	"fmt"
 	"github.com/edgexfoundry/go-mod-core-contracts/clients"
 	"github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
@@ -219,9 +220,61 @@ func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{
 		if r.ValueType == "" {
 			r.ValueType = ot
 		}
-		return es.getFloatValue(r, logger)
+		return es.getFloatValue(r.FloatEncoding, r.Value, r.ValueType, logger)
 	case "STRING":
 		return v, nil
+	case "BOOLARRAY":
+		var val []bool
+		if e := json.Unmarshal([]byte(v), &val); e == nil {
+			return val, nil
+		} else {
+			return nil, e
+		}
+	case "UINT8ARRAY", "UINT16ARRAY", "UINT32ARRAY", "INT8ARRAY", "INT16ARRAY", "INT32ARRAY", "INT64ARRAY":
+		var val []int
+		if e := json.Unmarshal([]byte(v), &val); e == nil {
+			return val, nil
+		} else {
+			return nil, e
+		}
+	case "UINT64ARRAY":
+		var val []uint64
+		if e := json.Unmarshal([]byte(v), &val); e == nil {
+			return val, nil
+		} else {
+			return nil, e
+		}
+	case "FLOAT32ARRAY", "FLOAT64ARRAY":
+		if r.ValueType == "" {
+			r.ValueType = ot
+		}
+		var val1 []string
+		if e := json.Unmarshal([]byte(v), &val1); e == nil {
+			ret := []float64{}
+			for _, v := range val1 {
+				if fv, err := es.getFloatValue(r.FloatEncoding, v, r.ValueType, logger); err != nil {
+					return nil, err
+				} else {
+					if f, ok := fv.(float64); ok {
+						ret = append(ret, f)
+					} else {
+						return nil, fmt.Errorf("The %v is not a float64 type.", f)
+					}
+				}
+			}
+			return ret, nil
+		} else {
+			var val []float64
+			ret := []float64{}
+			if e := json.Unmarshal([]byte(v), &val); e == nil {
+				for _, v := range val {
+					ret = append(ret, v)
+				}
+				return ret, nil
+			} else {
+				return nil, e
+			}
+		}
 	case "BINARY":
 		return nil, fmt.Errorf("Unsupport for binary type, the value will be ignored.")
 	default:
@@ -230,22 +283,22 @@ func (es *EdgexSource) getValue(r models.Reading, logger api.Logger) (interface{
 	}
 }
 
-func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (interface{}, error) {
-	if len(r.FloatEncoding) == 0 {
-		if strings.Contains(r.Value, "=") {
-			r.FloatEncoding = models.Base64Encoding
+func (es *EdgexSource) getFloatValue(FloatEncoding string, Value string, ValueType string, logger api.Logger) (interface{}, error) {
+	if len(FloatEncoding) == 0 {
+		if strings.Contains(Value, "=") {
+			FloatEncoding = models.Base64Encoding
 		} else {
-			r.FloatEncoding = models.ENotation
+			FloatEncoding = models.ENotation
 		}
 	}
-	switch strings.ToLower(r.ValueType) {
-	case strings.ToLower(models.ValueTypeFloat32):
+	switch strings.ToLower(ValueType) {
+	case strings.ToLower(models.ValueTypeFloat32), strings.ToLower(models.ValueTypeFloat32Array):
 		var value float64
-		switch r.FloatEncoding {
+		switch FloatEncoding {
 		case models.Base64Encoding:
-			data, err := base64.StdEncoding.DecodeString(r.Value)
+			data, err := base64.StdEncoding.DecodeString(Value)
 			if err != nil {
-				return false, fmt.Errorf("unable to Base 64 decode float32 value ('%s'): %s", r.Value, err.Error())
+				return false, fmt.Errorf("unable to Base 64 decode float32 value ('%s'): %s", Value, err.Error())
 			}
 			var value1 float32
 			err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value1)
@@ -256,7 +309,7 @@ func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (inter
 		case models.ENotation:
 			var err error
 			var temp float64
-			temp, err = strconv.ParseFloat(r.Value, 64)
+			temp, err = strconv.ParseFloat(Value, 64)
 			if err != nil {
 				return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
 			}
@@ -264,18 +317,18 @@ func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (inter
 			value = float64(temp)
 
 		default:
-			return false, fmt.Errorf("unkown FloatEncoding for float32 value: %s", r.FloatEncoding)
+			return false, fmt.Errorf("unkown FloatEncoding for float32 value: %s", FloatEncoding)
 
 		}
 		return value, nil
 
-	case strings.ToLower(models.ValueTypeFloat64):
+	case strings.ToLower(models.ValueTypeFloat64), strings.ToLower(models.ValueTypeFloat64Array):
 		var value float64
-		switch r.FloatEncoding {
+		switch FloatEncoding {
 		case models.Base64Encoding:
-			data, err := base64.StdEncoding.DecodeString(r.Value)
+			data, err := base64.StdEncoding.DecodeString(Value)
 			if err != nil {
-				return false, fmt.Errorf("unable to Base 64 decode float64 value ('%s'): %s", r.Value, err.Error())
+				return false, fmt.Errorf("unable to Base 64 decode float64 value ('%s'): %s", Value, err.Error())
 			}
 
 			err = binary.Read(bytes.NewReader(data), binary.BigEndian, &value)
@@ -285,16 +338,16 @@ func (es *EdgexSource) getFloatValue(r models.Reading, logger api.Logger) (inter
 			return value, nil
 		case models.ENotation:
 			var err error
-			value, err = strconv.ParseFloat(r.Value, 64)
+			value, err = strconv.ParseFloat(Value, 64)
 			if err != nil {
 				return false, fmt.Errorf("unable to parse Float64 eNotation value: %s", err.Error())
 			}
 			return value, nil
 		default:
-			return false, fmt.Errorf("unkown FloatEncoding for float64 value: %s", r.FloatEncoding)
+			return false, fmt.Errorf("unkown FloatEncoding for float64 value: %s", FloatEncoding)
 		}
 	default:
-		return nil, fmt.Errorf("unkown value type: %s, reading:%v", r.ValueType, r)
+		return nil, fmt.Errorf("unkown value type: %s", ValueType)
 	}
 }
 

+ 136 - 16
xstream/extensions/edgex_source_test.go

@@ -3,6 +3,7 @@
 package extensions
 
 import (
+	"encoding/json"
 	"fmt"
 	"github.com/edgexfoundry/go-mod-core-contracts/models"
 	"github.com/emqx/kuiper/common"
@@ -10,22 +11,33 @@ import (
 )
 
 var es = EdgexSource{valueDescs: map[string]string{
-	"b1": "bool",
-	"i1": "int8",
-	"i2": "INT16",
-	"i3": "INT32",
-	"i4": "INT64",
-	"i5": "UINT8",
-	"i6": "UINT16",
-	"i7": "UINT32",
-	"s1": "String",
-	"f1": "Float32", //FLOAT32 will be handled by special case
-	"f2": "Float64", //FLOAT64 will be handled by special case
-	"i8": "UINT64",  //UINT64 will be handled by special case
+	"b1":  "bool",
+	"i1":  "int8",
+	"i2":  "INT16",
+	"i3":  "INT32",
+	"i4":  "INT64",
+	"i5":  "UINT8",
+	"i6":  "UINT16",
+	"i7":  "UINT32",
+	"s1":  "String",
+	"f1":  "Float32", //FLOAT32 will be handled by special case
+	"f2":  "Float64", //FLOAT64 will be handled by special case
+	"i8":  "UINT64",  //UINT64 will be handled by special case
+	"ba":  "BOOLARRAY",
+	"ia1": "INT8ARRAY",
+	"ia2": "INT16ARRAY",
+	"ia3": "INT32ARRAY",
+	"ia4": "INT64ARRAY",
+	"ia5": "UINT8ARRAY",
+	"ia6": "UINT16ARRAY",
+	"ia7": "UINT32ARRAY",
+	"ia8": "UINT64ARRAY",
+	"fa1": "FLOAT32ARRAY",
+	"fa2": "FLOAT64ARRAY",
 },
 }
 
-func TestGetValue_Int(t *testing.T) {
+func TestGetValue_IntFloat(t *testing.T) {
 	var testEvent = models.Event{Device: "test"}
 	for i := 1; i < 8; i++ {
 		r1 := models.Reading{Name: fmt.Sprintf("i%d", i), Value: "1"}
@@ -44,10 +56,25 @@ func TestGetValue_Int(t *testing.T) {
 	if v, e := es.getValue(rf_01, common.Log); e != nil {
 		t.Errorf("%s", e)
 	} else {
-		if v1, ok := v.(float32); ok {
-			if v1 != 1.8516986e+38 {
-				t.Errorf("expected 1.8516986e+38, but it's %f.", v1)
+		if v1, ok := v.(float64); ok {
+			if v1 != 185169860786896613617389922448534667264.000000 {
+				t.Errorf("expected 185169860786896613617389922448534667264.000000, but it's %f.", v1)
 			}
+		} else {
+			t.Errorf("expected float32 type, but it's %T.", v)
+		}
+	}
+
+	rf_02 := models.Reading{Name: "f2", Value: "QAkeuFHrhR8="}
+	if v, e := es.getValue(rf_02, common.Log); e != nil {
+		t.Errorf("%s", e)
+	} else {
+		if v1, ok := v.(float64); ok {
+			if v1 != 3.14 {
+				t.Errorf("expected 3.14, but it's %f.", v1)
+			}
+		} else {
+			t.Errorf("expected float64 type, but it's %T.", v)
 		}
 	}
 
@@ -61,6 +88,99 @@ func TestGetValue_Int(t *testing.T) {
 			}
 		}
 	}
+
+	r2 := models.Reading{Name: "f1", Value: "3.14"}
+	if v, e := es.getValue(r2, common.Log); e != nil {
+		t.Errorf("%s", e)
+	} else {
+		if v1, ok := v.(float64); ok {
+			if v1 != 3.14 {
+				t.Errorf("expected 3.14, but it's %f.", v1)
+			}
+		}
+	}
+}
+
+func TestGetValue_IntFloatArr(t *testing.T) {
+	var testEvent = models.Event{Device: "test"}
+	for i := 1; i < 8; i++ {
+		ia := []int{i, i * 2}
+		jsonValue, _ := json.Marshal(ia)
+		r1 := models.Reading{Name: fmt.Sprintf("ia%d", i), Value: string(jsonValue)}
+		testEvent.Readings = append(testEvent.Readings, r1)
+	}
+
+	for i, r := range testEvent.Readings {
+		if v, e := es.getValue(r, common.Log); e != nil {
+			t.Errorf("%s", e)
+		} else {
+			checkArray(t, i, v)
+		}
+	}
+
+	r1 := models.Reading{Name: "ia8", Value: string(`[10796529505058023104, 10796529505058023105]`)}
+	testEvent.Readings = append(testEvent.Readings, r1)
+	if v, e := es.getValue(r1, common.Log); e != nil {
+		t.Errorf("%s", e)
+	} else {
+		if v1, ok := v.([]uint64); ok {
+			if v1[0] != 10796529505058023104 || v1[1] != 10796529505058023105 {
+				t.Errorf("Failed, the array value is not correct %v.", v1)
+			}
+		} else {
+			t.Errorf("expected uint64 array type, but it's %T.", v1)
+		}
+	}
+
+	rf_00 := models.Reading{Name: "fa1", Value: `[3.14, 2.71828]`}
+	if v, e := es.getValue(rf_00, common.Log); e != nil {
+		t.Errorf("%s", e)
+	} else {
+		if v1, ok := v.([]float64); ok {
+			if v1[0] != 3.14 || v1[1] != 2.71828 {
+				t.Errorf("expected 3.14 & 2.71828, but it's %v.", v1)
+			}
+		} else {
+			t.Errorf("expected float32 array type, but it's %T.", v)
+		}
+	}
+
+	rf_01 := models.Reading{Name: "fa1", Value: `["fwtOaw==","fwtOaw=="]`}
+	if v, e := es.getValue(rf_01, common.Log); e != nil {
+		t.Errorf("%s", e)
+	} else {
+		if v1, ok := v.([]float64); ok {
+			if v1[0] != 185169860786896613617389922448534667264.000000 || v1[1] != 185169860786896613617389922448534667264.000000 {
+				t.Errorf("expected 185169860786896613617389922448534667264.000000, but it's %v.", v1)
+			}
+		} else {
+			t.Errorf("expected float64 array type, but it's %T.", v)
+		}
+	}
+
+	rf_02 := models.Reading{Name: "fa2", Value: `["QAkeuFHrhR8=","QAW/CZWq95A="]`}
+	if v, e := es.getValue(rf_02, common.Log); e != nil {
+		t.Errorf("%s", e)
+	} else {
+		if v1, ok := v.([]float64); ok {
+			if v1[0] != 3.14 || v1[1] != 2.71828 {
+				t.Errorf("expected 3.14 and 2.71828, but it's %v.", v1)
+			}
+		} else {
+			t.Errorf("expected float64 array type, but it's %T.", v)
+		}
+	}
+}
+
+func checkArray(t *testing.T, index int, val interface{}) {
+	if v1, ok := val.([]int); ok {
+		newIdx := index + 1
+		if v1[0] != newIdx || v1[1] != newIdx*2 {
+			t.Errorf("Failed, the array value is not correct %v.", v1)
+		}
+	} else {
+		t.Errorf("expected int array type, but it's %T.", val)
+	}
 }
 
 func expectOne(t *testing.T, expected interface{}) {